[GitHub] flink issue #4935: [Flink-7945][Metrics]Fix per partition-lag metr...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/4935 Hi @Aitozi, sorry for the long delay in getting back to this PR. I'm still not convinced that this is a sane solution. For example, what is a "good" setting for the `KEY_REGISTER_TIMES` property? Isn't 1 enough, since you mentioned that the missing metric is registered in Kafka after the first poll? Making this configurable seems unnecessary to me. I wonder if we can try one of the following two approaches: 1) manually register the metrics that we know would be missing before the first poll, or 2) poll once outside the loop just to make sure that all Kafka metrics exist, perform the metrics registration, and then start the regular fetch loop. What do you think? ---
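Approach 2) can be sketched as follows. This is a minimal, hypothetical Java model, not the Flink Kafka fetcher or the Kafka client API: `FakeConsumer` and `PollFirstFetcher` are invented names and the metric names are placeholders. It assumes only what the comment states, namely that the missing per-partition metrics exist after the first poll.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a Kafka consumer: it only models the reported behaviour
// that per-partition lag metrics are created lazily by the first poll().
final class FakeConsumer {
    private final Map<String, Double> metrics = new LinkedHashMap<>();
    private final List<String> partitions;
    private int polls = 0;

    FakeConsumer(List<String> partitions) {
        this.partitions = partitions;
    }

    List<String> poll() {
        if (polls == 0) {
            for (String p : partitions) {
                metrics.put(p + ".records-lag", 0.0); // placeholder metric name
            }
        }
        polls++;
        List<String> records = new ArrayList<>();
        records.add("record-" + polls);
        return records;
    }

    Map<String, Double> metrics() {
        return metrics;
    }
}

// Sketch of approach 2): poll once before the loop, register metrics exactly once,
// then run the regular fetch loop. No retry counter such as KEY_REGISTER_TIMES is needed.
final class PollFirstFetcher {
    final List<String> registeredMetrics = new ArrayList<>();
    final List<String> emitted = new ArrayList<>();

    void run(FakeConsumer consumer, int loopIterations) {
        // 1) First poll outside the loop; its records must still be emitted, not dropped.
        emitted.addAll(consumer.poll());
        // 2) All metrics exist now, so a single registration pass is enough.
        registeredMetrics.addAll(consumer.metrics().keySet());
        // 3) Regular fetch loop without any metric registration logic.
        for (int i = 0; i < loopIterations; i++) {
            emitted.addAll(consumer.poll());
        }
    }
}
```

Because registration happens once, right after a poll that is guaranteed to have created the metrics, there is nothing to tune; the only extra care needed is to emit the records returned by that first poll.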
[GitHub] flink issue #5017: [FLINK-8076] Upgrade KinesisProducer to 0.10.6 to set pro...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5017 Merging ... ---
[jira] [Commented] (FLINK-8076) Upgrade KinesisProducer to 0.10.6 to set properties approperiately
[ https://issues.apache.org/jira/browse/FLINK-8076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16270326#comment-16270326 ] ASF GitHub Bot commented on FLINK-8076: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5017 Merging ... > Upgrade KinesisProducer to 0.10.6 to set properties approperiately > -- > > Key: FLINK-8076 > URL: https://issues.apache.org/jira/browse/FLINK-8076 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.4.0 >Reporter: Bowen Li >Assignee: Bowen Li > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5075: [hotfix] [docs] Fix typos in State Backends doc
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5075 Thanks! Merging this .. ---
[jira] [Commented] (FLINK-8027) Generalize existing rest handlers to work with arbitrary RestfulGateway
[ https://issues.apache.org/jira/browse/FLINK-8027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16270313#comment-16270313 ] ASF GitHub Bot commented on FLINK-8027: --- Github user shuai-xu commented on a diff in the pull request: https://github.com/apache/flink/pull/4985#discussion_r153708910 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java ---
[GitHub] flink pull request #4985: [FLINK-8027] Generalize existing rest handlers to ...
Github user shuai-xu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4985#discussion_r153708910

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.util.Preconditions;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Simple {@link TaskExecutorGateway} implementation for testing purposes.
+ */
+public class TestingTaskExecutorGateway implements TaskExecutorGateway {
+
+	private final String address;
+
+	private final String hostname;
+
+	public TestingTaskExecutorGateway() {
+		this("foobar:1234", "foobar");
+	}
+
+	public TestingTaskExecutorGateway(String address, String hostname) {
+		this.address = Preconditions.checkNotNull(address);
+		this.hostname = Preconditions.checkNotNull(hostname);
+	}
+
+	@Override
+	public CompletableFuture<Acknowledge> requestSlot(SlotID slotId, JobID jobId, AllocationID allocationId, String targetAddress, ResourceManagerId resourceManagerId, Time timeout) {
+		return CompletableFuture.completedFuture(Acknowledge.get());
+	}
+
+	@Override
+	public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time timeout) {
+		return CompletableFuture.completedFuture(Acknowledge.get());
+	}
+
+	@Override
+	public CompletableFuture<Acknowledge> updatePartitions(ExecutionAttemptID executionAttemptID, Iterable<PartitionInfo> partitionInfos, Time timeout) {
+		return CompletableFuture.completedFuture(Acknowledge.get());
+	}
+
+	@Override
+	public void failPartition(ExecutionAttemptID executionAttemptID) {
+		// noop
+	}
+
+	@Override
+	public CompletableFuture<Acknowledge> triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointID, long checkpointTimestamp, CheckpointOptions checkpointOptions) {
+		return CompletableFuture.completedFuture(Acknowledge.get());
+	}
+
+	@Override
+	public CompletableFuture<Acknowledge> confirmCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp) {
+		return CompletableFuture.completedFuture(Acknowledge.get());
+	}
+
+	@Override
+	public CompletableFuture<Acknowledge> stopTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+		return CompletableFuture.completedFuture(Acknowledge.get());
+	}
+
+	@Override
+	public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+		return CompletableFuture.completedFuture(Acknowledge.get());
+	}
+
+	@Override
+	public void heartbeatFromJobManager(ResourceID heartbeatOrigin) {
+		// noop
+	}
+
+	@Override
+	public void heartbeatFromResourceManager(ResourceID heartbeatOrigin) {
+		// noop
+	}
+
+	@Override
+	public void disconnectJobManager(JobID jobId, Exception cause) {
+		// nooop
--- End diff --
[jira] [Closed] (FLINK-7976) bump japicmp-maven-plugin version in Flink
[ https://issues.apache.org/jira/browse/FLINK-7976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li closed FLINK-7976. --- Resolution: Duplicate > bump japicmp-maven-plugin version in Flink > -- > > Key: FLINK-7976 > URL: https://issues.apache.org/jira/browse/FLINK-7976 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.4.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Minor > Fix For: 1.5.0 > > > bump japicmp-maven-plugin version from 0.7.0 to 0.11.0
[jira] [Commented] (FLINK-5544) Implement Internal Timer Service in RocksDB
[ https://issues.apache.org/jira/browse/FLINK-5544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16270227#comment-16270227 ] Bowen Li commented on FLINK-5544: - [~xiaogang.shi] [~srichter] what's the status of this story, guys? > Implement Internal Timer Service in RocksDB > --- > > Key: FLINK-5544 > URL: https://issues.apache.org/jira/browse/FLINK-5544 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi > > Currently, the only implementation of the internal timer service is > HeapInternalTimerService, which stores all timers in memory. In cases > where the number of keys is very large, the timer service will consume too much > memory. An implementation that stores timers in RocksDB seems a good way to deal > with these cases. > It might be a little challenging to implement a RocksDB timer service because > the timers are accessed in different ways. When timers are triggered, we need > to access timers in timestamp order. But when performing checkpoints, > we must have a way to obtain all timers of a given key group. > A good implementation, as suggested by [~StephanEwen], follows the idea of > merge sorting. We can store timers in RocksDB with the format > {{KEY_GROUP#TIMER#KEY}}. In this way, the timers of a key group are stored > together and sorted. > Then we can keep an in-memory heap that holds the first timer of each key > group to find the next timer to trigger. When a key group's first timer is > updated, we can efficiently update the heap.
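The merge-sort scheme described above can be modelled in memory to show how both access paths are served. In this hypothetical sketch (class and method names are invented; this is neither Flink's timer service nor a RocksDB binding), a `TreeSet` per key group stands in for the sorted RocksDB range of that key group, ordered by timestamp and then key to mirror the `KEY_GROUP#TIMER#KEY` layout, and a `PriorityQueue` keeps only the first timer of each key group.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.TreeSet;

// Hypothetical in-memory model of the proposed RocksDB timer layout; not Flink code.
final class MergeSortTimers {
    // A timer within one key group, ordered by timestamp, then key (TIMER#KEY).
    static final class Timer implements Comparable<Timer> {
        final long timestamp;
        final String key;

        Timer(long timestamp, String key) {
            this.timestamp = timestamp;
            this.key = key;
        }

        @Override
        public int compareTo(Timer other) {
            int c = Long.compare(timestamp, other.timestamp);
            return c != 0 ? c : key.compareTo(other.key);
        }

        @Override
        public String toString() {
            return timestamp + ":" + key;
        }
    }

    // Heap entry: the current first timer of one key group.
    static final class Head implements Comparable<Head> {
        final int keyGroup;
        final Timer timer;

        Head(int keyGroup, Timer timer) {
            this.keyGroup = keyGroup;
            this.timer = timer;
        }

        @Override
        public int compareTo(Head other) {
            return timer.compareTo(other.timer);
        }
    }

    // Stand-in for RocksDB: each key group's timers are contiguous and sorted.
    private final Map<Integer, TreeSet<Timer>> store = new HashMap<>();
    // In-memory heap with at most one entry (the first timer) per key group.
    private final PriorityQueue<Head> heap = new PriorityQueue<>();
    private final Map<Integer, Head> headOf = new HashMap<>();

    void register(int keyGroup, long timestamp, String key) {
        store.computeIfAbsent(keyGroup, g -> new TreeSet<>()).add(new Timer(timestamp, key));
        refreshHead(keyGroup);
    }

    // Trigger path: fire all timers up to the watermark in global timestamp order.
    List<String> advanceWatermark(long watermark) {
        List<String> fired = new ArrayList<>();
        while (!heap.isEmpty() && heap.peek().timer.timestamp <= watermark) {
            Head head = heap.peek();
            store.get(head.keyGroup).pollFirst(); // removes head.timer from its key group
            fired.add(head.timer.toString());
            refreshHead(head.keyGroup);
        }
        return fired;
    }

    // Checkpoint path: all timers of one key group, read as one sorted range.
    List<String> snapshot(int keyGroup) {
        List<String> out = new ArrayList<>();
        for (Timer t : store.getOrDefault(keyGroup, new TreeSet<>())) {
            out.add(t.toString());
        }
        return out;
    }

    // Re-sync the heap after a key group's first timer may have changed.
    private void refreshHead(int keyGroup) {
        TreeSet<Timer> timers = store.get(keyGroup);
        Timer first = (timers == null || timers.isEmpty()) ? null : timers.first();
        Head old = headOf.get(keyGroup);
        if (old != null && old.timer == first) {
            return; // first timer unchanged, heap is already correct
        }
        if (old != null) {
            heap.remove(old); // O(n) for PriorityQueue; an indexed heap would avoid this
            headOf.remove(keyGroup);
        }
        if (first != null) {
            Head head = new Head(keyGroup, first);
            heap.add(head);
            headOf.put(keyGroup, head);
        }
    }
}
```

Firing only touches the heap and the head of one key group at a time, while a checkpoint reads one contiguous, already sorted range per key group, which is how the layout serves both access patterns.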
[jira] [Commented] (FLINK-8061) Remove trailing asterisk in QueryableStateClient javadocs
[ https://issues.apache.org/jira/browse/FLINK-8061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16270123#comment-16270123 ] ASF GitHub Bot commented on FLINK-8061: --- Github user vetriselvan1187 closed the pull request at: https://github.com/apache/flink/pull/5008 > Remove trailing asterisk in QueryableStateClient javadocs > - > > Key: FLINK-8061 > URL: https://issues.apache.org/jira/browse/FLINK-8061 > Project: Flink > Issue Type: Improvement > Components: Queryable State >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >
> {code}
> /**
>  * Returns a future holding the request result. *
>  * @param jobId              JobID of the job the queryable state belongs to.
>  * @param queryableStateName Name under which the state is queryable.
>  * @param key                The key we are interested in.
>  * @param keyTypeHint        A {@link TypeHint} used to extract the type of the key.
>  * @param stateDescriptor    The {@link StateDescriptor} of the state we want to query.
>  * @return Future holding the immutable {@link State} object containing the result.
>  */
> {code}
> {code}
> /**
>  * Returns a future holding the request result. *
>  * @param jobId              JobID of the job the queryable state belongs to.
>  * @param queryableStateName Name under which the state is queryable.
>  * @param key                The key we are interested in.
>  * @param keyTypeInfo        The {@link TypeInformation} of the key.
>  * @param stateDescriptor    The {@link StateDescriptor} of the state we want to query.
>  * @return Future holding the immutable {@link State} object containing the result.
>  */
> {code}
[GitHub] flink pull request #5008: [FLINK-8061] [Queryable State] removed trailing as...
Github user vetriselvan1187 closed the pull request at: https://github.com/apache/flink/pull/5008 ---
[jira] [Commented] (FLINK-8158) Rowtime window inner join emits late data
[ https://issues.apache.org/jira/browse/FLINK-8158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16270116#comment-16270116 ] ASF GitHub Bot commented on FLINK-8158: --- Github user xccui commented on the issue: https://github.com/apache/flink/pull/5094 Hi @hequn8128, let me try to explain this. 1. In the current implementation, the join process relies only on the cached rows, not on the watermarks. Specifically, when receiving a record, the join function only checks whether qualified rows exist in the opposite cache, regardless of lateness. Thus, if the cache ***is not cleaned up in time***, outdated results will be emitted. 2. Strictly speaking, the watermark holdback value should be reported dynamically by the join function at runtime. The current implementation temporarily uses a static value (`MaxOutputDelay`) for that. In other words, the holdback value should be determined by the cached rows, rather than the cache size being determined by `MaxOutputDelay`. Hope that helps. Best, Xingcan > Rowtime window inner join emits late data > - > > Key: FLINK-8158 > URL: https://issues.apache.org/jira/browse/FLINK-8158 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Hequn Cheng >Assignee: Hequn Cheng > Attachments: screenshot-1xxx.png > > > When executing the join, the join operator needs to make sure that no late > data is emitted. Currently, this is achieved by holding back watermarks. > However, the window border is not handled correctly.
For the SQL below:
> {quote}
> val sqlQuery =
> """
> SELECT t2.key, t2.id, t1.id
> FROM T1 as t1 join T2 as t2 ON
> t1.key = t2.key AND
> t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND
> t2.rt + INTERVAL '1' SECOND
> """.stripMargin
> val data1 = new mutable.MutableList[(String, String, Long)]
> // for boundary test
> data1.+=(("A", "LEFT1", 6000L))
> val data2 = new mutable.MutableList[(String, String, Long)]
> data2.+=(("A", "RIGHT1", 6000L))
> {quote}
> The join will output a watermark with timestamp 1000, but if another left row ("A", "LEFT1", 1000L) arrives, the join will output a record with timestamp 1000, which equals the previous watermark.
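The boundary case can be checked with plain arithmetic. This sketch assumes, as the numbers in the report imply, that the operator holds the watermark back by the 5-second lower bound of the interval (6000 - 5000 = 1000) and that the result row carries timestamp 1000; both are assumptions made for illustration, not a description of Flink's join operator. It uses Flink's watermark semantics, under which a record whose timestamp is not greater than the last emitted watermark is late.

```java
// Worked example of the FLINK-8158 boundary case:
//   t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND t2.rt + INTERVAL '1' SECOND
// Assumptions (illustration only): the output watermark is the input watermark held
// back by the 5 s lower bound, and the result row is stamped with the left rowtime.
final class JoinBoundaryExample {
    static final long LOWER_BOUND_MS = 5_000; // t2.rt - 5 s
    static final long UPPER_BOUND_MS = 1_000; // t2.rt + 1 s

    // SQL BETWEEN is inclusive on both ends.
    static boolean joins(long leftRt, long rightRt) {
        return leftRt >= rightRt - LOWER_BOUND_MS && leftRt <= rightRt + UPPER_BOUND_MS;
    }

    static long heldBackWatermark(long inputWatermark) {
        return inputWatermark - LOWER_BOUND_MS;
    }

    // A record is late if its timestamp is not greater than the last emitted watermark.
    static boolean isLate(long recordTs, long emittedWatermark) {
        return recordTs <= emittedWatermark;
    }

    public static void main(String[] args) {
        long outWatermark = heldBackWatermark(6_000); // both inputs have reached 6000
        boolean match = joins(1_000, 6_000);          // late left row ("A", "LEFT1", 1000L)
        System.out.println("emitted watermark = " + outWatermark);             // 1000
        System.out.println("row at 1000 joins row at 6000: " + match);         // true
        System.out.println("result at 1000 is late: " + isLate(1_000, outWatermark)); // true
    }
}
```

In this model the left row sits exactly on the inclusive lower border, so it still matches, yet its result is not ahead of the watermark already sent downstream; a left row at 999 would not match at all.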
[jira] [Commented] (FLINK-8158) Rowtime window inner join emits late data
[ https://issues.apache.org/jira/browse/FLINK-8158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16270090#comment-16270090 ] ASF GitHub Bot commented on FLINK-8158: --- Github user hequn8128 commented on the issue: https://github.com/apache/flink/pull/5094 Hi @xccui, thanks for your reply. Feel free to take it if you wish. I'm still confused about two things. 1. Considering the test `testRowTimeJoinWithCommonBounds2` in `JoinHarnessTest`, do you mean the row with timestamp 1000 should not be calculated? The row does satisfy the join condition `t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND t2.rt + INTERVAL '1' SECOND`, and this is the difference between `BETWEEN` and `NOT BETWEEN`. 2. Can't we use the held-back watermark as the boundary for caching and expiring data? Any data with a timestamp larger than the held-back watermark should be cached, since it may be joined later. We should take every opportunity to join and produce a result that satisfies the join predicate.
[GitHub] flink issue #5094: [FLINK-8158] [table] Fix rowtime window inner join emits ...
Github user hequn8128 commented on the issue: https://github.com/apache/flink/pull/5094 Hi @xccui, thanks for your reply. Feel free to take it if you wish. I'm still confused about two things. 1. Considering the test `testRowTimeJoinWithCommonBounds2` in `JoinHarnessTest`, do you mean the row with timestamp 1000 should not be calculated? The row does satisfy the join condition `t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND t2.rt + INTERVAL '1' SECOND`, and this is the difference between `BETWEEN` and `NOT BETWEEN`. 2. Can't we use the held-back watermark as the boundary for caching and expiring data? Any data with a timestamp larger than the held-back watermark should be cached, since it may be joined later. We should take every opportunity to join and produce a result that satisfies the join predicate. ---
[jira] [Commented] (FLINK-7692) Support user-defined variables in Metrics
[ https://issues.apache.org/jira/browse/FLINK-7692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269921#comment-16269921 ] Wei-Che Wei commented on FLINK-7692: [~Zentol] This idea is much better. I would like to keep working on it based on your scaffold. One more thing needs to be confirmed: I need to distinguish when to add a {{GenericKeyMetricGroup}} or a {{GenericMetricGroup}} if a group {{name}} doesn't exist yet, since your scaffold always creates a {{GenericKeyMetricGroup}}. Am I right? > Support user-defined variables in Metrics > - > > Key: FLINK-7692 > URL: https://issues.apache.org/jira/browse/FLINK-7692 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Wei-Che Wei >Priority: Minor > Fix For: 1.5.0 > > > Reporters that identify metrics with a set of key-value pairs are currently > limited to the variables defined by Flink, like the taskmanager ID, and > users are not able to supply their own. > This is inconsistent with reporters that use metric identifiers that freely > include user-defined groups constructed via {{MetricGroup#addGroup(String > name)}}. > I propose adding a new method {{MetricGroup#addGroup(String key, String > name)}} that adds a new key-value pair to the {{variables}} map in its > constructor. When constructing the metric identifier, the key should be > included as well, giving the same result as when constructing the > metric groups tree via {{group.addGroup(key).addGroup(value)}}. > For this, a new {{KeyedGenericMetricGroup}} should be created that resembles > the unkeyed version, with slight modifications to the constructor and > {{getScopeComponents}} method.
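The proposal can be modelled in a few lines. `SimpleMetricGroup` below is a hypothetical, heavily simplified class (not Flink's `MetricGroup`): `addGroup(name)` only extends the scope, while `addGroup(key, value)` extends the scope with both components, matching `addGroup(key).addGroup(value)`, and additionally records the pair as a variable. The plain `key` used as the variable name is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simplified metric group; it only tracks scope components and variables.
final class SimpleMetricGroup {
    private final List<String> scope;
    private final Map<String, String> variables;

    SimpleMetricGroup(List<String> scope, Map<String, String> variables) {
        this.scope = scope;
        this.variables = variables;
    }

    static SimpleMetricGroup root(String... scope) {
        return new SimpleMetricGroup(Arrays.asList(scope), Collections.emptyMap());
    }

    // Plain group: extends the scope only, with no new variable.
    SimpleMetricGroup addGroup(String name) {
        List<String> s = new ArrayList<>(scope);
        s.add(name);
        return new SimpleMetricGroup(s, variables);
    }

    // Proposed keyed group: same scope as addGroup(key).addGroup(value),
    // plus a key-value pair that key-value based reporters can read.
    SimpleMetricGroup addGroup(String key, String value) {
        List<String> s = new ArrayList<>(scope);
        s.add(key);
        s.add(value);
        Map<String, String> v = new LinkedHashMap<>(variables);
        v.put(key, value);
        return new SimpleMetricGroup(s, v);
    }

    // Identifier as used by reporters that build a delimited name from the scope.
    String identifier(String metricName, char delimiter) {
        StringBuilder sb = new StringBuilder();
        for (String component : scope) {
            sb.append(component).append(delimiter);
        }
        return sb.append(metricName).toString();
    }

    Map<String, String> variables() {
        return Collections.unmodifiableMap(variables);
    }
}
```

Identifier-based reporters see no difference between the two forms, while key-value based reporters gain a `region=eu`-style variable only from the keyed form.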
[jira] [Commented] (FLINK-7574) Remove unused dependencies from flink-clients
[ https://issues.apache.org/jira/browse/FLINK-7574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269617#comment-16269617 ] ASF GitHub Bot commented on FLINK-7574: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/5076 These should generally be pretty safe changes to make IMO, and well worth it. In particular the undeclared usage of transitive dependencies (that may be shaded) frequently caused headaches, and should now be much easier to detect. We have to look out for dependencies that are used exclusively via reflection and those we have for the sake of including them in flink-dist; can't think of other special cases at the moment. > Remove unused dependencies from flink-clients > - > > Key: FLINK-7574 > URL: https://issues.apache.org/jira/browse/FLINK-7574 > Project: Flink > Issue Type: Sub-task > Components: Build System >Affects Versions: 1.3.2 > Environment: Apache Maven 3.3.9, Java version: 1.8.0_144 >Reporter: Hai Zhou UTC+8 >Assignee: Hai Zhou UTC+8 >
> [INFO] --- maven-dependency-plugin:2.10:analyze (default-cli) @ flink-clients_2.11 ---
> [WARNING] Used undeclared dependencies found:
> [WARNING]    org.scala-lang:scala-library:jar:2.11.11:compile
> [WARNING]    com.data-artisans:flakka-actor_2.11:jar:2.3-custom:compile
> [WARNING] Unused declared dependencies found:
> [WARNING]    org.hamcrest:hamcrest-all:jar:1.3:test
> [WARNING]    org.apache.flink:force-shading:jar:1.4-SNAPSHOT:compile
> [WARNING]    org.powermock:powermock-module-junit4:jar:1.6.5:test
> [WARNING]    com.google.code.findbugs:jsr305:jar:1.3.9:compile
> [WARNING]    log4j:log4j:jar:1.2.17:test
> [WARNING]    org.powermock:powermock-api-mockito:jar:1.6.5:test
> [WARNING]    org.slf4j:slf4j-log4j12:jar:1.7.7:test
[GitHub] flink issue #5076: [FLINK-7574][build] POM Cleanup flink-clients
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5076 These should generally be pretty safe changes to make IMO, and well worth it. In particular the undeclared usage of transitive dependencies (that may be shaded) frequently caused headaches, and should now be much easier to detect. We have to look out for dependencies that are used exclusively via reflection and those we have for the sake of including them in flink-dist; can't think of other special cases at the moment. ---
[GitHub] flink pull request #4383: [hotfix] [optimizer] Normalize job plan operator f...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4383 ---
[jira] [Closed] (FLINK-6864) Remove confusing "invalid POJO type" messages from TypeExtractor
[ https://issues.apache.org/jira/browse/FLINK-6864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan closed FLINK-6864. - Resolution: Implemented master: 450b4241404055ed6638e354be421b83380827c5 > Remove confusing "invalid POJO type" messages from TypeExtractor > > > Key: FLINK-6864 > URL: https://issues.apache.org/jira/browse/FLINK-6864 > Project: Flink > Issue Type: Improvement > Components: Documentation, Type Serialization System >Reporter: Tzu-Li (Gordon) Tai >Assignee: Fang Yong > Fix For: 1.5.0 > > > When a user's type cannot be treated as a POJO, the {{TypeExtractor}} will > log warnings such as ".. must have a default constructor to be used as a > POJO.", " ... is not a valid POJO type because not all fields are valid POJO > fields." in the {{analyzePojo}} method. > These messages often mislead users into thinking that the job should have failed, whereas in these cases Flink simply falls back to Kryo and treats the types as generic types. We should remove these > messages and, at the same time, improve the type serialization docs at [1] to > explain explicitly what it means when Flink does or does not recognize a user > type as a POJO. > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/types_serialization.html#rules-for-pojo-types
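To make "does / does not recognize a user type as a POJO" concrete, here is a rough approximation of the main rules from the linked docs page, written with plain reflection: the class is public (and static if nested), has a public no-argument constructor, and every non-static, non-transient field (including inherited ones) is either public and non-final or has a public getter and setter. `PojoRules` and its sample classes are hypothetical; this is not the `TypeExtractor` logic, which handles more cases (for example more lenient accessor naming). A type that fails these checks is not an error: per the description, Flink then treats it as a generic type serialized with Kryo.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// Rough approximation of the documented POJO rules; NOT Flink's TypeExtractor.
final class PojoRules {

    static boolean looksLikePojo(Class<?> clazz) {
        int mods = clazz.getModifiers();
        if (!Modifier.isPublic(mods)) return false;
        if (clazz.isMemberClass() && !Modifier.isStatic(mods)) return false;
        try {
            if (!Modifier.isPublic(clazz.getDeclaredConstructor().getModifiers())) return false;
        } catch (NoSuchMethodException e) {
            return false; // no no-argument constructor: Flink would fall back to Kryo
        }
        for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int m = f.getModifiers();
                if (Modifier.isStatic(m) || Modifier.isTransient(m)) continue;
                boolean publicField = Modifier.isPublic(m) && !Modifier.isFinal(m);
                if (!publicField && !hasGetterAndSetter(clazz, f)) return false;
            }
        }
        return true;
    }

    // Looks for public getX()/isX() and setX(value) methods matching the field type.
    private static boolean hasGetterAndSetter(Class<?> clazz, Field f) {
        String suffix = Character.toUpperCase(f.getName().charAt(0)) + f.getName().substring(1);
        boolean getter = false;
        boolean setter = false;
        for (Method m : clazz.getMethods()) { // public methods, including inherited ones
            if (m.getParameterCount() == 0 && m.getReturnType() == f.getType()
                    && (m.getName().equals("get" + suffix) || m.getName().equals("is" + suffix))) {
                getter = true;
            }
            if (m.getParameterCount() == 1 && m.getParameterTypes()[0] == f.getType()
                    && m.getName().equals("set" + suffix)) {
                setter = true;
            }
        }
        return getter && setter;
    }

    public static class Word {          // public fields and a default constructor
        public String text;
        public int count;
        public Word() {}
    }

    public static class Point {         // private field exposed via getter and setter
        private long x;
        public Point() {}
        public long getX() { return x; }
        public void setX(long x) { this.x = x; }
    }

    public static class NoDefaultCtor { // only a constructor with arguments
        public String text;
        public NoDefaultCtor(String text) { this.text = text; }
    }
}
```

Under this approximation `NoDefaultCtor` is rejected, which corresponds to the "must have a default constructor to be used as a POJO" warning quoted above, even though the job itself keeps running with Kryo.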
[jira] [Commented] (FLINK-6864) Remove confusing "invalid POJO type" messages from TypeExtractor
[ https://issues.apache.org/jira/browse/FLINK-6864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269526#comment-16269526 ] ASF GitHub Bot commented on FLINK-6864: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4574
[GitHub] flink pull request #4574: [FLINK-6864] Fix confusing "invalid POJO type" mes...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4574 ---
[jira] [Commented] (FLINK-6053) Gauge should only take subclasses of Number, rather than everything
[ https://issues.apache.org/jira/browse/FLINK-6053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269508#comment-16269508 ] Chesnay Schepler commented on FLINK-6053: - We can't properly subsume the existing gauge type until the Histogram has been reworked. Once that is done, we will deprecate the freestyle gauge type and introduce new String-/NumberGauge types. > Gauge should only take subclasses of Number, rather than everything > -- > > Key: FLINK-6053 > URL: https://issues.apache.org/jira/browse/FLINK-6053 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.2.0 >Reporter: Bowen Li > Fix For: 2.0.0 > > > Currently, Flink's Gauge is defined as
> ```java
> public interface Gauge<T> extends Metric {
>     T getValue();
> }
> ```
> But it doesn't make sense to have Gauge take generic types other than Number. It also blocks me from finishing FLINK-6013, because I cannot assume Gauge is only about Number. So the class should be like
> ```java
> public interface Gauge<T extends Number> extends Metric {
>     T getValue();
> }
> ```
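The benefit of a bounded type parameter is that a reporter can rely on `Number` without casts or `instanceof` checks. The sketch below uses hypothetical `NumberGauge` and `StringGauge` interfaces and a local `Metric` marker; these names only echo the String-/NumberGauge idea from the comment and are assumptions, not Flink API (Flink's existing `Gauge<T>` is unbounded).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical types for illustration only.
interface Metric {}

interface NumberGauge<T extends Number> extends Metric {
    T getValue();
}

interface StringGauge extends Metric {
    String getValue();
}

final class NumericReporter {
    private final List<NumberGauge<?>> gauges = new ArrayList<>();

    void register(NumberGauge<?> gauge) {
        gauges.add(gauge);
    }

    // No casts needed: the bound guarantees every value is some subclass of Number.
    double sum() {
        double total = 0;
        for (NumberGauge<?> g : gauges) {
            total += g.getValue().doubleValue();
        }
        return total;
    }
}
```

A declaration such as `NumberGauge<String>` would be rejected by the compiler, so non-numeric values have to go through a separate type like `StringGauge`.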
[jira] [Commented] (FLINK-6864) Remove confusing "invalid POJO type" messages from TypeExtractor
[ https://issues.apache.org/jira/browse/FLINK-6864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269491#comment-16269491 ] ASF GitHub Bot commented on FLINK-6864: --- Github user greghogan commented on the issue: https://github.com/apache/flink/pull/4574 @zjureel am merging this ... thanks for the PR and edits!
[jira] [Updated] (FLINK-6864) Remove confusing "invalid POJO type" messages from TypeExtractor
[ https://issues.apache.org/jira/browse/FLINK-6864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan updated FLINK-6864: -- Fix Version/s: 1.5.0 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4383: [hotfix] [optimizer] Normalize job plan operator formatti...
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/4383 @zentol @fhueske I am merging the change with the extra space since this looks to have been the original intent. I've looked at both forms without finding a strong preference. ---
[jira] [Updated] (FLINK-8164) JobManager's archiving does not work on S3
[ https://issues.apache.org/jira/browse/FLINK-8164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cristian updated FLINK-8164: Description: I'm trying to configure JobManager's archiving mechanism (https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/historyserver.html) to use S3 but I'm getting this: {code} 2017-11-28 19:11:09,751 WARN org.apache.flink.runtime.jobmanager.MemoryArchivist - Failed to create Path for Some(s3a://bucket/completed-jobs). Job will not be archived. java.lang.IllegalArgumentException: No file system found with scheme s3, referenced in file URI 's3://bucket/completed-jobs'. at org.apache.flink.runtime.jobmanager.MemoryArchivist.validateAndNormalizeUri(MemoryArchivist.scala:297) at org.apache.flink.runtime.jobmanager.MemoryArchivist.org$apache$flink$runtime$jobmanager$MemoryArchivist$$archiveJsonFiles(MemoryArchivist.scala:201) at org.apache.flink.runtime.jobmanager.MemoryArchivist$$anonfun$handleMessage$1.applyOrElse(MemoryArchivist.scala:107) at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33) at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28) at akka.actor.Actor$class.aroundReceive(Actor.scala:467) at org.apache.flink.runtime.jobmanager.MemoryArchivist.aroundReceive(MemoryArchivist.scala:65) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) {code} Which is very weird since I'm able to write to S3 from within the job itself. I have also tried using s3a instead to no avail. This happens running Flink v1.3.2 on EMR. was: I'm trying to configure JobManager's archiving mechanism (https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/historyserver.html) to use S3 but I'm getting this: {code} 2017-11-28 19:11:09,751 WARN org.apache.flink.runtime.jobmanager.MemoryArchivist - Failed to create Path for Some(s3a://scopely-flink-dev/completed-jobs). Job will not be archived. java.lang.IllegalArgumentException: No file system found with scheme s3, referenced in file URI 's3://scopely-flink-dev/completed-jobs'. at org.apache.flink.runtime.jobmanager.MemoryArchivist.validateAndNormalizeUri(MemoryArchivist.scala:297) at org.apache.flink.runtime.jobmanager.MemoryArchivist.org$apache$flink$runtime$jobmanager$MemoryArchivist$$archiveJsonFiles(MemoryArchivist.scala:201) at org.apache.flink.runtime.jobmanager.MemoryArchivist$$anonfun$handleMessage$1.applyOrElse(MemoryArchivist.scala:107) at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33) at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28) at akka.actor.Actor$class.aroundReceive(Actor.scala:467) at org.apache.flink.runtime.jobmanager.MemoryArchivist.aroundReceive(MemoryArchivist.scala:65) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at 
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) {code} Which is very weird since I'm able to write to S3 from within the job itself. I have also tried using s3a instead to no avail. This happens running
[jira] [Updated] (FLINK-8164) JobManager's archiving does not work on S3
[ https://issues.apache.org/jira/browse/FLINK-8164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cristian updated FLINK-8164: Description: I'm trying to configure JobManager's archiving mechanism (https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/historyserver.html) to use S3 but I'm getting this: {code} 2017-11-28 19:11:09,751 WARN org.apache.flink.runtime.jobmanager.MemoryArchivist - Failed to create Path for Some(s3a://scopely-flink-dev/completed-jobs). Job will not be archived. java.lang.IllegalArgumentException: No file system found with scheme s3, referenced in file URI 's3://scopely-flink-dev/completed-jobs'. at org.apache.flink.runtime.jobmanager.MemoryArchivist.validateAndNormalizeUri(MemoryArchivist.scala:297) at org.apache.flink.runtime.jobmanager.MemoryArchivist.org$apache$flink$runtime$jobmanager$MemoryArchivist$$archiveJsonFiles(MemoryArchivist.scala:201) at org.apache.flink.runtime.jobmanager.MemoryArchivist$$anonfun$handleMessage$1.applyOrElse(MemoryArchivist.scala:107) at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33) at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28) at akka.actor.Actor$class.aroundReceive(Actor.scala:467) at org.apache.flink.runtime.jobmanager.MemoryArchivist.aroundReceive(MemoryArchivist.scala:65) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) {code} Which is very weird since I'm able to write to S3 from within the job itself. I have also tried using s3a instead to no avail. This happens running Flink v1.3.2 on EMR. was: I'm trying to configure JobManager's archiving mechanism (https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/historyserver.html) to use S3 but I'm getting this: {code} 2017-11-28 19:11:09,751 WARN org.apache.flink.runtime.jobmanager.MemoryArchivist - Failed to create Path for Some(s3a://scopely-flink-dev/completed-jobs). Job will not be archived. java.lang.IllegalArgumentException: No file system found with scheme s3a, referenced in file URI 's3a://scopely-flink-dev/completed-jobs'. at org.apache.flink.runtime.jobmanager.MemoryArchivist.validateAndNormalizeUri(MemoryArchivist.scala:297) at org.apache.flink.runtime.jobmanager.MemoryArchivist.org$apache$flink$runtime$jobmanager$MemoryArchivist$$archiveJsonFiles(MemoryArchivist.scala:201) at org.apache.flink.runtime.jobmanager.MemoryArchivist$$anonfun$handleMessage$1.applyOrElse(MemoryArchivist.scala:107) at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33) at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28) at akka.actor.Actor$class.aroundReceive(Actor.scala:467) at org.apache.flink.runtime.jobmanager.MemoryArchivist.aroundReceive(MemoryArchivist.scala:65) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at 
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) {code} Which is very weird since I'm able to write to S3 from within the job itself. This happens running Flink v1.3.2 on EMR. >
[jira] [Created] (FLINK-8164) JobManager's archiving does not work on S3
Cristian created FLINK-8164: --- Summary: JobManager's archiving does not work on S3 Key: FLINK-8164 URL: https://issues.apache.org/jira/browse/FLINK-8164 Project: Flink Issue Type: Bug Components: JobManager Reporter: Cristian I'm trying to configure JobManager's archiving mechanism (https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/historyserver.html) to use S3 but I'm getting this: {code} 2017-11-28 19:11:09,751 WARN org.apache.flink.runtime.jobmanager.MemoryArchivist - Failed to create Path for Some(s3a://scopely-flink-dev/completed-jobs). Job will not be archived. java.lang.IllegalArgumentException: No file system found with scheme s3a, referenced in file URI 's3a://scopely-flink-dev/completed-jobs'. at org.apache.flink.runtime.jobmanager.MemoryArchivist.validateAndNormalizeUri(MemoryArchivist.scala:297) at org.apache.flink.runtime.jobmanager.MemoryArchivist.org$apache$flink$runtime$jobmanager$MemoryArchivist$$archiveJsonFiles(MemoryArchivist.scala:201) at org.apache.flink.runtime.jobmanager.MemoryArchivist$$anonfun$handleMessage$1.applyOrElse(MemoryArchivist.scala:107) at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33) at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28) at akka.actor.Actor$class.aroundReceive(Actor.scala:467) at org.apache.flink.runtime.jobmanager.MemoryArchivist.aroundReceive(MemoryArchivist.scala:65) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) at 
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) {code} Which is very weird since I'm able to write to S3 from within the job itself. This happens running Flink v1.3.2 on EMR. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
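The exception shows that the JobManager-side archiving code resolves the target directory by its URI scheme and found no file system for `s3`/`s3a`, even though the job itself could write to S3. The following is a hypothetical, simplified model of such a scheme-keyed lookup; `SchemeRegistry` and the placeholder implementation names are invented for illustration and are not Flink's `FileSystem` internals. It only shows that the same URI resolves or fails depending on what the resolving process has registered.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified scheme-keyed registry; NOT Flink's FileSystem class.
final class SchemeRegistry {
    private final Map<String, String> implementationsByScheme = new HashMap<>();

    void register(String scheme, String implementationName) {
        implementationsByScheme.put(scheme, implementationName);
    }

    // Resolves an implementation purely by the URI scheme ("s3", "s3a", "hdfs", ...).
    String resolve(String uriString) {
        String scheme = URI.create(uriString).getScheme();
        String implementation = implementationsByScheme.get(scheme);
        if (implementation == null) {
            throw new IllegalArgumentException(
                "No file system registered for scheme '" + scheme + "' in " + uriString);
        }
        return implementation;
    }
}
```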
[GitHub] flink issue #4374: replace map.put with putIfAbsent
Github user NicoK commented on the issue: https://github.com/apache/flink/pull/4374 but honestly, the previous approach just looks wrong: adding it and in case of a conflict reverting that `put` operation. What if (and I'm walking in the dark here) something already operates on the invalidly registered task (is this run in parallel?). I cannot spot any parallel use though... ---
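To make the concern concrete, here is a hypothetical registry (`TaskRegistry` is an invented name, not Flink's code) contrasting the put-then-revert pattern with `putIfAbsent`. On a `ConcurrentHashMap`, `putIfAbsent` checks and inserts atomically, so a conflicting task never becomes visible; with put-then-revert, another thread could observe the conflicting entry between the two calls.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry illustrating the two patterns discussed in the PR.
final class TaskRegistry {
    private final Map<String, Object> tasks = new ConcurrentHashMap<>();

    // Put-then-revert: between put() and the revert, the conflicting task is
    // visible to other threads and the original entry is temporarily gone.
    boolean registerPutThenRevert(String id, Object task) {
        Object previous = tasks.put(id, task);
        if (previous != null) {
            tasks.put(id, previous); // restore the original entry
            return false;
        }
        return true;
    }

    // putIfAbsent: ConcurrentHashMap does the check and the insert atomically,
    // so a conflicting task is never stored, not even briefly.
    boolean register(String id, Object task) {
        return tasks.putIfAbsent(id, task) == null;
    }

    Object get(String id) {
        return tasks.get(id);
    }
}
```

Both variants leave the same final state in a single-threaded test; the difference only shows under concurrent access. On a plain `HashMap`, `putIfAbsent` is not atomic either.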
[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269114#comment-16269114 ] ASF GitHub Bot commented on FLINK-7468: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r153564080 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java --- @@ -114,7 +116,7 @@ public void onNotification() { } @Override - public Buffer getNextBuffer() throws IOException, InterruptedException { + public Buffer getNextBufferInternal() throws IOException, InterruptedException { --- End diff -- make this `protected` > Implement sender backlog logic for credit-based > --- > > Key: FLINK-7468 > URL: https://issues.apache.org/jira/browse/FLINK-7468 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > This is part of the work for credit-based network flow control. > Receivers should know how many buffers are available on the sender side (the > backlog). The receivers use this information to decide how to distribute > floating buffers. > The {{ResultSubpartition}} maintains the backlog, which only counts the > number of buffers in this subpartition, not the number of events. > The backlog is increased when a buffer is added to this subpartition and > decreased when a buffer is polled from it. > The sender attaches the backlog to the {{BufferResponse}} as an absolute value > after the buffer has been transferred. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
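A minimal single-class sketch of the backlog rules in the issue description, under simplifying assumptions: queue elements are modelled as tagged strings rather than Flink `Buffer`s, and `BacklogSubpartition`, `Element` and `Response` are hypothetical names. Only buffers change the counter, events do not, and each response carries the absolute backlog remaining after the element was taken.

```java
import java.util.ArrayDeque;

// Hypothetical, simplified subpartition; not Flink's ResultSubpartition.
final class BacklogSubpartition {

    // A queued element is either a data buffer or an event.
    static final class Element {
        final boolean isBuffer;
        final String payload;

        Element(boolean isBuffer, String payload) {
            this.isBuffer = isBuffer;
            this.payload = payload;
        }
    }

    // Hypothetical response: the element plus the backlog *after* it was taken.
    static final class Response {
        final Element element;
        final int backlog;

        Response(Element element, int backlog) {
            this.element = element;
            this.backlog = backlog;
        }
    }

    private final ArrayDeque<Element> queue = new ArrayDeque<>();
    private int buffersInBacklog; // counts buffers only, never events

    synchronized void add(Element e) {
        queue.add(e);
        if (e.isBuffer) {
            buffersInBacklog++;
        }
    }

    /** Polls the next element, or returns null if the queue is empty. */
    synchronized Response poll() {
        Element e = queue.poll();
        if (e == null) {
            return null;
        }
        if (e.isBuffer) {
            buffersInBacklog--;
        }
        // Absolute value: the receiver can overwrite its view instead of accumulating deltas.
        return new Response(e, buffersInBacklog);
    }

    synchronized int getBuffersInBacklog() {
        return buffersInBacklog;
    }
}
```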
[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269116#comment-16269116 ] ASF GitHub Bot commented on FLINK-7468: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r153563915 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java --- @@ -39,13 +39,15 @@ private final AtomicBoolean isReleased; PipelinedSubpartitionView(PipelinedSubpartition parent, BufferAvailabilityListener listener) { + super(parent); + this.parent = checkNotNull(parent); this.availabilityListener = checkNotNull(listener); this.isReleased = new AtomicBoolean(); } @Override - public Buffer getNextBuffer() { + public Buffer getNextBufferInternal() { --- End diff -- make this `protected` -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269115#comment-16269115 ] ASF GitHub Bot commented on FLINK-7468: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r153564859 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java --- @@ -22,32 +22,52 @@ import java.io.IOException; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * A view to consume a {@link ResultSubpartition} instance. */ -public interface ResultSubpartitionView { +public abstract class ResultSubpartitionView { + + /** The parent subpartition this view belongs to. */ + private final ResultSubpartition parent; + + public ResultSubpartitionView(ResultSubpartition parent) { + this.parent = checkNotNull(parent); + } /** * Returns the next {@link Buffer} instance of this queue iterator. -* -* If there is currently no instance available, it will return null. +* +* If there is currently no instance available, it will return null. * This might happen for example when a pipelined queue producer is slower * than the consumer or a spilled queue needs to read in more data. -* -* Important: The consumer has to make sure that each +* +* Important: The consumer has to make sure that each * buffer instance will eventually be recycled with {@link Buffer#recycle()} * after it has been consumed. 
*/ - Buffer getNextBuffer() throws IOException, InterruptedException; + public Buffer getNextBuffer() throws IOException, InterruptedException { + Buffer buffer = getNextBufferInternal(); + if (buffer != null) { + parent.decreaseStatistics(buffer); + } + return buffer; + } + + public int getBuffersInBacklog() { + return parent.getBuffersInBacklog(); + } - void notifyBuffersAvailable(long buffers) throws IOException; + protected abstract Buffer getNextBufferInternal() throws IOException, InterruptedException; --- End diff -- please add a javadoc with the intended relation to `getNextBuffer` -- This message was sent by Atlassian JIRA (v6.4.14#64029)
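The structure under review is a template method: the public `getNextBuffer()` does the backlog bookkeeping and delegates retrieval to an abstract hook. A hedged sketch of what the requested `protected` visibility and Javadoc might look like, using hypothetical simplified types (`Subpartition`, `SubpartitionView`, `QueueView`, with `String` standing in for `Buffer`). Marking `getNextBuffer()` as `final` is an extra choice of this sketch, not something the diff does.

```java
import java.util.ArrayDeque;
import java.util.Objects;

// Hypothetical parent holding the backlog counter (not Flink's ResultSubpartition).
final class Subpartition {
    private int buffersInBacklog;

    synchronized void addBuffer() { buffersInBacklog++; }
    synchronized void decreaseStatistics() { buffersInBacklog--; }
    synchronized int getBuffersInBacklog() { return buffersInBacklog; }
}

abstract class SubpartitionView {
    private final Subpartition parent;

    SubpartitionView(Subpartition parent) {
        this.parent = Objects.requireNonNull(parent);
    }

    /**
     * Returns the next buffer, or null if none is available, and updates the
     * parent's backlog statistics. Fetching is delegated to
     * {@link #getNextBufferInternal()}; final so subclasses cannot skip the bookkeeping.
     */
    public final String getNextBuffer() {
        String buffer = getNextBufferInternal();
        if (buffer != null) {
            parent.decreaseStatistics();
        }
        return buffer;
    }

    public int getBuffersInBacklog() {
        return parent.getBuffersInBacklog();
    }

    /**
     * Hook that only retrieves the next buffer (or null). Statistics are handled by
     * {@link #getNextBuffer()}, which callers should use instead; protected keeps
     * this hook out of the public API.
     */
    protected abstract String getNextBufferInternal();
}

// Hypothetical concrete view backed by an in-memory queue.
final class QueueView extends SubpartitionView {
    private final ArrayDeque<String> buffers;

    QueueView(Subpartition parent, ArrayDeque<String> buffers) {
        super(parent);
        this.buffers = buffers;
    }

    @Override
    protected String getNextBufferInternal() {
        return buffers.poll();
    }
}
```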
[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269117#comment-16269117 ] ASF GitHub Bot commented on FLINK-7468: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r153564111 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java --- @@ -174,19 +175,21 @@ public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws T // - - static class InfiniteSubpartitionView implements ResultSubpartitionView { + static class InfiniteSubpartitionView extends ResultSubpartitionView { private final BufferProvider bufferProvider; private final CountDownLatch sync; public InfiniteSubpartitionView(BufferProvider bufferProvider, CountDownLatch sync) { + super(mock(ResultSubpartition.class)); + this.bufferProvider = checkNotNull(bufferProvider); this.sync = checkNotNull(sync); } @Override - public Buffer getNextBuffer() throws IOException, InterruptedException { + public Buffer getNextBufferInternal() throws IOException, InterruptedException { --- End diff -- make this `protected` -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269118#comment-16269118 ] ASF GitHub Bot commented on FLINK-7468: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r153564062 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java --- @@ -133,7 +135,7 @@ int releaseMemory() throws IOException { } @Override - public Buffer getNextBuffer() throws IOException, InterruptedException { + public Buffer getNextBufferInternal() throws IOException, InterruptedException { --- End diff -- make this `protected` -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8151) [Table] removing map value equality check
[ https://issues.apache.org/jira/browse/FLINK-8151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269092#comment-16269092 ] ASF GitHub Bot commented on FLINK-8151: --- Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5070#discussion_r153566048 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala --- @@ -188,13 +188,6 @@ object ScalarOperators { (leftTerm, rightTerm) => s"java.util.Arrays.equals($leftTerm, $rightTerm)" } } -// map types -else if (isMap(left.resultType) && - left.resultType.getTypeClass == right.resultType.getTypeClass) { - generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) { -(leftTerm, rightTerm) => s"java.util.Map.equals($leftTerm, $rightTerm)" --- End diff -- I tried to use this, but it is an exact object match, which is very different from `Arrays.equals`, which is an indexed element-wise match. So there are problems here: 1. an object originally referred to on both the left- and right-hand sides might become a different object depending on serialization/deserialization (while still representing the same Map); 2. two Map objects with exactly the same key-value pairs should be considered equal, but they are not equal under an object match; 3. it is also debatable whether `Map` should be considered an ordered or an unordered map. I think this is worth another PR to address. > [Table] removing map value equality check > - > > Key: FLINK-8151 > URL: https://issues.apache.org/jira/browse/FLINK-8151 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Rong Rong >Assignee: Rong Rong > > Following up on FLINK-8038. The equality check does not work because Map does > not support element-wise equality. Suggest removing it. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8151) [Table] removing map value equality check
[ https://issues.apache.org/jira/browse/FLINK-8151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269090#comment-16269090 ] ASF GitHub Bot commented on FLINK-8151: --- Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5070#discussion_r153566038 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala --- @@ -139,13 +139,6 @@ class MapTypeTest extends MapTypeTestBase { @Test def testMapOperations(): Unit = { -// comparison -testAllApis( --- End diff -- That was my bad: I was only testing the invalid equality case (e.g. `MAP[STRING, INT] = MAP[STRING,STRING]` should throw a ValidationException) and did not put in the valid equality test :-( -- This message was sent by Atlassian JIRA (v6.4.14#64029)
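For reference on points 2 and 3: under the `java.util.Map` contract, `equals` is an instance method that compares the two maps' key-value mappings, independent of iteration order. There is no static two-argument `Map.equals` in the JDK, so generated code would need something like `java.util.Objects.equals(left, right)`. One real caveat remains: values are compared with their own `equals`, so array-valued maps compare the arrays by reference. `MapEqualityCheck` is a hypothetical helper for illustration, not Flink code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical helper; not part of Flink's code generation.
final class MapEqualityCheck {

    // Null-safe; for two maps this delegates to left.equals(right), which the
    // Map contract defines as "same key-value mappings" (order-insensitive).
    static boolean sameMappings(Map<?, ?> left, Map<?, ?> right) {
        return Objects.equals(left, right);
    }

    // Caveat: values use their own equals(), and Java arrays inherit identity
    // equality, so two maps with equal-content array values are NOT equal.
    static boolean arrayValuedMapsEqual() {
        Map<String, int[]> left = new HashMap<>();
        Map<String, int[]> right = new HashMap<>();
        left.put("k", new int[] {1, 2});
        right.put("k", new int[] {1, 2});
        return left.equals(right);
    }
}
```

Whether this value-based semantics, including the array caveat, fits SQL map comparison is the open question the comment defers to a follow-up PR.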
[GitHub] flink pull request #5070: [FLINK-8151][table]Remove Map type equality compar...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5070#discussion_r153566048 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala --- @@ -188,13 +188,6 @@ object ScalarOperators { (leftTerm, rightTerm) => s"java.util.Arrays.equals($leftTerm, $rightTerm)" } } -// map types -else if (isMap(left.resultType) && - left.resultType.getTypeClass == right.resultType.getTypeClass) { - generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) { -(leftTerm, rightTerm) => s"java.util.Map.equals($leftTerm, $rightTerm)" --- End diff -- I tried to use this, but it is an exact object match, which is very different from `Arrays.equals`, an indexed element-wise match. So there are several problems here: 1. an object referenced on both the left- and right-hand side might become a different object after serialization/deserialization (while still representing the same Map); 2. two Map objects with exactly the same key/value pairs should be considered equal, but they are not equal under an object match; 3. it is also debatable whether `Map` should be considered an ordered or an unordered map. I think this is worth another PR to address. ---
[GitHub] flink pull request #5070: [FLINK-8151][table]Remove Map type equality compar...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5070#discussion_r153566038 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala --- @@ -139,13 +139,6 @@ class MapTypeTest extends MapTypeTestBase { @Test def testMapOperations(): Unit = { -// comparison -testAllApis( --- End diff -- That was my bad: I was only testing the invalid equality case (e.g. `MAP[STRING, INT] = MAP[STRING,STRING]` should throw ValidationException) and did not put in the valid equality test :-( ---
[jira] [Commented] (FLINK-8104) Fix Row value constructor
[ https://issues.apache.org/jira/browse/FLINK-8104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269082#comment-16269082 ] ASF GitHub Bot commented on FLINK-8104: --- Github user walterddr commented on the issue: https://github.com/apache/flink/pull/5040 Thanks @twalthr for confirming, I will also add in the documentation today then. > Fix Row value constructor > - > > Key: FLINK-8104 > URL: https://issues.apache.org/jira/browse/FLINK-8104 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Rong Rong >Assignee: Rong Rong > > Support Row value constructor which is currently broken. > See > {code:java} > // > flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala > @Test > def testValueConstructorFunctions(): Unit = { > // TODO we need a special code path that flattens ROW types > // testSqlApi("ROW('hello world', 12)", "hello world") // test base only > returns field 0 > // testSqlApi("('hello world', 12)", "hello world") // test base only > returns field 0 > // ... > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5040: [FLINK-8104][Table API] fixing ROW type value constructor...
Github user walterddr commented on the issue: https://github.com/apache/flink/pull/5040 Thanks @twalthr for confirming, I will also add in the documentation today then. ---
[jira] [Assigned] (FLINK-8119) Cannot submit jobs to YARN Session in FLIP-6 mode
[ https://issues.apache.org/jira/browse/FLINK-8119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reassigned FLINK-8119: Assignee: Till Rohrmann > Cannot submit jobs to YARN Session in FLIP-6 mode > - > > Key: FLINK-8119 > URL: https://issues.apache.org/jira/browse/FLINK-8119 > Project: Flink > Issue Type: Bug > Components: Client >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Till Rohrmann >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > Cannot submit jobs to YARN Session in FLIP-6 mode because > {{FlinkYarnSessionCli}} becomes the _active_ CLI (should be > {{Flip6DefaultCLI}}). > *Steps to reproduce* > # Build Flink 1.5 {{101fef7397128b0aba23221481ab86f62b18118f}} > # {{bin/yarn-session.sh -flip6 -d -n 1 -s 1 -jm 1024 -tm 1024}} > # {{bin/flink run -flip6 ./examples/streaming/WordCount.jar}} > # Verify that the job will not run. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-5789) Make Bucketing Sink independent of Hadoop's FileSystem
[ https://issues.apache.org/jira/browse/FLINK-5789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269041#comment-16269041 ] Stephan Ewen commented on FLINK-5789: - We may want to add further abstractions to the file system. The S3 API has an interesting approach, allowing you to upload multiple individual chunks (multipart upload) and then issue a separate request to commit a set of these parts. That could be used, for example, to stage/flush parts on checkpoint, but not commit (publish) them until it is time to roll over the bucket. Because that is very S3-specific, it may make sense to have an abstraction for temp files, temp regions, committing those, etc. on top of the {{FileSystem}} abstraction. > Make Bucketing Sink independent of Hadoop's FileSystem > -- > > Key: FLINK-5789 > URL: https://issues.apache.org/jira/browse/FLINK-5789 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Affects Versions: 1.2.0, 1.1.4 >Reporter: Stephan Ewen > Fix For: 1.5.0 > > > The {{BucketingSink}} is hard-wired to Hadoop's FileSystem, bypassing Flink's > file system abstraction. > This causes several issues: > - The bucketing sink behaves differently from other file sinks with > respect to configuration > - Directly supported file systems (not through Hadoop) like the MapR File > System do not work in the same way with the BucketingSink as other file > systems > - The previous point is all the more problematic in the effort to make > Hadoop an optional dependency, and in other stacks (Mesos, Kubernetes, > AWS, GCE, Azure) with ideally no Hadoop dependency. > We should port the {{BucketingSink}} to use Flink's FileSystem classes. > To support the *truncate* functionality that is needed for the exactly-once > semantics of the Bucketing Sink, we should extend Flink's FileSystem > abstraction to have the methods > - {{boolean supportsTruncate()}} > - {{void truncate(Path, long)}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
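Below is a minimal sketch of the proposed `supportsTruncate()` / `truncate(Path, long)` pair, and of how a sink might use it when restoring an in-progress file. Several assumptions apply. It uses `java.nio.file.Path` in place of Flink's own `Path` type. The interface and class names are hypothetical. The local implementation relies on the JDK's `FileChannel#truncate`. None of this is Flink's actual API.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical extension of a file system abstraction; method names follow the issue text.
interface TruncatableFileSystem {
    boolean supportsTruncate();

    void truncate(Path path, long length) throws IOException;
}

// Hypothetical local-disk implementation backed by FileChannel#truncate,
// which shrinks the file if it is longer than the given length.
class LocalTruncatableFileSystem implements TruncatableFileSystem {
    @Override
    public boolean supportsTruncate() {
        return true;
    }

    @Override
    public void truncate(Path path, long length) throws IOException {
        try (FileChannel channel = FileChannel.open(path, StandardOpenOption.WRITE)) {
            channel.truncate(length);
        }
    }
}

class BucketRecoverySketch {

    // On restore, cut the in-progress file back to the length recorded in the last
    // completed checkpoint, so bytes written after that checkpoint are dropped.
    static void restoreInProgressFile(TruncatableFileSystem fs, Path file, long checkpointedLength)
            throws IOException {
        if (!fs.supportsTruncate()) {
            // A real sink would need a fallback here; it is out of scope for this sketch.
            throw new UnsupportedOperationException("truncate is not supported by this file system");
        }
        fs.truncate(file, checkpointedLength);
    }

    public static void main(String[] args) throws IOException {
        Path part = Files.createTempFile("bucket", ".in-progress");
        Files.write(part, new byte[] {1, 2, 3, 4, 5, 6}); // 4 bytes checkpointed, 2 written afterwards
        restoreInProgressFile(new LocalTruncatableFileSystem(), part, 4L);
        System.out.println(Files.size(part)); // 4
        Files.delete(part);
    }
}
```

The `supportsTruncate()` check lets a caller decide up front whether it can rely on truncation, rather than discovering an unsupported operation in the middle of recovery.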
[GitHub] flink pull request #:
Github user aljoscha commented on the pull request: https://github.com/apache/flink/commit/c940d5eff9897796625a696ed2989aed52c39ebd#commitcomment-25902242 In tools/releasing/create_source_release.sh: In tools/releasing/create_source_release.sh on line 60: could, but that's not strictly necessary anymore because I also introduced the temporary `git clone`. ---
[jira] [Commented] (FLINK-7918) Run AbstractTestBase tests on Flip-6 MiniCluster
[ https://issues.apache.org/jira/browse/FLINK-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269039#comment-16269039 ] ASF GitHub Bot commented on FLINK-7918: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/5095 [FLINK-7918] Run AbstractTestBase tests on Flip-6 MiniCluster ## What is the purpose of the change Extend `MiniClusterResource` to instantiate a Flip-6 `MiniCluster` if the system property `codebase` has been set to `flip6`. This allows running all tests that are based on `AbstractTestBase` on the `Flip-6` code base. ## Brief change log - Use system property `codebase` to distinguish between Flip-6 and the old code base - Instantiate the respective `MiniCluster` in `MiniClusterResource` depending on the system property ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink flip6TestBase Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5095.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5095 commit 1f6cb24a2b8e4b062fe12d756bfd5950ab35b0ce Author: Till Rohrmann Date: 2017-11-14T22:50:52Z [FLINK-8078] Introduce LogicalSlot interface The LogicalSlot interface decouples the task deployment from the actual slot implementation which at the moment is Slot, SimpleSlot and SharedSlot. This is a helpful step to introduce a different slot implementation for Flip-6. commit aff11aaf02c7b5b667780031e01a1b08d85e036b Author: Till Rohrmann Date: 2017-11-15T13:20:27Z [FLINK-8085] Thin out LogicalSlot interface Remove isCanceled, isReleased methods and decouple logical slot from Execution by introducing a Payload interface which is set for a LogicalSlot. The Payload interface is implemented by the Execution and allows failing an implementation and obtaining a termination future. Introduce proper Execution#releaseFuture which is completed once the Execution's assigned resource has been released. commit 040cb326086968b5fe5d66dcc82835ee9dd092dc Author: Till Rohrmann Date: 2017-11-24T17:03:49Z [FLINK-8087] Decouple Slot from AllocatedSlot This commit introduces the SlotContext which is an abstraction for the SimpleSlot to obtain the relevant slot information to do the communication with the TaskManager without relying on the AllocatedSlot which is now only used by the SlotPool. commit 5d7e117392a51a485a5cb682c4a2c12028e73b97 Author: Till Rohrmann Date: 2017-11-24T17:06:10Z [FLINK-8088] Associate logical slots with the slot request id Before, logical slots like the SimpleSlot and SharedSlot were associated with the actually allocated slot via the AllocationID. 
This, however, was sub-optimal because allocated slots can be re-used to also fulfill other slot requests (logical slots). Therefore, we should bind the logical slots to the right id with the right lifecycle which is the slot request id. commit 75f49d4bb3f38c7bfaee101607a459d558c00ba8 Author: Till Rohrmann Date: 2017-11-13T14:42:07Z [FLINK-8089] Also check for other pending slot requests in offerSlot Not only check for a slot request with the right allocation id but also check whether we can fulfill other pending slot requests with an unclaimed offered slot before adding it to the list of available slots. commit b53729e74ccc31c634c5ac7db5e37ca45a66db8d Author: Till Rohrmann Date: 2017-11-24T17:08:38Z [FLINK-7956] [flip6] Add support for queued scheduling with slot sharing to SlotPool This commit adds support for queued scheduling with slot sharing to the SlotPool. The idea of slot sharing is that multiple tasks can run in the same slot. Moreover, queued scheduling means that a slot request does not have to be completed right away but can be completed at a later point in time. This allows starting new
[GitHub] flink pull request #5095: [FLINK-7918] Run AbstractTestBase tests on Flip-6 ...
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/5095 [FLINK-7918] Run AbstractTestBase tests on Flip-6 MiniCluster ## What is the purpose of the change Extend `MiniClusterResource` to instantiate a Flip-6 `MiniCluster` if the system property `codebase` has been set to `flip6`. This allows running all tests that are based on `AbstractTestBase` on the `Flip-6` code base. ## Brief change log - Use system property `codebase` to distinguish between Flip-6 and the old code base - Instantiate the respective `MiniCluster` in `MiniClusterResource` depending on the system property ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink flip6TestBase Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5095.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5095 commit 1f6cb24a2b8e4b062fe12d756bfd5950ab35b0ce Author: Till Rohrmann Date: 2017-11-14T22:50:52Z [FLINK-8078] Introduce LogicalSlot interface The LogicalSlot interface decouples the task deployment from the actual slot implementation which at the moment is Slot, SimpleSlot and SharedSlot. This is a helpful step to introduce a different slot implementation for Flip-6. 
commit aff11aaf02c7b5b667780031e01a1b08d85e036b Author: Till Rohrmann Date: 2017-11-15T13:20:27Z [FLINK-8085] Thin out LogicalSlot interface Remove isCanceled, isReleased methods and decouple logical slot from Execution by introducing a Payload interface which is set for a LogicalSlot. The Payload interface is implemented by the Execution and allows failing an implementation and obtaining a termination future. Introduce proper Execution#releaseFuture which is completed once the Execution's assigned resource has been released. commit 040cb326086968b5fe5d66dcc82835ee9dd092dc Author: Till Rohrmann Date: 2017-11-24T17:03:49Z [FLINK-8087] Decouple Slot from AllocatedSlot This commit introduces the SlotContext which is an abstraction for the SimpleSlot to obtain the relevant slot information to do the communication with the TaskManager without relying on the AllocatedSlot which is now only used by the SlotPool. commit 5d7e117392a51a485a5cb682c4a2c12028e73b97 Author: Till Rohrmann Date: 2017-11-24T17:06:10Z [FLINK-8088] Associate logical slots with the slot request id Before, logical slots like the SimpleSlot and SharedSlot were associated with the actually allocated slot via the AllocationID. This, however, was sub-optimal because allocated slots can be re-used to also fulfill other slot requests (logical slots). Therefore, we should bind the logical slots to the right id with the right lifecycle which is the slot request id. commit 75f49d4bb3f38c7bfaee101607a459d558c00ba8 Author: Till Rohrmann Date: 2017-11-13T14:42:07Z [FLINK-8089] Also check for other pending slot requests in offerSlot Not only check for a slot request with the right allocation id but also check whether we can fulfill other pending slot requests with an unclaimed offered slot before adding it to the list of available slots. 
commit b53729e74ccc31c634c5ac7db5e37ca45a66db8d Author: Till Rohrmann Date: 2017-11-24T17:08:38Z [FLINK-7956] [flip6] Add support for queued scheduling with slot sharing to SlotPool This commit adds support for queued scheduling with slot sharing to the SlotPool. The idea of slot sharing is that multiple tasks can run in the same slot. Moreover, queued scheduling means that a slot request does not have to be completed right away but can be completed at a later point in time. This allows starting new TaskExecutors in case there are no more slots left. The main component responsible for the management of shared slots is the SlotSharingManager. The SlotSharingManager maintains internally a tree-like structure which
[jira] [Commented] (FLINK-8158) Rowtime window inner join emits late data
[ https://issues.apache.org/jira/browse/FLINK-8158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268970#comment-16268970 ] ASF GitHub Bot commented on FLINK-8158: --- Github user xccui commented on the issue: https://github.com/apache/flink/pull/5094 Hi @hequn8128, thanks for looking into this. I've checked the current implementation and found that it can indeed emit late data. However, this is caused by the checks below: https://github.com/apache/flink/blob/427dfe42e2bea891b40e662bc97cdea57cdae3f5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala#L173 and https://github.com/apache/flink/blob/427dfe42e2bea891b40e662bc97cdea57cdae3f5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala#L234 In some situations, they do not prevent late rows from being calculated and emitted. Honestly, I cannot think of a quick solution. Do you want to continue working on that? Or I could take it over, if you don't mind. Thanks, Xingcan > Rowtime window inner join emits late data > - > > Key: FLINK-8158 > URL: https://issues.apache.org/jira/browse/FLINK-8158 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Hequn Cheng >Assignee: Hequn Cheng > Attachments: screenshot-1xxx.png > > > When executing the join, the join operator needs to make sure that no late > data is emitted. Currently, this is achieved by holding back watermarks. > However, the window border is not handled correctly. 
For the SQL below: > {quote} > val sqlQuery = > """ > SELECT t2.key, t2.id, t1.id > FROM T1 as t1 join T2 as t2 ON > t1.key = t2.key AND > t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND > t2.rt + INTERVAL '1' SECOND > """.stripMargin > val data1 = new mutable.MutableList[(String, String, Long)] > // for boundary test > data1.+=(("A", "LEFT1", 6000L)) > val data2 = new mutable.MutableList[(String, String, Long)] > data2.+=(("A", "RIGHT1", 6000L)) > {quote} > The join will output a watermark with timestamp 1000, but if another left row > ("A", "LEFT1", 1000L) arrives, the join will output a record with timestamp > 1000, which equals the previous watermark. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
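The boundary case in the description can be checked with plain arithmetic. The minimal Java sketch below rests on two assumptions taken from the description: the joined record carries the left row's timestamp 1000, and the operator has already emitted a watermark of 1000. It models only the inclusive `BETWEEN` predicate and the lateness test, not the operator's watermark holdback logic. Class and method names are hypothetical. The lateness test follows Flink's watermark semantics: a watermark `w` declares that no more records with timestamp `<= w` should arrive.

```java
// Hypothetical illustration of the FLINK-8158 boundary case; not Flink's join operator.
class IntervalJoinLatenessSketch {

    // Join predicate from the example query, in milliseconds (both bounds inclusive):
    //   t1.rt BETWEEN t2.rt - 5000 AND t2.rt + 1000
    static final long LOWER_BOUND_MS = -5000L;
    static final long UPPER_BOUND_MS = 1000L;

    static boolean joins(long leftTs, long rightTs) {
        return leftTs >= rightTs + LOWER_BOUND_MS && leftTs <= rightTs + UPPER_BOUND_MS;
    }

    // A watermark w means no further records with timestamp <= w are expected,
    // so emitting a record whose timestamp is <= the last emitted watermark is late.
    static boolean isLate(long outputTs, long lastEmittedWatermark) {
        return outputTs <= lastEmittedWatermark;
    }

    public static void main(String[] args) {
        long rightTs = 6000L;          // ("A", "RIGHT1", 6000L)
        long emittedWatermark = 1000L; // watermark emitted by the join, as reported in the issue
        long leftTs = 1000L;           // ("A", "LEFT1", 1000L) arriving afterwards

        System.out.println(joins(leftTs, rightTs));           // true: 1000 lies on the inclusive lower border
        System.out.println(isLate(leftTs, emittedWatermark)); // true: output timestamp equals the watermark
    }
}
```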
[GitHub] flink issue #5094: [FLINK-8158] [table] Fix rowtime window inner join emits ...
Github user xccui commented on the issue: https://github.com/apache/flink/pull/5094 Hi @hequn8128, thanks for looking into this. I've checked the current implementation and found that it can indeed emit late data. However, this is caused by the checks below: https://github.com/apache/flink/blob/427dfe42e2bea891b40e662bc97cdea57cdae3f5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala#L173 and https://github.com/apache/flink/blob/427dfe42e2bea891b40e662bc97cdea57cdae3f5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala#L234 In some situations, they do not prevent late rows from being calculated and emitted. Honestly, I cannot think of a quick solution. Do you want to continue working on that? Or I could take it over, if you don't mind. Thanks, Xingcan ---
[jira] [Commented] (FLINK-7300) End-to-end tests are instable on Travis
[ https://issues.apache.org/jira/browse/FLINK-7300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268948#comment-16268948 ] Till Rohrmann commented on FLINK-7300: -- Another instance of unstable Kafka end-to-end tests: https://travis-ci.org/apache/flink/jobs/308347992 > End-to-end tests are instable on Travis > --- > > Key: FLINK-7300 > URL: https://issues.apache.org/jira/browse/FLINK-7300 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.4.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Aljoscha Krettek > Labels: test-stability > > It seems like the end-to-end tests are unstable, causing the {{misc}} build > profile to sporadically fail. > Incorrectly matched output: > https://s3.amazonaws.com/archive.travis-ci.org/jobs/258569408/log.txt?X-Amz-Expires=30=20170731T060526Z=AWS4-HMAC-SHA256=AKIAJRYRXRSVGNKPKO5A/20170731/us-east-1/s3/aws4_request=host=4ef9ff5e60fe06db53a84be8d73775a46cb595a8caeb806b05dbbf824d3b69e8 > Another failure example with a different cause than the above, also on the > end-to-end tests: > https://s3.amazonaws.com/archive.travis-ci.org/jobs/258841693/log.txt?X-Amz-Expires=30=20170731T060007Z=AWS4-HMAC-SHA256=AKIAJRYRXRSVGNKPKO5A/20170731/us-east-1/s3/aws4_request=host=4a106b3990228b7628c250cc15407bc2c131c8332e1a94ad68d649fe8d32d726 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8150) WebUI in FLIP-6 mode exposes TaskManagerConnection IDs as TaskManager IDs
[ https://issues.apache.org/jira/browse/FLINK-8150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268907#comment-16268907 ] ASF GitHub Bot commented on FLINK-8150: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5093#discussion_r153524341 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ResourceIDSerializer.java --- @@ -29,16 +30,16 @@ /** * Json serializer for {@link InstanceID}. */ -public class InstanceIDSerializer extends StdSerializer<InstanceID> { +public class ResourceIDSerializer extends StdSerializer<ResourceID> { private static final long serialVersionUID = 5798852092159615938L; - protected InstanceIDSerializer() { - super(InstanceID.class); + protected ResourceIDSerializer() { + super(ResourceID.class); } @Override - public void serialize(InstanceID value, JsonGenerator gen, SerializerProvider provider) throws IOException { + public void serialize(ResourceID value, JsonGenerator gen, SerializerProvider provider) throws IOException { gen.writeString(value.toString()); --- End diff -- Good point. Fixed with your commit. > WebUI in FLIP-6 mode exposes TaskManagerConnection IDs as TaskManager IDs > - > > Key: FLINK-8150 > URL: https://issues.apache.org/jira/browse/FLINK-8150 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, REST >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Till Rohrmann >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > TaskManager IDs exposed by > {{org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler}} > cannot be used as input to query TaskManager metrics with method > {{MetricStore#getTaskManagerMetricStore(String)}}. > *Reason* > {{ResourceManager#requestTaskManagerInfo(Time)}} returns {{TaskManagerInfo}} > s where the instance IDs are set to the IDs of the {{TaskExecutorConnection}} > s. 
However, {{ResourceManager#requestTaskManagerMetricQueryServicePaths(Time)}} > returns the TaskManager resource IDs. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
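The *Reason* section comes down to two components keying the same TaskManager by different IDs. The hypothetical Java sketch below illustrates why a metrics lookup with the connection ID finds nothing while the resource ID succeeds. Plain strings stand in for `ResourceID` and the connection ID, and the class is not Flink's `MetricStore`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for a metric store keyed by TaskManager resource ID;
// plain strings replace Flink's ResourceID and connection-ID types.
class TaskManagerMetricLookupSketch {

    private final Map<String, Map<String, Long>> metricsByResourceId = new HashMap<>();

    void report(String resourceId, String metricName, long value) {
        metricsByResourceId
                .computeIfAbsent(resourceId, id -> new HashMap<>())
                .put(metricName, value);
    }

    // Succeeds only if the caller passes the same kind of ID the store is keyed by.
    Optional<Map<String, Long>> metricsFor(String taskManagerId) {
        return Optional.ofNullable(metricsByResourceId.get(taskManagerId));
    }

    public static void main(String[] args) {
        TaskManagerMetricLookupSketch store = new TaskManagerMetricLookupSketch();
        store.report("resource-1", "someMetric", 3L);
        System.out.println(store.metricsFor("connection-7").isPresent()); // false: wrong kind of ID
        System.out.println(store.metricsFor("resource-1").isPresent());   // true
    }
}
```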
[jira] [Commented] (FLINK-8150) WebUI in FLIP-6 mode exposes TaskManagerConnection IDs as TaskManager IDs
[ https://issues.apache.org/jira/browse/FLINK-8150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268908#comment-16268908 ] ASF GitHub Bot commented on FLINK-8150: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/5093 Thanks for the review @GJL. > WebUI in FLIP-6 mode exposes TaskManagerConnection IDs as TaskManager IDs > - > > Key: FLINK-8150 > URL: https://issues.apache.org/jira/browse/FLINK-8150 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, REST >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Till Rohrmann >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > TaskManager IDs exposed by > {{org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler}} > cannot be used as input to query TaskManager metrics with method > {{MetricStore#getTaskManagerMetricStore(String)}}. > *Reason* > {{ResourceManager#requestTaskManagerInfo(Time)}} returns {{TaskManagerInfo}} > s where the instance IDs are set to the IDs of the {{TaskExecutorConnection}} > s. However, {{ResourceManager#requestTaskManagerMetricQueryServicePaths(Time)}} > returns the TaskManager resource IDs. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5093: [FLINK-8150] [flip6] Expose TaskExecutor's ResourceID as ...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/5093 Thanks for the review @GJL. ---
[GitHub] flink pull request #5093: [FLINK-8150] [flip6] Expose TaskExecutor's Resourc...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5093#discussion_r153524341 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ResourceIDSerializer.java --- @@ -29,16 +30,16 @@ /** * Json serializer for {@link InstanceID}. */ -public class InstanceIDSerializer extends StdSerializer<InstanceID> { +public class ResourceIDSerializer extends StdSerializer<ResourceID> { private static final long serialVersionUID = 5798852092159615938L; - protected InstanceIDSerializer() { - super(InstanceID.class); + protected ResourceIDSerializer() { + super(ResourceID.class); } @Override - public void serialize(InstanceID value, JsonGenerator gen, SerializerProvider provider) throws IOException { + public void serialize(ResourceID value, JsonGenerator gen, SerializerProvider provider) throws IOException { gen.writeString(value.toString()); --- End diff -- Good point. Fixed with your commit. ---
[jira] [Commented] (FLINK-8150) WebUI in FLIP-6 mode exposes TaskManagerConnection IDs as TaskManager IDs
[ https://issues.apache.org/jira/browse/FLINK-8150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268905#comment-16268905 ] ASF GitHub Bot commented on FLINK-8150: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5093#discussion_r153524087 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java --- @@ -53,8 +53,8 @@ public static final String FIELD_NAME_HARDWARE = "hardware"; @JsonProperty(FIELD_NAME_INSTANCE_ID) --- End diff -- Good point. Fixed with your commit. > WebUI in FLIP-6 mode exposes TaskManagerConnection IDs as TaskManager IDs > - > > Key: FLINK-8150 > URL: https://issues.apache.org/jira/browse/FLINK-8150 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, REST >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Till Rohrmann >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > TaskManager IDs exposed by > {{org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler}} > cannot be used as input to query TaskManager metrics with method > {{MetricStore#getTaskManagerMetricStore(String)}}. > *Reason* > {{ResourceManager#requestTaskManagerInfo(Time)}} returns {{TaskManagerInfo}} > s where the instance IDs are set to the IDs of the {{TaskExecutorConnection}} > s. However, {{ResourceManager#requestTaskManagerMetricQueryServicePaths(Time)}} > returns the TaskManager resource IDs. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8150) WebUI in FLIP-6 mode exposes TaskManagerConnection IDs as TaskManager IDs
[ https://issues.apache.org/jira/browse/FLINK-8150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268903#comment-16268903 ] ASF GitHub Bot commented on FLINK-8150: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5093#discussion_r153524037 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java --- @@ -43,7 +43,7 @@ @JsonCreator public TaskManagerDetailsInfo( - @JsonDeserialize(using = InstanceIDDeserializer.class) @JsonProperty(FIELD_NAME_INSTANCE_ID) InstanceID instanceId, + @JsonDeserialize(using = ResourceIDDeserializer.class) @JsonProperty(FIELD_NAME_INSTANCE_ID) ResourceID resourceId, --- End diff -- Good point. Fixed with your commit. > WebUI in FLIP-6 mode exposes TaskManagerConnection IDs as TaskManager IDs > - > > Key: FLINK-8150 > URL: https://issues.apache.org/jira/browse/FLINK-8150 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, REST >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Till Rohrmann >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > TaskManager IDs exposed by > {{org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler}} > cannot be used as input to query TaskManager metrics with method > {{MetricStore#getTaskManagerMetricStore(String)}}. > *Reason* > {{ResourceManager#requestTaskManagerInfo(Time)}} returns {{TaskManagerInfo}} > s where the instance IDs are set to the IDs of the {{TaskExecutorConnection}} > s. However, {{ResourceManager#requestTaskManagerMetricQueryServicePaths(Time)}} > returns the TaskManager resource IDs. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5093: [FLINK-8150] [flip6] Expose TaskExecutor's Resourc...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5093#discussion_r153524087 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java --- @@ -53,8 +53,8 @@ public static final String FIELD_NAME_HARDWARE = "hardware"; @JsonProperty(FIELD_NAME_INSTANCE_ID) --- End diff -- Good point. Fixed with your commit. ---
[GitHub] flink pull request #5093: [FLINK-8150] [flip6] Expose TaskExecutor's Resourc...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5093#discussion_r153524037 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java --- @@ -43,7 +43,7 @@ @JsonCreator public TaskManagerDetailsInfo( - @JsonDeserialize(using = InstanceIDDeserializer.class) @JsonProperty(FIELD_NAME_INSTANCE_ID) InstanceID instanceId, + @JsonDeserialize(using = ResourceIDDeserializer.class) @JsonProperty(FIELD_NAME_INSTANCE_ID) ResourceID resourceId, --- End diff -- Good point. Fixed with your commit. ---
[GitHub] flink issue #4374: repalce map.put with putIfAbsent
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/4374 @RebornHuan although this change looks to be correct and makes good use of the newer API, there is a trade-off between deleting a line of code called during an error condition and the risk of such small refactorings. I don't think we have a demonstrated benefit to making this change. ---
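For readers unfamiliar with the API under discussion, `Map#putIfAbsent` (Java 8+) folds a check-then-put into a single call. The sketch below is a generic illustration with hypothetical helper names, not the code touched by the PR. For non-concurrent maps, both helpers store and return the same values. `putIfAbsent` also treats a key mapped to `null` as absent.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helpers contrasting the two idioms; not the code from PR #4374.
class PutIfAbsentSketch {

    // Pre-Java-8 idiom: look up, then put only if nothing (or null) is stored.
    static <K, V> V putIfMissingLegacy(Map<K, V> map, K key, V value) {
        V existing = map.get(key);
        if (existing == null) {
            map.put(key, value);
        }
        return existing;
    }

    // Java 8+: returns the current value, or null if the key was absent (or mapped
    // to null), in which case the new value has been stored.
    static <K, V> V putIfMissing(Map<K, V> map, K key, V value) {
        return map.putIfAbsent(key, value);
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<>();
        System.out.println(putIfMissing(counts, "a", 1)); // null: "a" was absent, 1 stored
        System.out.println(putIfMissing(counts, "a", 2)); // 1: existing value kept
        System.out.println(counts);                       // {a=1}
    }
}
```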
[jira] [Commented] (FLINK-7608) LatencyGauge change to histogram metric
[ https://issues.apache.org/jira/browse/FLINK-7608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268878#comment-16268878 ] ASF GitHub Bot commented on FLINK-7608: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/4826 So here's the thing: The port of the metric itself is good, and exactly what I want. What I'm unsatisfied with is the naming of the metric, but that isn't the fault of this PR; it is a limitation in the metric system. The way the metric is named is inconsistent. The source task is encoded in the metric name with its ID, but the target task is encoded in the scope as either ID or name, based on the scope format. (Maybe it's the other way around, but you get the idea, I hope.) The conclusion I arrived at is that this metric should not be registered at the task level (which just doesn't support the notion of a metric describing 2 tasks) but at the job level instead (where we can do whatever we want in regard to tasks). This would, in principle, allow us to have this identifier: ```myjob.latency.source.ABCDE.target.DEFGH.latency_p95``` But this still isn't satisfactory, because it is still hardly usable for key-value reporters, where we want to filter latencies based on the source or target, for which we need something like this: ``` logical scope: job.task.latency.latency_p95: tags: job_name = myjob source = ABCDE target = DEFGH ``` For this to work, however, we first have to implement FLINK-7692 to support custom key-value pairs. As such, I would like to delay merging this PR by a week or 2 until the aforementioned feature is implemented, so we don't iterate through 3 different versions. 
> LatencyGauge change to histogram metric > > > Key: FLINK-7608 > URL: https://issues.apache.org/jira/browse/FLINK-7608 > Project: Flink > Issue Type: Bug > Components: Metrics >Reporter: Hai Zhou UTC+8 >Assignee: Hai Zhou UTC+8 > Fix For: 1.5.0 > > > I used slf4jReporter[https://issues.apache.org/jira/browse/FLINK-4831] to > export metrics to the log file. > I found: > {noformat} > -- Gauges > - > .. > zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming > Job.Map.0.latency: > value={LatencySourceDescriptor{vertexID=1, subtaskIndex=-1}={p99=116.0, > p50=59.5, min=11.0, max=116.0, p95=116.0, mean=61.836}} > zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming > Job.Sink- Unnamed.0.latency: > value={LatencySourceDescriptor{vertexID=1, subtaskIndex=0}={p99=195.0, > p50=163.5, min=115.0, max=195.0, p95=195.0, mean=161.0}} > .. > {noformat} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4826: [FLINK-7608][metric] Refactor latency statistics metric
Github user zentol commented on the issue: https://github.com/apache/flink/pull/4826 So here's the thing: The port of the metric itself is good, and exactly what I want. What I'm unsatisfied with is the naming of the metric, but that isn't the fault of this PR; it is a limitation in the metric system. The way the metric is named is inconsistent. The source task is encoded in the metric name with its ID, but the target task is encoded in the scope as either ID or name, based on the scope format (or the other way around, but you get the idea, I hope). The conclusion I arrived at is that this metric should not be registered at the task level (which just doesn't support the notion of a metric describing 2 tasks) but at the job level instead (where we can do whatever we want with regard to tasks). This would, in principle, allow us to have this identifier: ```myjob.latency.source.ABCDE.target.DEFGH.latency_p95``` But this still isn't satisfactory, because it is still hardly usable for key-value reporters, where we want to filter latencies based on the source or target, for which we need something like this: ``` logical scope: job.task.latency.latency_p95: tags: job_name = myjob source = ABCDE target = DEFGH ``` For this to work, however, we first have to implement FLINK-7692 to support custom key-value pairs. As such, I would like to delay merging this PR by a week or 2 until the aforementioned feature is implemented, so we don't iterate through 3 different versions. ---
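To make the naming trade-off above concrete, here is a minimal Java sketch contrasting a flat job-level identifier with a key-value (tagged) representation. It is not Flink code: `LatencyMetricNaming`, `flatIdentifier`, `tagged` and `TaggedMetric` are hypothetical names, and `ABCDE`/`DEFGH` are just the placeholder task IDs from the comment.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch: two ways to name a job-level latency metric (not Flink's API). */
public final class LatencyMetricNaming {

    /** Flat identifier: source and target IDs are baked into the metric name itself. */
    public static String flatIdentifier(String job, String sourceId, String targetId, String stat) {
        return String.join(".", job, "latency", "source", sourceId, "target", targetId, stat);
    }

    /** Key-value form: a stable logical scope plus tags that a reporter can filter on. */
    public static final class TaggedMetric {
        public final String logicalScope;
        public final Map<String, String> tags;

        TaggedMetric(String logicalScope, Map<String, String> tags) {
            this.logicalScope = logicalScope;
            this.tags = Collections.unmodifiableMap(tags);
        }
    }

    public static TaggedMetric tagged(String job, String sourceId, String targetId, String stat) {
        Map<String, String> tags = new LinkedHashMap<>();
        tags.put("job_name", job);
        tags.put("source", sourceId);
        tags.put("target", targetId);
        // The logical scope is identical for every source/target pair; only the tags differ.
        return new TaggedMetric("job.task.latency." + stat, tags);
    }

    public static void main(String[] args) {
        System.out.println(flatIdentifier("myjob", "ABCDE", "DEFGH", "latency_p95"));
        TaggedMetric m = tagged("myjob", "ABCDE", "DEFGH", "latency_p95");
        System.out.println(m.logicalScope + " " + m.tags);
    }
}
```

With the flat form, a reporter would have to parse the name to filter by source or target; with the tagged form, it can group all `job.task.latency.latency_p95` series and filter on the `source`/`target` tags directly, which is the motivation for FLINK-7692.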
[jira] [Commented] (FLINK-7692) Support user-defined variables in Metrics
[ https://issues.apache.org/jira/browse/FLINK-7692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268863#comment-16268863 ] Chesnay Schepler commented on FLINK-7692: - Let me know what you think, and whether you would like to continue working on this. > Support user-defined variables in Metrics > - > > Key: FLINK-7692 > URL: https://issues.apache.org/jira/browse/FLINK-7692 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Wei-Che Wei >Priority: Minor > Fix For: 1.5.0 > > > Reporters that identify metrics with a set of key-value pairs are currently > limited to the variables defined by Flink, like the taskmanager ID, with > users not being able to supply their own. > This is inconsistent with reporters that use metric identifiers that freely > include user-defined groups constructed via {{MetricGroup#addGroup(String > name)}}. > I propose adding a new method {{MetricGroup#addGroup(String key, String > name)}} that adds a new key-value pair to the {{variables}} map in its > constructor. > When constructing the metric identifier, the key should be > included as well, producing the same result as when constructing the > metric groups tree via {{group.addGroup(key).addGroup(value)}}. > For this, a new {{KeyedGenericMetricGroup}} should be created that resembles > the unkeyed version, with slight modifications to the constructor and > {{getScopeComponents}} method. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7692) Support user-defined variables in Metrics
[ https://issues.apache.org/jira/browse/FLINK-7692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268861#comment-16268861 ] Chesnay Schepler commented on FLINK-7692: - [~tonywei] Those are good points; luckily we can resolve them in a rather nice way. We have to make one change to the original specification, though: create 2 groups for key-value pairs instead of one. For a KeyValue group, the {{name}} on its own behaves the same way as a generic group with the same name; it is part of the identifier and logical scope. Only the value is treated differently, as it isn't part of the logical scope. I've created a scaffold implementation (that probably doesn't fully compile... ) here: https://github.com/zentol/flink/tree/7692 This still needs tests, documentation, and integration into reporters, and we have to go through all usages of {{addGroup}} to find cases we can now model better. We need a new class for representing the key group, which behaves pretty much like a generic metric group: {code} public class GenericKeyMetricGroup extends GenericMetricGroup { GenericKeyMetricGroup(MetricRegistry registry, AbstractMetricGroup parent, String name) { super(registry, parent, name); } @Override protected GenericMetricGroup createChildGroup(String name) { return new GenericValueMetricGroup(registry, this, name); } } {code} and another class for representing the value, which a) modifies the variables and b) is excluded from the logical scope: {code} public class GenericValueMetricGroup extends GenericMetricGroup { private String key; private final String value; GenericValueMetricGroup(MetricRegistry registry, GenericKeyMetricGroup parent, String value) { super(registry, parent, value); this.key = parent.getGroupName(name -> name); this.value = value; } // @Override protected void putVariables(Map<String, String> variables) { variables.put(ScopeFormat.asVariable(this.key), value); } @Override public String getLogicalScope(CharacterFilter filter, char delimiter) { return 
parent.getLogicalScope(filter, delimiter); } } {code} This works with (surprisingly) few changes to existing classes; just 2 hooks: one to add more variables and one to create a different kind of generic child group. With this, the implementation of {{addGroup(key, value)}} is trivial: {code} public MetricGroup addGroup(String key, String value) { return addGroup(key).addGroup(value); } {code} Now, as to the issues you raised: When calling {{addGroup(name, value)}}, * if a group {{name}} already exists, we just continue with whatever group was there. If the existing group is a key group, we get the behavior we want; otherwise we will add another {{GenericMetricGroup}}. This may result in the value not being exposed as a proper key-value pair, but it doesn't lead to loss of information (which we would have if we skipped registering the value). In any case, the metric identifier is the same. * if a group {{value}} already exists, we don't have a problem. Given that the key group is never exposed to the outside, this case can only occur upon subsequent calls of {{addGroup(name, value)}}, making the later calls a no-op, as is the existing behavior. > Support user-defined variables in Metrics > - > > Key: FLINK-7692 > URL: https://issues.apache.org/jira/browse/FLINK-7692 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Wei-Che Wei >Priority: Minor > Fix For: 1.5.0 > > > Reporters that identify metrics with a set of key-value pairs are currently > limited to the variables defined by Flink, like the taskmanager ID, with > users not being able to supply their own. > This is inconsistent with reporters that use metric identifiers that freely > include user-defined groups constructed via {{MetricGroup#addGroup(String > name)}}. > I propose adding a new method {{MetricGroup#addGroup(String key, String > name)}} that adds a new key-value pair to the {{variables}} map in its > constructor. 
When constructing the metric identifier, the key should be > included as well, producing the same result as when constructing the > metric groups tree via {{group.addGroup(key).addGroup(value)}}. > For this, a new {{KeyedGenericMetricGroup}} should be created that resembles > the unkeyed version, with slight modifications to the constructor and > {{getScopeComponents}} method. -- This message was sent by Atlassian JIRA
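The two-group design and the collision rules in the comment above can be illustrated with a small, self-contained toy model. This is a sketch only: `ToyMetricGroup` and its `Kind` enum are hypothetical and do not reproduce Flink's `GenericKeyMetricGroup`/`GenericValueMetricGroup`; the `"<key>"` variable spelling is assumed to mirror the angle-bracket form used in Flink scope formats (e.g. `<host>`).

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Toy model (hypothetical, not Flink's classes) of the proposed key/value metric groups:
 * the key group behaves like a generic group, while the value group is part of the
 * identifier but not of the logical scope, and contributes a "<key>" -> value variable.
 */
public class ToyMetricGroup {

    enum Kind { GENERIC, KEY, VALUE }

    private final ToyMetricGroup parent;
    private final String name;
    private final Kind kind;
    private final Map<String, ToyMetricGroup> children = new HashMap<>();

    public ToyMetricGroup(String rootName) {
        this(null, rootName, Kind.GENERIC);
    }

    private ToyMetricGroup(ToyMetricGroup parent, String name, Kind kind) {
        this.parent = parent;
        this.name = name;
        this.kind = kind;
    }

    /** Get-or-create a child; children of a key group become value groups. */
    public ToyMetricGroup addGroup(String name) {
        Kind childKind = (kind == Kind.KEY) ? Kind.VALUE : Kind.GENERIC;
        return children.computeIfAbsent(name, n -> new ToyMetricGroup(this, n, childKind));
    }

    /** Reuses an existing child named {@code key} whatever its kind, as described in the comment. */
    public ToyMetricGroup addGroup(String key, String value) {
        ToyMetricGroup keyGroup =
            children.computeIfAbsent(key, k -> new ToyMetricGroup(this, k, Kind.KEY));
        return keyGroup.addGroup(value);
    }

    /** Full identifier: every group name, including the value. */
    public String getIdentifier() {
        return parent == null ? name : parent.getIdentifier() + "." + name;
    }

    /** Logical scope: value groups are skipped, so it is stable across different values. */
    public String getLogicalScope() {
        if (kind == Kind.VALUE) {
            return parent.getLogicalScope();
        }
        return parent == null ? name : parent.getLogicalScope() + "." + name;
    }

    /** Variables: each value group contributes "<key>" -> value. */
    public Map<String, String> getAllVariables() {
        Map<String, String> vars = (parent == null) ? new LinkedHashMap<>() : parent.getAllVariables();
        if (kind == Kind.VALUE) {
            vars.put("<" + parent.name + ">", name);
        }
        return vars;
    }
}
```

In this model, `root.addGroup("source", "ABCDE")` and `root.addGroup("source").addGroup("ABCDE")` produce the same identifier; only the former exposes the `<source>` variable and keeps `ABCDE` out of the logical scope. If a plain `source` group already exists, `addGroup("source", value)` silently falls back to a plain child, matching the "no loss of information, same identifier" behavior described above, and repeated calls return the existing group (a no-op).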
[jira] [Commented] (FLINK-7574) Remove unused dependencies from flink-clients
[ https://issues.apache.org/jira/browse/FLINK-7574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268841#comment-16268841 ] ASF GitHub Bot commented on FLINK-7574: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5076#discussion_r153511839 --- Diff: pom.xml --- @@ -891,6 +905,41 @@ under the License. + + org.apache.maven.plugins + maven-dependency-plugin + 3.0.2 + + + analyze + + analyze-only + + + + true + true + true + --- End diff -- I would keep logging, junit, jsr305 and force-shading in the parent pom as they are used by virtually all modules and would just clutter up other poms. But I wouldn't mind moving the mocking/hamcrest dependencies to child poms. > Remove unused dependencies from flink-clients > - > > Key: FLINK-7574 > URL: https://issues.apache.org/jira/browse/FLINK-7574 > Project: Flink > Issue Type: Sub-task > Components: Build System >Affects Versions: 1.3.2 > Environment: Apache Maven 3.3.9, Java version: 1.8.0_144 >Reporter: Hai Zhou UTC+8 >Assignee: Hai Zhou UTC+8 > > [INFO] --- maven-dependency-plugin:2.10:analyze (default-cli) @ > flink-clients_2.11 --- > [WARNING] Used undeclared dependencies found: > [WARNING]org.scala-lang:scala-library:jar:2.11.11:compile > [WARNING]com.data-artisans:flakka-actor_2.11:jar:2.3-custom:compile > [WARNING] Unused declared dependencies found: > [WARNING]org.hamcrest:hamcrest-all:jar:1.3:test > [WARNING]org.apache.flink:force-shading:jar:1.4-SNAPSHOT:compile > [WARNING]org.powermock:powermock-module-junit4:jar:1.6.5:test > [WARNING]com.google.code.findbugs:jsr305:jar:1.3.9:compile > [WARNING]log4j:log4j:jar:1.2.17:test > [WARNING]org.powermock:powermock-api-mockito:jar:1.6.5:test > [WARNING]org.slf4j:slf4j-log4j12:jar:1.7.7:test -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5076: [FLINK-7574][build] POM Cleanup flink-clients
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5076#discussion_r153511839 --- Diff: pom.xml --- @@ -891,6 +905,41 @@ under the License. + + org.apache.maven.plugins + maven-dependency-plugin + 3.0.2 + + + analyze + + analyze-only + + + + true + true + true + --- End diff -- I would keep logging, junit, jsr305 and force-shading in the parent pom as they are used by virtually all modules and would just clutter up other poms. But I wouldn't mind moving the mocking/hamcrest dependencies to child poms. ---
[jira] [Commented] (FLINK-7574) Remove unused dependencies from flink-clients
[ https://issues.apache.org/jira/browse/FLINK-7574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268805#comment-16268805 ] ASF GitHub Bot commented on FLINK-7574: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/5076#discussion_r153503257 --- Diff: pom.xml --- @@ -891,6 +905,41 @@ under the License. + + org.apache.maven.plugins + maven-dependency-plugin + 3.0.2 + + + analyze + + analyze-only + + + + true + true + true + --- End diff -- Would it be better to move the dependencies out of the parent pom? > Remove unused dependencies from flink-clients > - > > Key: FLINK-7574 > URL: https://issues.apache.org/jira/browse/FLINK-7574 > Project: Flink > Issue Type: Sub-task > Components: Build System >Affects Versions: 1.3.2 > Environment: Apache Maven 3.3.9, Java version: 1.8.0_144 >Reporter: Hai Zhou UTC+8 >Assignee: Hai Zhou UTC+8 > > [INFO] --- maven-dependency-plugin:2.10:analyze (default-cli) @ > flink-clients_2.11 --- > [WARNING] Used undeclared dependencies found: > [WARNING]org.scala-lang:scala-library:jar:2.11.11:compile > [WARNING]com.data-artisans:flakka-actor_2.11:jar:2.3-custom:compile > [WARNING] Unused declared dependencies found: > [WARNING]org.hamcrest:hamcrest-all:jar:1.3:test > [WARNING]org.apache.flink:force-shading:jar:1.4-SNAPSHOT:compile > [WARNING]org.powermock:powermock-module-junit4:jar:1.6.5:test > [WARNING]com.google.code.findbugs:jsr305:jar:1.3.9:compile > [WARNING]log4j:log4j:jar:1.2.17:test > [WARNING]org.powermock:powermock-api-mockito:jar:1.6.5:test > [WARNING]org.slf4j:slf4j-log4j12:jar:1.7.7:test -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7574) Remove unused dependencies from flink-clients
[ https://issues.apache.org/jira/browse/FLINK-7574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268804#comment-16268804 ] ASF GitHub Bot commented on FLINK-7574: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/5076#discussion_r153500167 --- Diff: pom.xml --- @@ -891,6 +905,41 @@ under the License. + + org.apache.maven.plugins + maven-dependency-plugin + 3.0.2 --- End diff -- Redundant from the `` section. > Remove unused dependencies from flink-clients > - > > Key: FLINK-7574 > URL: https://issues.apache.org/jira/browse/FLINK-7574 > Project: Flink > Issue Type: Sub-task > Components: Build System >Affects Versions: 1.3.2 > Environment: Apache Maven 3.3.9, Java version: 1.8.0_144 >Reporter: Hai Zhou UTC+8 >Assignee: Hai Zhou UTC+8 > > [INFO] --- maven-dependency-plugin:2.10:analyze (default-cli) @ > flink-clients_2.11 --- > [WARNING] Used undeclared dependencies found: > [WARNING]org.scala-lang:scala-library:jar:2.11.11:compile > [WARNING]com.data-artisans:flakka-actor_2.11:jar:2.3-custom:compile > [WARNING] Unused declared dependencies found: > [WARNING]org.hamcrest:hamcrest-all:jar:1.3:test > [WARNING]org.apache.flink:force-shading:jar:1.4-SNAPSHOT:compile > [WARNING]org.powermock:powermock-module-junit4:jar:1.6.5:test > [WARNING]com.google.code.findbugs:jsr305:jar:1.3.9:compile > [WARNING]log4j:log4j:jar:1.2.17:test > [WARNING]org.powermock:powermock-api-mockito:jar:1.6.5:test > [WARNING]org.slf4j:slf4j-log4j12:jar:1.7.7:test -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5076: [FLINK-7574][build] POM Cleanup flink-clients
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/5076#discussion_r153503257 --- Diff: pom.xml --- @@ -891,6 +905,41 @@ under the License. + + org.apache.maven.plugins + maven-dependency-plugin + 3.0.2 + + + analyze + + analyze-only + + + + true + true + true + --- End diff -- Would it be better to move the dependencies out of the parent pom? ---
[GitHub] flink pull request #5076: [FLINK-7574][build] POM Cleanup flink-clients
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/5076#discussion_r153500167 --- Diff: pom.xml --- @@ -891,6 +905,41 @@ under the License. + + org.apache.maven.plugins + maven-dependency-plugin + 3.0.2 --- End diff -- Redundant from the `` section. ---
[jira] [Commented] (FLINK-7652) Port CurrentJobIdsHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268760#comment-16268760 ] ASF GitHub Bot commented on FLINK-7652: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/4734 Hi @tillrohrmann, this PR is now rebased to the latest master, and reworked to incorporate your last comments. > Port CurrentJobIdsHandler to new REST endpoint > -- > > Key: FLINK-7652 > URL: https://issues.apache.org/jira/browse/FLINK-7652 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Labels: flip-6 > Fix For: 1.5.0 > > > Port existing {{CurrentJobIdsHandler}} to new REST endpoint -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4734: [FLINK-7652] [flip6] Port CurrentJobIdsHandler to new RES...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/4734 Hi @tillrohrmann, this PR is now rebased to the latest master, and reworked to incorporate your last comments. ---
[jira] [Commented] (FLINK-7873) Introduce CheckpointCacheManager for reading checkpoint data locally when performing failover
[ https://issues.apache.org/jira/browse/FLINK-7873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268738#comment-16268738 ] ASF GitHub Bot commented on FLINK-7873: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5074#discussion_r153495140 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/CachedCheckpointStreamFactory.java --- @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.state; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.checkpoint.CachedStreamStateHandle; +import org.apache.flink.runtime.checkpoint.CheckpointCache; +import org.apache.flink.runtime.checkpoint.CheckpointCache.CachedOutputStream; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * {@link CachedCheckpointStreamFactory} is used to build an output stream that writes data to both remote end (e.g:DFS) and local end. + * Local data is managed by {@link CheckpointCache}. 
It simply wraps {@link CheckpointCache} and {@link CheckpointStreamFactory} and + * create a hybrid output stream by {@link CheckpointCache} and {@link CheckpointStreamFactory}, this hybrid output stream will write + * to both remote end and local end. + */ +public class CachedCheckpointStreamFactory implements CheckpointStreamFactory { + + private static Logger LOG = LoggerFactory.getLogger(CachedCheckpointStreamFactory.class); + + private final CheckpointCache cache; + private final CheckpointStreamFactory remoteFactory; + + public CachedCheckpointStreamFactory(CheckpointCache cache, CheckpointStreamFactory factory) { + this.cache = cache; + this.remoteFactory = Preconditions.checkNotNull(factory, "Remote stream factory is null."); + } + + public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp, StateHandleID handleID) throws Exception { + return createCheckpointStateOutputStream(checkpointID, timestamp, handleID, false); + } + + public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp, StateHandleID handleID, boolean placeholder) throws Exception { + if (LOG.isDebugEnabled()) { + LOG.debug("create cache output stream: cpkID:{} placeHolder:{}", checkpointID, placeholder); + } + CachedOutputStream cachedOut = null; + if (cache != null) { + cachedOut = cache.createOutputStream(checkpointID, handleID, placeholder); + } + CheckpointStateOutputStream remoteOut = null; + if (!placeholder) { + remoteOut = remoteFactory.createCheckpointStateOutputStream(checkpointID, timestamp); + } + CachedCheckpointStateOutputStream output = new CachedCheckpointStateOutputStream(cachedOut, remoteOut); + return output; + } + + @Override + public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception { + LOG.warn("create output stream which is not cacheable."); + return remoteFactory.createCheckpointStateOutputStream(checkpointID, 
timestamp); + } + + @Override + public void close() throws Exception { + remoteFactory.close(); + } + + /** +* A hybrid checkpoint output stream which write data to both remote end and local end, +* writing data locally failed won't stop writing to remote. This hybrid output stream +* will return a {@link CachedStreamStateHandle} in closeAndGetHandle(), it can be used for read data locally. +*/ + public
[GitHub] flink pull request #5074: [FLINK-7873] [runtime] Introduce local recovery
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5074#discussion_r153495140 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/CachedCheckpointStreamFactory.java --- @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.state; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.checkpoint.CachedStreamStateHandle; +import org.apache.flink.runtime.checkpoint.CheckpointCache; +import org.apache.flink.runtime.checkpoint.CheckpointCache.CachedOutputStream; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * {@link CachedCheckpointStreamFactory} is used to build an output stream that writes data to both remote end (e.g:DFS) and local end. + * Local data is managed by {@link CheckpointCache}. It simply wraps {@link CheckpointCache} and {@link CheckpointStreamFactory} and + * create a hybrid output stream by {@link CheckpointCache} and {@link CheckpointStreamFactory}, this hybrid output stream will write + * to both remote end and local end. 
+ */ +public class CachedCheckpointStreamFactory implements CheckpointStreamFactory { + + private static Logger LOG = LoggerFactory.getLogger(CachedCheckpointStreamFactory.class); + + private final CheckpointCache cache; + private final CheckpointStreamFactory remoteFactory; + + public CachedCheckpointStreamFactory(CheckpointCache cache, CheckpointStreamFactory factory) { + this.cache = cache; + this.remoteFactory = Preconditions.checkNotNull(factory, "Remote stream factory is null."); + } + + public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp, StateHandleID handleID) throws Exception { + return createCheckpointStateOutputStream(checkpointID, timestamp, handleID, false); + } + + public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp, StateHandleID handleID, boolean placeholder) throws Exception { + if (LOG.isDebugEnabled()) { + LOG.debug("create cache output stream: cpkID:{} placeHolder:{}", checkpointID, placeholder); + } + CachedOutputStream cachedOut = null; + if (cache != null) { + cachedOut = cache.createOutputStream(checkpointID, handleID, placeholder); + } + CheckpointStateOutputStream remoteOut = null; + if (!placeholder) { + remoteOut = remoteFactory.createCheckpointStateOutputStream(checkpointID, timestamp); + } + CachedCheckpointStateOutputStream output = new CachedCheckpointStateOutputStream(cachedOut, remoteOut); + return output; + } + + @Override + public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception { + LOG.warn("create output stream which is not cacheable."); + return remoteFactory.createCheckpointStateOutputStream(checkpointID, timestamp); + } + + @Override + public void close() throws Exception { + remoteFactory.close(); + } + + /** +* A hybrid checkpoint output stream which write data to both remote end and local end, +* writing data locally failed won't stop writing to remote. 
This hybrid output stream +* will return a {@link CachedStreamStateHandle} in closeAndGetHandle(), it can be used for read data locally. +*/ + public static class CachedCheckpointStateOutputStream extends CheckpointStateOutputStream { + + private CachedOutputStream cacheOut = null; + private CheckpointStateOutputStream remoteOut = null; + +
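The Javadoc above describes a "hybrid" stream that writes to both a remote and a local target, where a local failure must not stop the remote write. A minimal sketch of that idea on plain `java.io.OutputStream` follows. It is an illustration under stated assumptions, not the PR's `CachedCheckpointStateOutputStream`: the class name `BestEffortTeeOutputStream` and the `isLocalCopyValid()` method are hypothetical, and a real implementation would also need to produce state handles on close.

```java
import java.io.IOException;
import java.io.OutputStream;

/**
 * Hypothetical sketch of a hybrid stream: every write goes to the remote stream (errors
 * propagate), and a best-effort copy goes to a local stream. The first local failure
 * abandons the local copy but never fails the remote write.
 */
public class BestEffortTeeOutputStream extends OutputStream {

    private final OutputStream remote; // authoritative copy; must succeed
    private OutputStream local;        // best-effort copy; null if absent or abandoned

    public BestEffortTeeOutputStream(OutputStream remote, OutputStream local) {
        this.remote = remote;
        this.local = local; // may be null when no local cache is configured
    }

    @Override
    public void write(int b) throws IOException {
        remote.write(b); // write remote first so a remote error surfaces immediately
        if (local != null) {
            try {
                local.write(b);
            } catch (IOException e) {
                dropLocal();
            }
        }
    }

    @Override
    public void write(byte[] buf, int off, int len) throws IOException {
        remote.write(buf, off, len);
        if (local != null) {
            try {
                local.write(buf, off, len);
            } catch (IOException e) {
                dropLocal();
            }
        }
    }

    @Override
    public void flush() throws IOException {
        remote.flush();
        if (local != null) {
            try {
                local.flush();
            } catch (IOException e) {
                dropLocal();
            }
        }
    }

    @Override
    public void close() throws IOException {
        try {
            remote.close();
        } finally {
            if (local != null) {
                try {
                    local.close();
                } catch (IOException e) {
                    local = null; // a failed close means the local copy cannot be trusted
                }
            }
        }
    }

    /** True if a complete local copy is still being (or has been) written. */
    public boolean isLocalCopyValid() {
        return local != null;
    }

    private void dropLocal() {
        OutputStream failed = local;
        local = null;
        try {
            failed.close();
        } catch (IOException ignored) {
            // The local copy is already abandoned; nothing more to do.
        }
    }
}
```

Writing the remote side first means a remote failure is reported before any local bytes are written, and a caller can use `isLocalCopyValid()` to decide whether to register the local copy for later recovery.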
[jira] [Commented] (FLINK-7873) Introduce CheckpointCacheManager for reading checkpoint data locally when performing failover
[ https://issues.apache.org/jira/browse/FLINK-7873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268733#comment-16268733 ] ASF GitHub Bot commented on FLINK-7873: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5074#discussion_r153493925 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/CachedCheckpointStreamFactory.java --- @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.state; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.checkpoint.CachedStreamStateHandle; +import org.apache.flink.runtime.checkpoint.CheckpointCache; +import org.apache.flink.runtime.checkpoint.CheckpointCache.CachedOutputStream; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * {@link CachedCheckpointStreamFactory} is used to build an output stream that writes data to both remote end (e.g:DFS) and local end. + * Local data is managed by {@link CheckpointCache}. 
It simply wraps {@link CheckpointCache} and {@link CheckpointStreamFactory} and + * create a hybrid output stream by {@link CheckpointCache} and {@link CheckpointStreamFactory}, this hybrid output stream will write + * to both remote end and local end. + */ +public class CachedCheckpointStreamFactory implements CheckpointStreamFactory { + + private static Logger LOG = LoggerFactory.getLogger(CachedCheckpointStreamFactory.class); + + private final CheckpointCache cache; + private final CheckpointStreamFactory remoteFactory; + + public CachedCheckpointStreamFactory(CheckpointCache cache, CheckpointStreamFactory factory) { + this.cache = cache; + this.remoteFactory = Preconditions.checkNotNull(factory, "Remote stream factory is null."); + } + + public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp, StateHandleID handleID) throws Exception { + return createCheckpointStateOutputStream(checkpointID, timestamp, handleID, false); + } + + public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp, StateHandleID handleID, boolean placeholder) throws Exception { + if (LOG.isDebugEnabled()) { + LOG.debug("create cache output stream: cpkID:{} placeHolder:{}", checkpointID, placeholder); + } + CachedOutputStream cachedOut = null; + if (cache != null) { + cachedOut = cache.createOutputStream(checkpointID, handleID, placeholder); + } + CheckpointStateOutputStream remoteOut = null; + if (!placeholder) { + remoteOut = remoteFactory.createCheckpointStateOutputStream(checkpointID, timestamp); + } + CachedCheckpointStateOutputStream output = new CachedCheckpointStateOutputStream(cachedOut, remoteOut); + return output; + } + + @Override + public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception { + LOG.warn("create output stream which is not cacheable."); + return remoteFactory.createCheckpointStateOutputStream(checkpointID, 
timestamp); + } + + @Override + public void close() throws Exception { + remoteFactory.close(); + } + + /** +* A hybrid checkpoint output stream which write data to both remote end and local end, +* writing data locally failed won't stop writing to remote. This hybrid output stream +* will return a {@link CachedStreamStateHandle} in closeAndGetHandle(), it can be used for read data locally. +*/ + public
[GitHub] flink pull request #5074: [FLINK-7873] [runtime] Introduce local recovery
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5074#discussion_r153493925 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/CachedCheckpointStreamFactory.java --- @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.state; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.checkpoint.CachedStreamStateHandle; +import org.apache.flink.runtime.checkpoint.CheckpointCache; +import org.apache.flink.runtime.checkpoint.CheckpointCache.CachedOutputStream; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * {@link CachedCheckpointStreamFactory} is used to build an output stream that writes data to both remote end (e.g:DFS) and local end. + * Local data is managed by {@link CheckpointCache}. It simply wraps {@link CheckpointCache} and {@link CheckpointStreamFactory} and + * create a hybrid output stream by {@link CheckpointCache} and {@link CheckpointStreamFactory}, this hybrid output stream will write + * to both remote end and local end. 
+ */ +public class CachedCheckpointStreamFactory implements CheckpointStreamFactory { + + private static final Logger LOG = LoggerFactory.getLogger(CachedCheckpointStreamFactory.class); + + private final CheckpointCache cache; + private final CheckpointStreamFactory remoteFactory; + + public CachedCheckpointStreamFactory(CheckpointCache cache, CheckpointStreamFactory factory) { + this.cache = cache; + this.remoteFactory = Preconditions.checkNotNull(factory, "Remote stream factory is null."); + } + + public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp, StateHandleID handleID) throws Exception { + return createCheckpointStateOutputStream(checkpointID, timestamp, handleID, false); + } + + public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp, StateHandleID handleID, boolean placeholder) throws Exception { + if (LOG.isDebugEnabled()) { + LOG.debug("create cache output stream: cpkID:{} placeHolder:{}", checkpointID, placeholder); + } + CachedOutputStream cachedOut = null; + if (cache != null) { + cachedOut = cache.createOutputStream(checkpointID, handleID, placeholder); + } + CheckpointStateOutputStream remoteOut = null; + if (!placeholder) { + remoteOut = remoteFactory.createCheckpointStateOutputStream(checkpointID, timestamp); + } + CachedCheckpointStateOutputStream output = new CachedCheckpointStateOutputStream(cachedOut, remoteOut); + return output; + } + + @Override + public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception { + LOG.warn("create output stream which is not cacheable."); + return remoteFactory.createCheckpointStateOutputStream(checkpointID, timestamp); + } + + @Override + public void close() throws Exception { + remoteFactory.close(); + } + + /** +* A hybrid checkpoint output stream which writes data to both the remote end and the local end; +* a failure while writing locally does not stop writing to the remote end.
This hybrid output stream +* returns a {@link CachedStreamStateHandle} in closeAndGetHandle(), which can be used to read data locally. +*/ + public static class CachedCheckpointStateOutputStream extends CheckpointStateOutputStream { + + private CachedOutputStream cacheOut = null; + private CheckpointStateOutputStream remoteOut = null; + +
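The Javadoc above describes a stream that writes to the remote end and, on a best-effort basis, to a local copy, where a local failure must not stop the remote write. Below is a minimal, JDK-only sketch of that idea. `HybridOutputStream` and `hasLocalCopy()` are hypothetical names and not the PR's `CachedCheckpointStateOutputStream` API. The sketch also assumes that remote failures should still propagate to the caller.

```java
import java.io.IOException;
import java.io.OutputStream;

/**
 * Hypothetical sketch: every byte goes to the remote stream; the local stream
 * is written best effort. A local failure only drops the local copy, while
 * remote failures propagate to the caller.
 */
final class HybridOutputStream extends OutputStream {

    private final OutputStream remote;
    private OutputStream local; // null once the local copy has been given up

    HybridOutputStream(OutputStream remote, OutputStream local) {
        this.remote = remote;
        this.local = local;
    }

    @Override
    public void write(int b) throws IOException {
        remote.write(b);
        if (local != null) {
            try {
                local.write(b);
            } catch (IOException e) {
                dropLocalCopy();
            }
        }
    }

    @Override
    public void write(byte[] buf, int off, int len) throws IOException {
        remote.write(buf, off, len);
        if (local != null) {
            try {
                local.write(buf, off, len);
            } catch (IOException e) {
                dropLocalCopy();
            }
        }
    }

    /** True if the local stream received every byte written so far. */
    boolean hasLocalCopy() {
        return local != null;
    }

    @Override
    public void close() throws IOException {
        try {
            remote.close();
        } finally {
            if (local != null) {
                try {
                    local.close();
                } catch (IOException e) {
                    local = null; // a local copy that failed to close is not trusted
                }
            }
        }
    }

    private void dropLocalCopy() {
        OutputStream failed = local;
        local = null;
        try {
            failed.close();
        } catch (IOException ignored) {
            // the local copy is discarded anyway
        }
    }
}
```

Handling the local failure inside the stream keeps callers unaware of the cache. After closing, they only ask `hasLocalCopy()` to decide whether to register a local handle next to the remote one.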
[jira] [Commented] (FLINK-7873) Introduce CheckpointCacheManager for reading checkpoint data locally when performing failover
[ https://issues.apache.org/jira/browse/FLINK-7873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268730#comment-16268730 ] ASF GitHub Bot commented on FLINK-7873: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5074#discussion_r153493396 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java --- @@ -190,6 +199,11 @@ public static TaskManagerServices fromConfiguration( final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation); + final CheckpointCacheManager checkpointCacheManager = new CheckpointCacheManager( + new ScheduledThreadPoolExecutor(1), + Executors.directExecutor(), + taskManagerServicesConfiguration.getTmpDirPaths()[0]); --- End diff -- You're right. I was even thinking that we should add a configuration option for where local recovery stores its data. > Introduce CheckpointCacheManager for reading checkpoint data locally when > performing failover > - > > Key: FLINK-7873 > URL: https://issues.apache.org/jira/browse/FLINK-7873 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 >Reporter: Sihua Zhou >Assignee: Sihua Zhou > > Why I introduced this: > The current recovery strategy always reads checkpoint data from the remote > FileStream (HDFS). This costs a lot of bandwidth when the state is very big > (e.g. 1 TB). What's worse, if this job recovers again and again, it can > eat up all network bandwidth and do great harm to the cluster. So, I propose > that we cache the checkpoint data locally and read checkpoint data from the > local cache as much as we can; we read the data from remote only if we fail > locally. The advantage is that if an execution is assigned to the same > TaskManager as before, it can save a lot of bandwidth and recover > faster. > Solution: > The TaskManager does the caching and manages the cached data itself.
It simply > uses a TTL-like method to dispose of cache entries: we dispose of an entry if > it hasn't been touched for X time, and once we touch an entry we reset its TTL. > In this way, all the work is done by the TaskManager, and it is transparent to the > JobManager. The only problem is that we may dispose of an entry that may still be > useful; in this case, we have to read the data from remote after all, but users can > avoid this by setting a proper TTL value according to the checkpoint interval and > other things. > Can someone give me some advice? I would appreciate it very much~ -- This message was sent by Atlassian JIRA (v6.4.14#64029)
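The TTL-like disposal described in the issue works like this: dispose of an entry that has not been touched for X time, and reset the TTL on every touch. It can be sketched with a plain map and an injectable clock. `TtlCache`, `touch` and `disposeExpired` are hypothetical names. In a TaskManager, the disposal would presumably run periodically, for example on the `ScheduledThreadPoolExecutor` seen in the diff above, but that wiring is an assumption.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.function.LongSupplier;

/** Hypothetical sketch of TTL-based disposal of locally cached checkpoint entries. */
final class TtlCache<K, V> {

    private static final class Entry<V> {
        final V value;
        long lastTouchedMillis;

        Entry(V value, long now) {
            this.value = value;
            this.lastTouchedMillis = now;
        }
    }

    private final Map<K, Entry<V>> entries = new HashMap<>();
    private final long ttlMillis;
    private final LongSupplier clock; // injectable so the TTL logic is testable

    TtlCache(long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    void put(K key, V value) {
        entries.put(key, new Entry<>(value, clock.getAsLong()));
    }

    /**
     * Returns the cached value and resets its TTL, or null if it is absent or
     * expired; the caller then falls back to reading the remote copy.
     */
    V touch(K key) {
        long now = clock.getAsLong();
        Entry<V> entry = entries.get(key);
        if (entry == null) {
            return null;
        }
        if (now - entry.lastTouchedMillis > ttlMillis) {
            entries.remove(key);
            return null;
        }
        entry.lastTouchedMillis = now;
        return entry.value;
    }

    /** Disposes of every entry idle for longer than the TTL; returns how many were removed. */
    int disposeExpired() {
        long now = clock.getAsLong();
        int removed = 0;
        Iterator<Entry<V>> it = entries.values().iterator();
        while (it.hasNext()) {
            if (now - it.next().lastTouchedMillis > ttlMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }
}
```

As the issue notes, the TTL should exceed the usual gap between two touches (roughly the checkpoint interval). Otherwise, entries that are still useful get disposed of, and restore falls back to the remote copy.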
[GitHub] flink pull request #5074: [FLINK-7873] [runtime] Introduce local recovery
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5074#discussion_r153493396 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java --- @@ -190,6 +199,11 @@ public static TaskManagerServices fromConfiguration( final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation); + final CheckpointCacheManager checkpointCacheManager = new CheckpointCacheManager( + new ScheduledThreadPoolExecutor(1), + Executors.directExecutor(), + taskManagerServicesConfiguration.getTmpDirPaths()[0]); --- End diff -- You're right. I was even thinking that we should add a configuration option for where local recovery stores its data. ---
[GitHub] flink issue #4666: [FLINK-7613][Documentation] Fixed typographical error
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/4666 @raymondtay are we still looking to make this change? Defining the mapper may be just as likely to confuse new Flink users. Also, when updating future PRs you should rebase onto master rather than merging master in. ---
[jira] [Commented] (FLINK-7613) Fix documentation error in QuickStart
[ https://issues.apache.org/jira/browse/FLINK-7613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268728#comment-16268728 ] ASF GitHub Bot commented on FLINK-7613: --- Github user greghogan commented on the issue: https://github.com/apache/flink/pull/4666 @raymondtay are we still looking to make this change? Defining the mapper may be just as likely to confuse new Flink users. Also, when updating future PRs you should rebase onto master rather than merging master in. > Fix documentation error in QuickStart > - > > Key: FLINK-7613 > URL: https://issues.apache.org/jira/browse/FLINK-7613 > Project: Flink > Issue Type: Task > Components: Documentation >Affects Versions: 1.4.0 >Reporter: Raymond Tay >Priority: Minor > > In the `QuickStart => Run The Example` section, there's a typographical error > which points the reader to `*jobmanager*`, but it should be `*taskmanager*` in > Apache Flink 1.4.x. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7873) Introduce CheckpointCacheManager for reading checkpoint data locally when performing failover
[ https://issues.apache.org/jira/browse/FLINK-7873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268727#comment-16268727 ] ASF GitHub Bot commented on FLINK-7873: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5074#discussion_r153492995 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java --- @@ -510,6 +512,13 @@ private static void serializeStreamStateHandle( byte[] internalData = byteStreamStateHandle.getData(); dos.writeInt(internalData.length); dos.write(byteStreamStateHandle.getData()); + } else if (stateHandle instanceof CachedStreamStateHandle) { --- End diff -- Yes, this can be avoided by mapping local state to checkpoint IDs. In fact, it can also be avoided by mapping local state to handle IDs, but that needs some additional code. > Introduce CheckpointCacheManager for reading checkpoint data locally when > performing failover > - > > Key: FLINK-7873 > URL: https://issues.apache.org/jira/browse/FLINK-7873 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 >Reporter: Sihua Zhou >Assignee: Sihua Zhou > > Why I introduced this: > The current recovery strategy always reads checkpoint data from the remote > FileStream (HDFS). This costs a lot of bandwidth when the state is very big > (e.g. 1 TB). What's worse, if this job recovers again and again, it can > eat up all network bandwidth and do great harm to the cluster. So, I propose > that we cache the checkpoint data locally and read checkpoint data from the > local cache as much as we can; we read the data from remote only if we fail > locally. The advantage is that if an execution is assigned to the same > TaskManager as before, it can save a lot of bandwidth and recover > faster. > Solution: > The TaskManager does the caching and manages the cached data itself.
It simply > uses a TTL-like method to dispose of cache entries: we dispose of an entry if > it hasn't been touched for X time, and once we touch an entry we reset its TTL. > In this way, all the work is done by the TaskManager, and it is transparent to the > JobManager. The only problem is that we may dispose of an entry that may still be > useful; in this case, we have to read the data from remote after all, but users can > avoid this by setting a proper TTL value according to the checkpoint interval and > other things. > Can someone give me some advice? I would appreciate it very much~ -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5074: [FLINK-7873] [runtime] Introduce local recovery
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5074#discussion_r153492995 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java --- @@ -510,6 +512,13 @@ private static void serializeStreamStateHandle( byte[] internalData = byteStreamStateHandle.getData(); dos.writeInt(internalData.length); dos.write(byteStreamStateHandle.getData()); + } else if (stateHandle instanceof CachedStreamStateHandle) { --- End diff -- Yes, this can be avoided by mapping local state to checkpoint IDs. In fact, it can also be avoided by mapping local state to handle IDs, but that needs some additional code. ---
[jira] [Commented] (FLINK-7873) Introduce CheckpointCacheManager for reading checkpoint data locally when performing failover
[ https://issues.apache.org/jira/browse/FLINK-7873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268724#comment-16268724 ] ASF GitHub Bot commented on FLINK-7873: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5074#discussion_r153492082 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java --- @@ -510,6 +512,13 @@ private static void serializeStreamStateHandle( byte[] internalData = byteStreamStateHandle.getData(); dos.writeInt(internalData.length); dos.write(byteStreamStateHandle.getData()); + } else if (stateHandle instanceof CachedStreamStateHandle) { --- End diff -- @StefanRRichter Thanks very much for reviewing my code and for explaining your opinion in such detail. I'm very happy that some of my ideas are similar to yours. There are two things I want to explain a bit: 1. About the 1:1 relationship between remote handle and local handle: in fact, I think each local state handle corresponds to the smallest storage unit of a checkpoint. For example, each backend generates an `IncrementalKeyedStateHandle` for every incremental checkpoint, but `IncrementalKeyedStateHandle` is a composite handle: it contains a collection of sub StateHandles that store the data (meta & sst & misc). In this case, the sub StateHandle is the smallest storage unit. Each of them has a 1:1 relationship with a local state handle, and `IncrementalKeyedStateHandle` has a 1:N relationship with local state handles (currently, CheckpointStateOutputStream.closeAndGetHandle() returns a remote handle, which I view as the smallest storage unit). For incremental checkpoints, this can indeed be optimized: we can provide a fast path for putting cache entries into the checkpoint cache, so that data doesn't need to be written locally while it is transmitted to the remote end.
I didn't do that because I wanted to provide a unified way to meet the requirements of all backends, and I didn't want to change the backend code too much. 2. The local handle doesn't have to be a local file: it can also be stored in memory or on another storage medium, or even be just a mock (which may apply to the CopyOnWriteStateTableSnapshot problem described above), as long as it inherits from CachedStateHandle and implements the corresponding classes. IMO, mapping local state to checkpoint IDs can also work, but I have some minor questions about that: 1. Can we provide a unified local state approach that meets the requirements of all current state backends (of course, RocksDB can be optimized)? 2. Since the local state is mapped by checkpoint ID, the key range detection needs to be performed locally again, which is a bit repetitive. Can this be avoided with the work done on the JM? Although I've expressed my ideas, I think you are more experienced than me in this area, and your approach is probably better than mine. So if you have any planned issues, I would like to close this PR and work on your planned issues instead: even though this PR shares some ideas with yours, it doesn't seem to be the base version you expected. For now, though, we will still use this version of local checkpointing in production (it still needs to address some of the problems raised in your comments), because Flink 1.4 does not have this feature and we need it very much (our state size is huge). Once 1.5 is released, we will switch to the community version.
> Introduce CheckpointCacheManager for reading checkpoint data locally when > performing failover > - > > Key: FLINK-7873 > URL: https://issues.apache.org/jira/browse/FLINK-7873 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 >Reporter: Sihua Zhou >Assignee: Sihua Zhou > > Why I introduced this: > The current recovery strategy always reads checkpoint data from the remote > FileStream (HDFS). This costs a lot of bandwidth when the state is very big > (e.g. 1 TB). What's worse, if this job recovers again and again, it can > eat up all network bandwidth and do great harm to the cluster. So, I propose > that we cache the checkpoint data locally and read checkpoint data from the > local cache as much as we can; we read the data from remote only if we fail > locally. The advantage is that if an execution is assigned to the same > TaskManager as before, it can save a lot of bandwidth and recover > faster. > Solution: > The TaskManager does the caching and manages the cached data itself. It simply > uses a TTL-like method to dispose of cache entries,
[GitHub] flink pull request #5074: [FLINK-7873] [runtime] Introduce local recovery
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5074#discussion_r153492082 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java --- @@ -510,6 +512,13 @@ private static void serializeStreamStateHandle( byte[] internalData = byteStreamStateHandle.getData(); dos.writeInt(internalData.length); dos.write(byteStreamStateHandle.getData()); + } else if (stateHandle instanceof CachedStreamStateHandle) { --- End diff -- @StefanRRichter Thanks very much for reviewing my code and for explaining your opinion in such detail. I'm very happy that some of my ideas are similar to yours. There are two things I want to explain a bit: 1. About the 1:1 relationship between remote handle and local handle: in fact, I think each local state handle corresponds to the smallest storage unit of a checkpoint. For example, each backend generates an `IncrementalKeyedStateHandle` for every incremental checkpoint, but `IncrementalKeyedStateHandle` is a composite handle: it contains a collection of sub StateHandles that store the data (meta & sst & misc). In this case, the sub StateHandle is the smallest storage unit. Each of them has a 1:1 relationship with a local state handle, and `IncrementalKeyedStateHandle` has a 1:N relationship with local state handles (currently, CheckpointStateOutputStream.closeAndGetHandle() returns a remote handle, which I view as the smallest storage unit). For incremental checkpoints, this can indeed be optimized: we can provide a fast path for putting cache entries into the checkpoint cache, so that data doesn't need to be written locally while it is transmitted to the remote end. I didn't do that because I wanted to provide a unified way to meet the requirements of all backends, and I didn't want to change the backend code too much.
2. The local handle doesn't have to be a local file: it can also be stored in memory or on another storage medium, or even be just a mock (which may apply to the CopyOnWriteStateTableSnapshot problem described above), as long as it inherits from CachedStateHandle and implements the corresponding classes. IMO, mapping local state to checkpoint IDs can also work, but I have some minor questions about that: 1. Can we provide a unified local state approach that meets the requirements of all current state backends (of course, RocksDB can be optimized)? 2. Since the local state is mapped by checkpoint ID, the key range detection needs to be performed locally again, which is a bit repetitive. Can this be avoided with the work done on the JM? Although I've expressed my ideas, I think you are more experienced than me in this area, and your approach is probably better than mine. So if you have any planned issues, I would like to close this PR and work on your planned issues instead: even though this PR shares some ideas with yours, it doesn't seem to be the base version you expected. For now, though, we will still use this version of local checkpointing in production (it still needs to address some of the problems raised in your comments), because Flink 1.4 does not have this feature and we need it very much (our state size is huge). Once 1.5 is released, we will switch to the community version. ---
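Point 1 of the comment above makes two claims. First, an incremental checkpoint handle is a composite of sub-handles (meta, sst, misc), and each sub-handle has at most one local copy. Second, the issue says data is read remotely only when local reading fails. Below is a minimal sketch of that 1:N mapping, assuming the fallback is decided per sub-handle. `LocalFirstResolver`, the string handle IDs and the paths are hypothetical and are not Flink types.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch: a composite checkpoint handle is a list of sub-handle
 * IDs; each sub-handle maps 1:1 to at most one local copy. Restore prefers the
 * local copy and falls back to the remote location per sub-handle.
 */
final class LocalFirstResolver {

    private final Map<String, String> localPathBySubHandle = new HashMap<>();

    void registerLocalCopy(String subHandleId, String localPath) {
        localPathBySubHandle.put(subHandleId, localPath);
    }

    /** Returns, in order, the location each sub-handle would be read from. */
    List<String> resolve(List<String> subHandleIds, String remoteBaseUri) {
        List<String> locations = new ArrayList<>(subHandleIds.size());
        for (String id : subHandleIds) {
            String localPath = localPathBySubHandle.get(id);
            // no local copy for this unit: read it from the remote checkpoint
            locations.add(localPath != null ? localPath : remoteBaseUri + id);
        }
        return locations;
    }
}
```

Deciding per sub-handle means one missing local file costs one remote read rather than a full remote restore. Whether the PR intends this granularity is an assumption.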
[jira] [Assigned] (FLINK-7805) Add HA capabilities to YarnResourceManager
[ https://issues.apache.org/jira/browse/FLINK-7805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao reassigned FLINK-7805: --- Assignee: Gary Yao > Add HA capabilities to YarnResourceManager > -- > > Key: FLINK-7805 > URL: https://issues.apache.org/jira/browse/FLINK-7805 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, YARN >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Gary Yao > Labels: flip-6 > > The new {{YarnResourceManager}} implementation does not retrieve allocated > containers from previous attempts in HA mode like the old > {{YarnFlinkResourceManager}} did. We should add this functionality in order > to properly support long running Yarn applications [1]. > [1] > https://de.hortonworks.com/blog/apache-hadoop-yarn-hdp-2-2-fault-tolerance-features-long-running-services/ -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8150) WebUI in FLIP-6 mode exposes TaskManagerConnection IDs as TaskManager IDs
[ https://issues.apache.org/jira/browse/FLINK-8150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268686#comment-16268686 ] ASF GitHub Bot commented on FLINK-8150: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5093#discussion_r153483875 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ResourceIDSerializer.java --- @@ -29,16 +30,16 @@ /** * Json serializer for {@link InstanceID}. */ -public class InstanceIDSerializer extends StdSerializer<InstanceID> { +public class ResourceIDSerializer extends StdSerializer<ResourceID> { private static final long serialVersionUID = 5798852092159615938L; - protected InstanceIDSerializer() { - super(InstanceID.class); + protected ResourceIDSerializer() { + super(ResourceID.class); } @Override - public void serialize(InstanceID value, JsonGenerator gen, SerializerProvider provider) throws IOException { + public void serialize(ResourceID value, JsonGenerator gen, SerializerProvider provider) throws IOException { gen.writeString(value.toString()); --- End diff -- Maybe `value.getResourceIdString()`. > WebUI in FLIP-6 mode exposes TaskManagerConnection IDs as TaskManager IDs > - > > Key: FLINK-8150 > URL: https://issues.apache.org/jira/browse/FLINK-8150 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, REST >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Till Rohrmann >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > TaskManager IDs exposed by > {{org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler}} > cannot be used as input to query TaskManager metrics with method > {{MetricStore#getTaskManagerMetricStore(String)}}. > *Reason* > {{ResourceManager#requestTaskManagerInfo(Time)}} returns {{TaskManagerInfo}}s > where the instance IDs are set to the IDs of the {{TaskExecutorConnection}}s,
whereas {{ResourceManager#requestTaskManagerMetricQueryServicePaths(Time)}} > returns the TaskManager resource IDs. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5093: [FLINK-8150] [flip6] Expose TaskExecutor's Resourc...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5093#discussion_r153483875 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ResourceIDSerializer.java --- @@ -29,16 +30,16 @@ /** * Json serializer for {@link InstanceID}. */ -public class InstanceIDSerializer extends StdSerializer<InstanceID> { +public class ResourceIDSerializer extends StdSerializer<ResourceID> { private static final long serialVersionUID = 5798852092159615938L; - protected InstanceIDSerializer() { - super(InstanceID.class); + protected ResourceIDSerializer() { + super(ResourceID.class); } @Override - public void serialize(InstanceID value, JsonGenerator gen, SerializerProvider provider) throws IOException { + public void serialize(ResourceID value, JsonGenerator gen, SerializerProvider provider) throws IOException { gen.writeString(value.toString()); --- End diff -- Maybe `value.getResourceIdString()`. ---
[jira] [Commented] (FLINK-8150) WebUI in FLIP-6 mode exposes TaskManagerConnection IDs as TaskManager IDs
[ https://issues.apache.org/jira/browse/FLINK-8150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268681#comment-16268681 ] ASF GitHub Bot commented on FLINK-8150: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5093#discussion_r153483098 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java --- @@ -53,8 +53,8 @@ public static final String FIELD_NAME_HARDWARE = "hardware"; @JsonProperty(FIELD_NAME_INSTANCE_ID) --- End diff -- Should be `FIELD_NAME_RESOURCE_ID`. > WebUI in FLIP-6 mode exposes TaskManagerConnection IDs as TaskManager IDs > - > > Key: FLINK-8150 > URL: https://issues.apache.org/jira/browse/FLINK-8150 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, REST >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Till Rohrmann >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > TaskManager IDs exposed by > {{org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler}} > cannot be used as input to query TaskManager metrics with method > {{MetricStore#getTaskManagerMetricStore(String)}}. > *Reason* > {{ResourceManager#requestTaskManagerInfo(Time)}} returns {{TaskManagerInfo}}s > where the instance IDs are set to the IDs of the {{TaskExecutorConnection}}s, > whereas {{ResourceManager#requestTaskManagerMetricQueryServicePaths(Time)}} > returns the TaskManager resource IDs. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5093: [FLINK-8150] [flip6] Expose TaskExecutor's Resourc...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5093#discussion_r153483098 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java --- @@ -53,8 +53,8 @@ public static final String FIELD_NAME_HARDWARE = "hardware"; @JsonProperty(FIELD_NAME_INSTANCE_ID) --- End diff -- Should be `FIELD_NAME_RESOURCE_ID` ---
[GitHub] flink pull request #5093: [FLINK-8150] [flip6] Expose TaskExecutor's Resourc...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5093#discussion_r153482920 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java --- @@ -43,7 +43,7 @@ @JsonCreator public TaskManagerDetailsInfo( - @JsonDeserialize(using = InstanceIDDeserializer.class) @JsonProperty(FIELD_NAME_INSTANCE_ID) InstanceID instanceId, + @JsonDeserialize(using = ResourceIDDeserializer.class) @JsonProperty(FIELD_NAME_INSTANCE_ID) ResourceID resourceId, --- End diff -- This should be named `FIELD_NAME_RESOURCE_ID`. ---
[jira] [Commented] (FLINK-8150) WebUI in FLIP-6 mode exposes TaskManagerConnection IDs as TaskManager IDs
[ https://issues.apache.org/jira/browse/FLINK-8150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268680#comment-16268680 ] ASF GitHub Bot commented on FLINK-8150: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5093#discussion_r153482920 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java --- @@ -43,7 +43,7 @@ @JsonCreator public TaskManagerDetailsInfo( - @JsonDeserialize(using = InstanceIDDeserializer.class) @JsonProperty(FIELD_NAME_INSTANCE_ID) InstanceID instanceId, + @JsonDeserialize(using = ResourceIDDeserializer.class) @JsonProperty(FIELD_NAME_INSTANCE_ID) ResourceID resourceId, --- End diff -- This should be named `FIELD_NAME_RESOURCE_ID`. > WebUI in FLIP-6 mode exposes TaskManagerConnection IDs as TaskManager IDs > - > > Key: FLINK-8150 > URL: https://issues.apache.org/jira/browse/FLINK-8150 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, REST >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Till Rohrmann >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > TaskManager IDs exposed by > {{org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler}} > cannot be used as input to query TaskManager metrics with method > {{MetricStore#getTaskManagerMetricStore(String)}}. > *Reason* > {{ResourceManager#requestTaskManagerInfo(Time)}} returns {{TaskManagerInfo}}s > where the instance IDs are set to the IDs of the {{TaskExecutorConnection}}s, > whereas {{ResourceManager#requestTaskManagerMetricQueryServicePaths(Time)}} > returns the TaskManager resource IDs. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4504: [FLINK-7395] [metrics] Count bytesIn/Out without synchron...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/4504 @NicoK The byte counting in the ResultPartition class looks unrelated to the metric system; in fact that field isn't read anywhere. We may want to remove it, but I wouldn't do that as part of this PR. ---
[jira] [Commented] (FLINK-7395) NumBytesOut metric in RecordWriter call synchronized method
[ https://issues.apache.org/jira/browse/FLINK-7395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268656#comment-16268656 ] ASF GitHub Bot commented on FLINK-7395: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/4504 @NicoK The byte counting in the ResultPartition class looks unrelated to the metric system; in fact that field isn't read anywhere. We may want to remove it, but I wouldn't do that as part of this PR. > NumBytesOut metric in RecordWriter call synchronized method > --- > > Key: FLINK-7395 > URL: https://issues.apache.org/jira/browse/FLINK-7395 > Project: Flink > Issue Type: Bug > Components: Metrics >Affects Versions: 1.4.0, 1.3.2 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: 1.3.3 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8158) Rowtime window inner join emits late data
[ https://issues.apache.org/jira/browse/FLINK-8158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268653#comment-16268653 ] ASF GitHub Bot commented on FLINK-8158: --- GitHub user hequn8128 opened a pull request: https://github.com/apache/flink/pull/5094 [FLINK-8158] [table] Fix rowtime window inner join emits late data bug ## What is the purpose of the change This pull request fixes a bug where the rowtime window inner join emits late data. When executing the join, the join operator needs to make sure that no late data is emitted. However, the window border is not handled correctly. ## Brief change log - Set `WatermarkDelay` to `MaxOutputDelay + 1` instead of `MaxOutputDelay` - Add tests in `JoinHarnessTest` ## Verifying this change This change added tests and can be verified as follows: - *Added tests in `JoinHarnessTest` to check whether late data is emitted* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature?
(no) You can merge this pull request into a Git repository by running: $ git pull https://github.com/hequn8128/flink 8158 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5094.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5094 commit 37d08da40150d26b0c58daf1bc85b69675bf5d64 Author: 军长 Date: 2017-11-28T10:58:52Z [FLINK-8158] [table] Fix Rowtime window inner join emits late data bug > Rowtime window inner join emits late data > - > > Key: FLINK-8158 > URL: https://issues.apache.org/jira/browse/FLINK-8158 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Hequn Cheng >Assignee: Hequn Cheng > Attachments: screenshot-1xxx.png > > > When executing the join, the join operator needs to make sure that no late > data is emitted. Currently, this is achieved by holding back watermarks. > However, the window border is not handled correctly. For the SQL below: > {quote} > val sqlQuery = > """ > SELECT t2.key, t2.id, t1.id > FROM T1 as t1 join T2 as t2 ON > t1.key = t2.key AND > t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND > t2.rt + INTERVAL '1' SECOND > """.stripMargin > val data1 = new mutable.MutableList[(String, String, Long)] > // for boundary test > data1.+=(("A", "LEFT1", 6000L)) > val data2 = new mutable.MutableList[(String, String, Long)] > data2.+=(("A", "RIGHT1", 6000L)) > {quote} > The join will output a watermark with timestamp 1000, but if the left side then receives > another record ("A", "LEFT1", 1000L), the join will output a record with timestamp 1000, > which equals the previous watermark. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
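The boundary in this report comes down to watermark semantics. In Flink, `Watermark(t)` declares that no more records with a timestamp `<= t` should follow, so a record stamped exactly at the last emitted watermark is already late. Below is a minimal arithmetic sketch of why a delay of `MaxOutputDelay + 1` closes that gap. The input watermark of 6000 ms and the delay of 5000 ms are hypothetical values chosen only to reproduce the report's numbers. `WatermarkBoundary` is an illustrative name, not the operator's code.

```java
/**
 * Sketch of the off-by-one in FLINK-8158, assuming Flink's convention that
 * Watermark(t) promises no further records with timestamp <= t.
 */
final class WatermarkBoundary {

    private WatermarkBoundary() {}

    /** The join holds back the watermark it forwards by watermarkDelay. */
    static long outputWatermark(long inputWatermark, long watermarkDelay) {
        return inputWatermark - watermarkDelay;
    }

    /** A record is late if its timestamp is not after the last emitted watermark. */
    static boolean isLate(long recordTimestamp, long lastEmittedWatermark) {
        return recordTimestamp <= lastEmittedWatermark;
    }
}
```

With a delay equal to `MaxOutputDelay`, the forwarded watermark is 1000, and a joined record stamped 1000 is late. With `MaxOutputDelay + 1`, the watermark becomes 999, and the same record is on time.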
[GitHub] flink pull request #5094: [FLINK-8158] [table] Fix rowtime window inner join...
GitHub user hequn8128 opened a pull request: https://github.com/apache/flink/pull/5094 [FLINK-8158] [table] Fix rowtime window inner join emits late data bug ## What is the purpose of the change This pull request fixes a bug where the rowtime window inner join emits late data. When executing the join, the join operator needs to make sure that no late data is emitted. However, the window border is not handled correctly. ## Brief change log - Set `WatermarkDelay` to `MaxOutputDelay + 1` instead of `MaxOutputDelay` - Add tests in `JoinHarnessTest` ## Verifying this change This change added tests and can be verified as follows: - *Added tests in `JoinHarnessTest` to check whether late data is emitted* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) You can merge this pull request into a Git repository by running: $ git pull https://github.com/hequn8128/flink 8158 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5094.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5094 commit 37d08da40150d26b0c58daf1bc85b69675bf5d64 Author: 军长 Date: 2017-11-28T10:58:52Z [FLINK-8158] [table] Fix Rowtime window inner join emits late data bug ---
[jira] [Commented] (FLINK-8150) WebUI in FLIP-6 mode exposes TaskManagerConnection IDs as TaskManager IDs
[ https://issues.apache.org/jira/browse/FLINK-8150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268623#comment-16268623 ] ASF GitHub Bot commented on FLINK-8150: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/5093 [FLINK-8150] [flip6] Expose TaskExecutor's ResourceID as TaskExecutor id ## What is the purpose of the change Before, the TaskExecutor's InstanceID was exposed as TaskExecutor id. This was wrong, since the InstanceID is bound to the registration of a TaskExecutor, whereas the ResourceID is bound to the lifetime of the TaskExecutor. Thus, it is better to identify the TaskExecutor by its ResourceID, which does not change. This commit changes the behaviour accordingly on the ResourceManager and the TaskManagerDetailsHandler. ## Brief change log ## Verifying this change - Added `ResourceManagerTest#testRequestTaskManagerInfo` to ensure that we can obtain the `TaskManagerInfo` for registered `TaskExecutors`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink fixTaskManagerId Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5093.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5093 commit 415e3da9b6bc8dd6071ca98966779f73b682b658 Author: Till Rohrmann Date: 2017-11-28T11:43:39Z [FLINK-8150] [flip6] Expose TaskExecutor's ResourceID as TaskExecutor id Before, the TaskExecutor's InstanceID was exposed as TaskExecutor id. This was wrong, since the InstanceID is bound to the registration of a TaskExecutor, whereas the ResourceID is bound to the lifetime of the TaskExecutor. Thus, it is better to identify the TaskExecutor by its ResourceID, which does not change. This commit changes the behaviour accordingly on the ResourceManager and the TaskManagerDetailsHandler. > WebUI in FLIP-6 mode exposes TaskManagerConnection IDs as TaskManager IDs > - > > Key: FLINK-8150 > URL: https://issues.apache.org/jira/browse/FLINK-8150 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, REST >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Till Rohrmann >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > TaskManager IDs exposed by > {{org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler}} > cannot be used as input to query TaskManager metrics with method > {{MetricStore#getTaskManagerMetricStore(String)}}. > *Reason* > {{ResourceManager#requestTaskManagerInfo(Time)}} returns {{TaskManagerInfo}}s > where the instance IDs are set to the IDs of the {{TaskExecutorConnection}}s, > whereas {{ResourceManager#requestTaskManagerMetricQueryServicePaths(Time)}} > returns the TaskManager resource IDs. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
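The *Reason* in FLINK-8150 comes down to two lookups keyed by different IDs. A minimal Scala sketch, with hypothetical names and values (not Flink's `MetricStore` or `TaskManagerInfo` API), shows why an ID taken from the REST handler misses a metric store keyed by resource ID before the fix and hits it after.

```scala
object IdMismatchSketch {
  // Hypothetical model of a registered TaskExecutor: its stable resource ID and
  // the ID of its current connection to the ResourceManager.
  final case class TaskExecutorEntry(resourceId: String, connectionId: String)

  // Hypothetical metric store keyed by resource ID, mirroring the IDs returned
  // for the metric query service paths.
  val metricStore: Map[String, Map[String, Long]] =
    Map("tm-resource-1" -> Map("exampleMetric" -> 1L))

  def lookup(id: String): Option[Map[String, Long]] = metricStore.get(id)

  def main(args: Array[String]): Unit = {
    val tm = TaskExecutorEntry(resourceId = "tm-resource-1", connectionId = "conn-42")
    // Before the fix: the handler exposed the connection ID, so the lookup misses.
    println(lookup(tm.connectionId).isDefined) // false
    // After the fix: the handler exposes the resource ID, so the lookup succeeds.
    println(lookup(tm.resourceId).isDefined) // true
  }
}
```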
[GitHub] flink pull request #5093: [FLINK-8150] [flip6] Expose TaskExecutor's Resourc...
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/5093 [FLINK-8150] [flip6] Expose TaskExecutor's ResourceID as TaskExecutor id ## What is the purpose of the change Before, the TaskExecutor's InstanceID was exposed as TaskExecutor id. This was wrong, since the InstanceID is bound to the registration of a TaskExecutor, whereas the ResourceID is bound to the lifetime of the TaskExecutor. Thus, it is better to identify the TaskExecutor by its ResourceID, which does not change. This commit changes the behaviour accordingly on the ResourceManager and the TaskManagerDetailsHandler. ## Brief change log ## Verifying this change - Added `ResourceManagerTest#testRequestTaskManagerInfo` to ensure that we can obtain the `TaskManagerInfo` for registered `TaskExecutors`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink fixTaskManagerId Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5093.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5093 commit 415e3da9b6bc8dd6071ca98966779f73b682b658 Author: Till Rohrmann Date: 2017-11-28T11:43:39Z [FLINK-8150] [flip6] Expose TaskExecutor's ResourceID as TaskExecutor id Before, the TaskExecutor's InstanceID was exposed as TaskExecutor id. This was wrong, since the InstanceID is bound to the registration of a TaskExecutor, whereas the ResourceID is bound to the lifetime of the TaskExecutor. Thus, it is better to identify the TaskExecutor by its ResourceID, which does not change. This commit changes the behaviour accordingly on the ResourceManager and the TaskManagerDetailsHandler. ---
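The distinction the PR relies on (an InstanceID per registration, a ResourceID per TaskExecutor lifetime) can be sketched in Scala. This is a hypothetical model, not Flink's `ResourceID`/`InstanceID` classes; it assumes only that every registration mints a fresh random ID, which is enough to show why an identifier handed out for one registration stops matching once the same TaskExecutor registers again.

```scala
import java.util.UUID

object StableIdSketch {
  // Hypothetical: each registration gets a fresh instance ID, while the resource ID
  // is fixed for the lifetime of the TaskExecutor.
  final case class Registration(resourceId: String, instanceId: String)

  def register(resourceId: String): Registration =
    Registration(resourceId, UUID.randomUUID().toString)

  def main(args: Array[String]): Unit = {
    val first = register("tm-resource-1")
    // Simulates the same TaskExecutor registering again with the ResourceManager.
    val second = register("tm-resource-1")
    println(first.resourceId == second.resourceId) // true
    println(first.instanceId == second.instanceId) // false, barring a UUID collision
  }
}
```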