[GitHub] flink issue #3511: [Flink-5734] code generation for normalizedkey sorter
Github user KurtYoung commented on the issue:

    https://github.com/apache/flink/pull/3511

> So I think a simpler and better approach is to just make sure that most types have a good implementation of putNormalizedKey, and then NormalizedKeySorter.compareRecords would be called only rarely, so its performance wouldn't really matter.

You are right when the sort keys are simple numeric types, but not when they are strings, which may be the most popular choice in some ETL and data warehouse pipelines. I agree that code generation can't help in this situation, so we are investigating binary data formats to represent our records and modifying the TypeSerializer and TypeComparator interfaces used for ser/de. We then don't have to consume the input/output view byte by byte, but can randomly access the underlying data, i.e. the MemorySegment. It works like Spark's UnsafeRow: https://reviewable.io/reviews/apache/spark/5725, so we can eliminate most of the deserialization cost, such as `read byte[]` followed by `new String(byte[])`. Combining this approach with some code generation to eliminate the virtual function calls of the TypeComparator, we see a 10x performance improvement when sorting on strings.

> I think a large potential in code-generation is to eliminate the overheads of the very many virtual function calls throughout the runtime

Totally agreed. After we finish the code generation and the ser/de improvements, we will investigate this further. Good to see that you have a list of all the megamorphic calls. BTW, we are actually translating the batch jobs onto the streaming runtime, so I think there will be a lot in common.

Having and controlling more type information and code-generating the whole operator has lots of benefits. It can also help make most of the calls monomorphic, for example:
- full control over object reuse, yes
- comparators
- generating hash codes
- potential improvements to algorithms that find out they only need to deal with fixed-length data
- directly using primitive variables when dealing with simple types

You are right that this is orthogonal to runtime improvements, and we see the Operator as the boundary. The framework should provide the most efficient environment for operators to run in, and we will code-generate the most efficient operators to live in it.

> Btw. have you seen this PR for code generation for POJO serializers and comparators? #2211

I haven't seen it yet; I will find some time to check it out.

---
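To make the point about string sort keys concrete, here is a minimal sketch of the normalized-key idea discussed above. It is not Flink's actual `putNormalizedKey` or `NormalizedKeySorter` code; the class and method names (`NormalizedKeySketch`, `normalizedKey`, `prefixDecides`) are hypothetical. A fixed-width byte prefix is compared first, and the full comparison only runs when the prefixes tie, which is exactly what tends to happen for strings that share a long common prefix.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration only; not part of Flink.
final class NormalizedKeySketch {

    /** Fixed-width key: the first keyLen UTF-8 bytes of s, zero-padded if s is shorter. */
    static byte[] normalizedKey(String s, int keyLen) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        return Arrays.copyOf(utf8, keyLen);
    }

    /** Unsigned lexicographic byte comparison; a shorter array sorts first on a tie. */
    static int compareUnsigned(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = (a[i] & 0xFF) - (b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return a.length - b.length;
    }

    /** True when the fixed-width prefixes alone determine the order. */
    static boolean prefixDecides(String a, String b, int keyLen) {
        return compareUnsigned(normalizedKey(a, keyLen), normalizedKey(b, keyLen)) != 0;
    }

    /** Cheap prefix comparison first; full byte comparison only when the prefixes tie. */
    static int compare(String a, String b, int keyLen) {
        int cmp = compareUnsigned(normalizedKey(a, keyLen), normalizedKey(b, keyLen));
        if (cmp != 0) {
            return cmp;
        }
        // Expensive path: this stands in for deserializing and comparing whole records.
        return compareUnsigned(a.getBytes(StandardCharsets.UTF_8),
                               b.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        System.out.println(prefixDecides("apple", "banana", 4));
        System.out.println(prefixDecides("http://example.com/a", "http://example.com/b", 8));
    }
}
```

With an 8-byte key, the two URLs in `main` tie on the prefix and fall through to the full comparison, which illustrates why string keys can make the slow path frequent.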
[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16186766#comment-16186766 ] ASF GitHub Bot commented on FLINK-6233: --- Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r141993041 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinRightCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. 
val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** -* Process records from the left stream. -* -* @param cRowValue the input record -* @param ctx the context to register timer or get current time -* @param out the collector for outputting results -* +* Process rows from the left stream. */ override def processElement1( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - rightRelativeSize +val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** -* Process records from the right stream. -* -* @param cRowValue the input record -* @param ctx the context to get current time -* @param out the collector for outputting results -* +* Process rows from the right stream. 
*/ override def processElement2( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForRightStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - leftRelativeSize +val oppositeUpperBound: Long = rowTime + rightRelativeSize
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r141993041 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinRightCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. 
val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** -* Process records from the left stream. -* -* @param cRowValue the input record -* @param ctx the context to register timer or get current time -* @param out the collector for outputting results -* +* Process rows from the left stream. */ override def processElement1( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - rightRelativeSize +val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** -* Process records from the right stream. -* -* @param cRowValue the input record -* @param ctx the context to get current time -* @param out the collector for outputting results -* +* Process rows from the right stream. 
*/ override def processElement2( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForRightStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - leftRelativeSize +val oppositeUpperBound: Long = rowTime + rightRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, rightOperatorTime, + oppositeLowerBound, + oppositeUpperBound, leftOperatorTime,
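The bound arithmetic in the diff above can be sketched in isolation. The following Java snippet is an illustration, not the code under review (which is Scala): it reproduces the `oppositeLowerBound` / `oppositeUpperBound` computation for both streams. The class name `JoinWindowSketch`, the helper `within`, and the choice to treat the bounds as inclusive are assumptions made here for the example.

```java
// Hypothetical illustration of the bound arithmetic shown in the diff; not Flink code.
final class JoinWindowSketch {

    /** Time range on the right stream that a left row at rowTime may join with. */
    static long[] boundsForLeftRow(long rowTime, long leftRelativeSize, long rightRelativeSize) {
        long oppositeLowerBound = rowTime - rightRelativeSize;
        long oppositeUpperBound = rowTime + leftRelativeSize;
        return new long[] {oppositeLowerBound, oppositeUpperBound};
    }

    /** Time range on the left stream that a right row at rowTime may join with. */
    static long[] boundsForRightRow(long rowTime, long leftRelativeSize, long rightRelativeSize) {
        long oppositeLowerBound = rowTime - leftRelativeSize;
        long oppositeUpperBound = rowTime + rightRelativeSize;
        return new long[] {oppositeLowerBound, oppositeUpperBound};
    }

    /** Assumption for this sketch: both bounds are inclusive. */
    static boolean within(long time, long[] bounds) {
        return time >= bounds[0] && time <= bounds[1];
    }

    public static void main(String[] args) {
        long[] forLeft = boundsForLeftRow(100L, 10L, 5L);
        long[] forRight = boundsForRightRow(95L, 10L, 5L);
        System.out.println(within(95L, forLeft) + " " + within(100L, forRight));
    }
}
```

The two computations mirror each other: a right row falls inside a left row's range exactly when that left row falls inside the right row's range, so either side can trigger the same match.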
[jira] [Commented] (FLINK-7648) Port TaskManagersHandler to new REST endpoint
    [ https://issues.apache.org/jira/browse/FLINK-7648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16186631#comment-16186631 ]

Bowen Li commented on FLINK-7648:
---------------------------------

[~till.rohrmann] I successfully started the cluster in a unit test. However, the browser shows {"errors":["Not found."]} when I visit {{http://127.0.0.1:9067/}}, and there is no UI. Is this expected, or did I do something wrong?

BTW, how do I run a job in such a {{StandaloneSessionCluster}}? I'm porting {{JobMetricsHandler}}, and it requires a running job to be able to visit that UI component.

Sorry for the back-and-forth questions. I'm still learning about the new server architecture.

> Port TaskManagersHandler to new REST endpoint
> ---------------------------------------------
>
>                 Key: FLINK-7648
>                 URL: https://issues.apache.org/jira/browse/FLINK-7648
>             Project: Flink
>          Issue Type: Sub-task
>          Components: REST, Webfrontend
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Bowen Li
>              Labels: flip-6
>             Fix For: 1.4.0
>
>
> Port existing {{TaskManagersHandler}} to the new REST endpoint

--
This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL
[ https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16186602#comment-16186602 ] Shuyi Chen commented on FLINK-7491: --- [~fhueske] can you help take another look at the PR? I've addressed your comments. Much appreciated. > Support COLLECT Aggregate function in Flink SQL > --- > > Key: FLINK-7491 > URL: https://issues.apache.org/jira/browse/FLINK-7491 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Shuyi Chen >Assignee: Shuyi Chen > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7740) Add parameter support in CassandraInputFormat
Bin Wang created FLINK-7740:
-------------------------------

             Summary: Add parameter support in CassandraInputFormat
                 Key: FLINK-7740
                 URL: https://issues.apache.org/jira/browse/FLINK-7740
             Project: Flink
          Issue Type: Improvement
            Reporter: Bin Wang
            Priority: Minor

I suggest a small improvement to CassandraInputFormat. It currently supports only a CQL string as input. Adding parameter support would be good for security, and it would make the CQL string simpler when there is an IN clause or when a parameter value is very long, e.g.

"SELECT col0, col1, col2 from keyspace.table0 where col0 in ? and col1>?"

vs

"SELECT col0, col1, col2 from keyspace.table0 where col0 in ('v0', 'v1','v2',...,'vn') and col1 > 12345678901234"

--
This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
    [ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16186260#comment-16186260 ]

ASF GitHub Bot commented on FLINK-7406:
---------------------------------------

Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4509#discussion_r141945843

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ---
    @@ -235,14 +240,15 @@ void releaseBuffer() {
         ByteBuf write(ByteBufAllocator allocator) throws IOException {
             checkNotNull(buffer, "No buffer instance to serialize.");

    -        int length = 16 + 4 + 1 + 4 + buffer.getSize();
    +        int length = 16 + 4 + 4 + 1 + 4 + buffer.getSize();
    --- End diff --

    can you please add a comment here explaining reasons of the math?

> Implement Netty receiver incoming pipeline for credit-based
> -----------------------------------------------------------
>
>                 Key: FLINK-7406
>                 URL: https://issues.apache.org/jira/browse/FLINK-7406
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Network
>            Reporter: zhijiang
>            Assignee: zhijiang
>             Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control.
> Currently {{PartitionRequestClientHandler}} receives and reads {{BufferResponse}} from producer. It will request buffer from {{BufferPool}} for holding the message. If not got, the message is staged temporarily and {{autoread}} for channel is set false.
> For credit-based mode, {{PartitionRequestClientHandler}} can always get buffer from {{RemoteInputChannel}} for reading messages from producer.
> The related works are:
> * Add the backlog of producer in {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffer from {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates backlog for {{RemoteInputChannel}}, and it may trigger requests of floating buffers from {{BufferPool}}

--
This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...
Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4509#discussion_r141945843

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ---
    @@ -235,14 +240,15 @@ void releaseBuffer() {
         ByteBuf write(ByteBufAllocator allocator) throws IOException {
             checkNotNull(buffer, "No buffer instance to serialize.");

    -        int length = 16 + 4 + 1 + 4 + buffer.getSize();
    +        int length = 16 + 4 + 4 + 1 + 4 + buffer.getSize();
    --- End diff --

    can you please add a comment here explaining reasons of the math?

---
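One way to address the request for an explanation of the arithmetic is to name each summand. The sketch below is only a guess at the layout: the issue description says this work adds the producer's backlog to the `BufferResponse` message, so the extra `4` plausibly is that field, but the meaning assigned here to the other constants (receiver ID, sequence number, is-buffer flag, payload size) is an assumption and should be checked against `NettyMessage.java`. The class name `BufferResponseHeaderSketch` is hypothetical.

```java
// Hypothetical breakdown of "16 + 4 + 4 + 1 + 4 + buffer.getSize()"; field meanings are assumed.
final class BufferResponseHeaderSketch {

    static final int RECEIVER_ID_BYTES = 16;    // assumed: receiver channel ID, two longs
    static final int SEQUENCE_NUMBER_BYTES = 4; // assumed: int sequence number
    static final int BACKLOG_BYTES = 4;         // assumed: the int backlog added by this change
    static final int IS_BUFFER_BYTES = 1;       // assumed: boolean flag, buffer vs. event
    static final int SIZE_BYTES = 4;            // assumed: int payload size

    static final int HEADER_LENGTH =
        RECEIVER_ID_BYTES + SEQUENCE_NUMBER_BYTES + BACKLOG_BYTES + IS_BUFFER_BYTES + SIZE_BYTES;

    /** Total message length for a payload of bufferSize bytes. */
    static int messageLength(int bufferSize) {
        return HEADER_LENGTH + bufferSize;
    }

    public static void main(String[] args) {
        System.out.println(HEADER_LENGTH + " " + messageLength(100));
    }
}
```

Named constants keep the sum self-documenting, so a later change to the message layout shows up as an added or removed constant rather than an unexplained number.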
[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...
Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4509#discussion_r141945644

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java ---
    @@ -131,6 +136,63 @@ public void recycle(MemorySegment segment) {
             availableMemorySegments.add(segment);
         }

    +    public List requestMemorySegments(int numRequiredBuffers) throws IOException {
    +        checkArgument(numRequiredBuffers > 0, "The number of required buffers should be larger than 0.");
    +
    +        synchronized (factoryLock) {
    +            if (isDestroyed) {
    +                throw new IllegalStateException("Network buffer pool has already been destroyed.");
    +            }
    +
    +            if (numTotalRequiredBuffers + numRequiredBuffers > totalNumberOfMemorySegments) {
    +                throw new IOException(String.format("Insufficient number of network buffers: " +
    --- End diff --

    How about adding a `FlinkNetworkException` or `FlinkNetworkIOException` for all network/netty related exceptions? That can help users better identify root causes of problems.

---
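A minimal sketch of what the suggestion could look like follows. `FlinkNetworkIOException` is only the name proposed in the comment above, not an existing Flink class, and `NetworkExceptionSketch`, `checkCapacity`, and the message wording are hypothetical. The subclass extends `IOException`, so a method already declared as `throws IOException` would not need a signature change.

```java
import java.io.IOException;

// Hypothetical illustration of the reviewer's suggestion; not part of Flink.
final class NetworkExceptionSketch {

    /** Dedicated type for network-stack failures; still an IOException for existing callers. */
    static class FlinkNetworkIOException extends IOException {
        private static final long serialVersionUID = 1L;

        FlinkNetworkIOException(String message) {
            super(message);
        }
    }

    /** Mirrors the capacity check in the diff, but throws the dedicated exception type. */
    static void checkCapacity(int alreadyRequired, int requested, int total) throws IOException {
        if (alreadyRequired + requested > total) {
            throw new FlinkNetworkIOException(String.format(
                "Insufficient number of network buffers: required %d, but only %d available.",
                requested, total - alreadyRequired));
        }
    }

    public static void main(String[] args) {
        try {
            checkCapacity(8, 4, 10);
        } catch (IOException e) {
            System.out.println(e.getClass().getSimpleName() + ": " + e.getMessage());
        }
    }
}
```

Callers that care about the root cause can catch the subclass specifically, while code that only handles `IOException` keeps working unchanged.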
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
    [ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16186258#comment-16186258 ]

ASF GitHub Bot commented on FLINK-7406:
---------------------------------------

Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4509#discussion_r141945644

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java ---
    @@ -131,6 +136,63 @@ public void recycle(MemorySegment segment) {
             availableMemorySegments.add(segment);
         }

    +    public List requestMemorySegments(int numRequiredBuffers) throws IOException {
    +        checkArgument(numRequiredBuffers > 0, "The number of required buffers should be larger than 0.");
    +
    +        synchronized (factoryLock) {
    +            if (isDestroyed) {
    +                throw new IllegalStateException("Network buffer pool has already been destroyed.");
    +            }
    +
    +            if (numTotalRequiredBuffers + numRequiredBuffers > totalNumberOfMemorySegments) {
    +                throw new IOException(String.format("Insufficient number of network buffers: " +
    --- End diff --

    How about adding a `FlinkNetworkException` or `FlinkNetworkIOException` for all network/netty related exceptions? That can help users better identify root causes of problems.

> Implement Netty receiver incoming pipeline for credit-based
> -----------------------------------------------------------
>
>                 Key: FLINK-7406
>                 URL: https://issues.apache.org/jira/browse/FLINK-7406
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Network
>            Reporter: zhijiang
>            Assignee: zhijiang
>             Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control.
> Currently {{PartitionRequestClientHandler}} receives and reads {{BufferResponse}} from producer. It will request buffer from {{BufferPool}} for holding the message. If not got, the message is staged temporarily and {{autoread}} for channel is set false.
> For credit-based mode, {{PartitionRequestClientHandler}} can always get buffer from {{RemoteInputChannel}} for reading messages from producer.
> The related works are:
> * Add the backlog of producer in {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffer from {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates backlog for {{RemoteInputChannel}}, and it may trigger requests of floating buffers from {{BufferPool}}

--
This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-5544) Implement Internal Timer Service in RocksDB
[ https://issues.apache.org/jira/browse/FLINK-5544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16186255#comment-16186255 ] ASF GitHub Bot commented on FLINK-5544: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3359#discussion_r141942758 --- Diff: flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java --- @@ -0,0 +1,797 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.api.operators; + + +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.KeyContext; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.Preconditions; +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.CompactionStyle; +import org.rocksdb.DBOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.StringAppendOperator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * {@link InternalTimerService} that stores timers in RocksDB. 
+ */ +public class RocksDBInternalTimerServiceextends InternalTimerService { + + private static Logger LOG = LoggerFactory.getLogger(RocksDBInternalTimerService.class); + + /** The data base where stores all timers */ + private final RocksDB db; + + /** The path where the rocksdb locates */ + private final Path dbPath; + + /** +* The in-memory heaps backed by rocksdb to retrieve the next timer to trigger. Each +* partition's leader is stored in the heap. When the timers in a partition is changed, we +* will change the partition's leader and update the heap accordingly. +*/ + private final int numPartitions; + private final PersistentTimerHeap eventTimeHeap; + private final PersistentTimerHeap processingTimeHeap; + + private static int MAX_PARTITIONS = (1 << 16); + + public RocksDBInternalTimerService( + int totalKeyGroups, + KeyGroupRange keyGroupRange, + KeyContext keyContext, + ProcessingTimeService processingTimeService, + Path dbPath) { + + super(totalKeyGroups, keyGroupRange, keyContext, processingTimeService); + + this.dbPath = dbPath; + + try { + FileSystem fileSystem = this.dbPath.getFileSystem(); + if (fileSystem.exists(this.dbPath)) { + fileSystem.delete(this.dbPath, true); + } + + fileSystem.mkdirs(dbPath); + } catch (IOException e) { +
[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3359#discussion_r141942758 --- Diff: flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java --- @@ -0,0 +1,797 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.api.operators; + + +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.KeyContext; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.Preconditions; +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.CompactionStyle; +import org.rocksdb.DBOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.StringAppendOperator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * {@link InternalTimerService} that stores timers in RocksDB. 
+ */ +public class RocksDBInternalTimerServiceextends InternalTimerService { + + private static Logger LOG = LoggerFactory.getLogger(RocksDBInternalTimerService.class); + + /** The data base where stores all timers */ + private final RocksDB db; + + /** The path where the rocksdb locates */ + private final Path dbPath; + + /** +* The in-memory heaps backed by rocksdb to retrieve the next timer to trigger. Each +* partition's leader is stored in the heap. When the timers in a partition is changed, we +* will change the partition's leader and update the heap accordingly. +*/ + private final int numPartitions; + private final PersistentTimerHeap eventTimeHeap; + private final PersistentTimerHeap processingTimeHeap; + + private static int MAX_PARTITIONS = (1 << 16); + + public RocksDBInternalTimerService( + int totalKeyGroups, + KeyGroupRange keyGroupRange, + KeyContext keyContext, + ProcessingTimeService processingTimeService, + Path dbPath) { + + super(totalKeyGroups, keyGroupRange, keyContext, processingTimeService); + + this.dbPath = dbPath; + + try { + FileSystem fileSystem = this.dbPath.getFileSystem(); + if (fileSystem.exists(this.dbPath)) { + fileSystem.delete(this.dbPath, true); + } + + fileSystem.mkdirs(dbPath); + } catch (IOException e) { + throw new RuntimeException("Error while creating directory for rocksdb timer service.", e); + } + + ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions() +
[jira] [Commented] (FLINK-5544) Implement Internal Timer Service in RocksDB
[ https://issues.apache.org/jira/browse/FLINK-5544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16186254#comment-16186254 ] ASF GitHub Bot commented on FLINK-5544: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3359#discussion_r141944569 --- Diff: flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java --- @@ -0,0 +1,797 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.api.operators; + + +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.KeyContext; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.Preconditions; +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.CompactionStyle; +import org.rocksdb.DBOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.StringAppendOperator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * {@link InternalTimerService} that stores timers in RocksDB. 
+ */ +public class RocksDBInternalTimerServiceextends InternalTimerService { + + private static Logger LOG = LoggerFactory.getLogger(RocksDBInternalTimerService.class); + + /** The data base where stores all timers */ + private final RocksDB db; + + /** The path where the rocksdb locates */ + private final Path dbPath; + + /** +* The in-memory heaps backed by rocksdb to retrieve the next timer to trigger. Each +* partition's leader is stored in the heap. When the timers in a partition is changed, we +* will change the partition's leader and update the heap accordingly. +*/ + private final int numPartitions; + private final PersistentTimerHeap eventTimeHeap; + private final PersistentTimerHeap processingTimeHeap; + + private static int MAX_PARTITIONS = (1 << 16); + + public RocksDBInternalTimerService( + int totalKeyGroups, + KeyGroupRange keyGroupRange, + KeyContext keyContext, + ProcessingTimeService processingTimeService, + Path dbPath) { + + super(totalKeyGroups, keyGroupRange, keyContext, processingTimeService); + + this.dbPath = dbPath; + + try { + FileSystem fileSystem = this.dbPath.getFileSystem(); + if (fileSystem.exists(this.dbPath)) { + fileSystem.delete(this.dbPath, true); + } + + fileSystem.mkdirs(dbPath); + } catch (IOException e) { +
[jira] [Commented] (FLINK-5544) Implement Internal Timer Service in RocksDB
[ https://issues.apache.org/jira/browse/FLINK-5544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16186256#comment-16186256 ] ASF GitHub Bot commented on FLINK-5544: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3359#discussion_r141944634 --- Diff: flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java --- @@ -0,0 +1,797 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.api.operators; + + +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.KeyContext; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.Preconditions; +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.CompactionStyle; +import org.rocksdb.DBOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.StringAppendOperator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * {@link InternalTimerService} that stores timers in RocksDB. 
+ */ +public class RocksDBInternalTimerService extends InternalTimerService { + + private static Logger LOG = LoggerFactory.getLogger(RocksDBInternalTimerService.class); + + /** The data base where stores all timers */ + private final RocksDB db; + + /** The path where the rocksdb locates */ + private final Path dbPath; + + /** +* The in-memory heaps backed by rocksdb to retrieve the next timer to trigger. Each +* partition's leader is stored in the heap. When the timers in a partition is changed, we +* will change the partition's leader and update the heap accordingly. +*/ + private final int numPartitions; + private final PersistentTimerHeap eventTimeHeap; + private final PersistentTimerHeap processingTimeHeap; + + private static int MAX_PARTITIONS = (1 << 16); + + public RocksDBInternalTimerService( + int totalKeyGroups, + KeyGroupRange keyGroupRange, + KeyContext keyContext, + ProcessingTimeService processingTimeService, + Path dbPath) { + + super(totalKeyGroups, keyGroupRange, keyContext, processingTimeService); + + this.dbPath = dbPath; + + try { + FileSystem fileSystem = this.dbPath.getFileSystem(); + if (fileSystem.exists(this.dbPath)) { + fileSystem.delete(this.dbPath, true); + } + + fileSystem.mkdirs(dbPath); + } catch (IOException e) { +
[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3359#discussion_r141944569 --- Diff: flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java --- @@ -0,0 +1,797 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.api.operators; + + +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.KeyContext; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.Preconditions; +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.CompactionStyle; +import org.rocksdb.DBOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.StringAppendOperator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * {@link InternalTimerService} that stores timers in RocksDB. 
+ */ +public class RocksDBInternalTimerService extends InternalTimerService { + + private static Logger LOG = LoggerFactory.getLogger(RocksDBInternalTimerService.class); + + /** The data base where stores all timers */ + private final RocksDB db; + + /** The path where the rocksdb locates */ + private final Path dbPath; + + /** +* The in-memory heaps backed by rocksdb to retrieve the next timer to trigger. Each +* partition's leader is stored in the heap. When the timers in a partition is changed, we +* will change the partition's leader and update the heap accordingly. +*/ + private final int numPartitions; + private final PersistentTimerHeap eventTimeHeap; + private final PersistentTimerHeap processingTimeHeap; + + private static int MAX_PARTITIONS = (1 << 16); + + public RocksDBInternalTimerService( + int totalKeyGroups, + KeyGroupRange keyGroupRange, + KeyContext keyContext, + ProcessingTimeService processingTimeService, + Path dbPath) { + + super(totalKeyGroups, keyGroupRange, keyContext, processingTimeService); + + this.dbPath = dbPath; + + try { + FileSystem fileSystem = this.dbPath.getFileSystem(); + if (fileSystem.exists(this.dbPath)) { + fileSystem.delete(this.dbPath, true); + } + + fileSystem.mkdirs(dbPath); + } catch (IOException e) { + throw new RuntimeException("Error while creating directory for rocksdb timer service.", e); + } + + ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions() +
[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3359#discussion_r141944634 --- Diff: flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java --- @@ -0,0 +1,797 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.api.operators; + + +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.KeyContext; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.Preconditions; +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.CompactionStyle; +import org.rocksdb.DBOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.StringAppendOperator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * {@link InternalTimerService} that stores timers in RocksDB. 
+ */ +public class RocksDBInternalTimerService extends InternalTimerService { + + private static Logger LOG = LoggerFactory.getLogger(RocksDBInternalTimerService.class); + + /** The data base where stores all timers */ + private final RocksDB db; + + /** The path where the rocksdb locates */ + private final Path dbPath; + + /** +* The in-memory heaps backed by rocksdb to retrieve the next timer to trigger. Each +* partition's leader is stored in the heap. When the timers in a partition is changed, we +* will change the partition's leader and update the heap accordingly. +*/ + private final int numPartitions; + private final PersistentTimerHeap eventTimeHeap; + private final PersistentTimerHeap processingTimeHeap; + + private static int MAX_PARTITIONS = (1 << 16); + + public RocksDBInternalTimerService( + int totalKeyGroups, + KeyGroupRange keyGroupRange, + KeyContext keyContext, + ProcessingTimeService processingTimeService, + Path dbPath) { + + super(totalKeyGroups, keyGroupRange, keyContext, processingTimeService); + + this.dbPath = dbPath; + + try { + FileSystem fileSystem = this.dbPath.getFileSystem(); + if (fileSystem.exists(this.dbPath)) { + fileSystem.delete(this.dbPath, true); + } + + fileSystem.mkdirs(dbPath); + } catch (IOException e) { + throw new RuntimeException("Error while creating directory for rocksdb timer service.", e); + } + + ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions() +
[jira] [Commented] (FLINK-5544) Implement Internal Timer Service in RocksDB
[ https://issues.apache.org/jira/browse/FLINK-5544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16186202#comment-16186202 ] ASF GitHub Bot commented on FLINK-5544: --- Github user EXPjbucher commented on the issue: https://github.com/apache/flink/pull/3359 Hey, I was actually looking into this today and was wondering what the status of this is? We have this exact case where lots of timers are causing high memory use (most of which don't need to be in RAM at the same time). > Implement Internal Timer Service in RocksDB > --- > > Key: FLINK-5544 > URL: https://issues.apache.org/jira/browse/FLINK-5544 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi > > Now the only implementation of internal timer service is > HeapInternalTimerService which stores all timers in memory. In the cases > where the number of keys is very large, the timer service will cost too much > memory. An implementation which stores timers in RocksDB seems good to deal > with these cases. > It might be a little challenging to implement a RocksDB timer service because > the timers are accessed in different ways. When timers are triggered, we need > to access timers in the order of timestamp. But when performing checkpoints, > we must have a method to obtain all timers of a given key group. > A good implementation, as suggested by [~StephanEwen], follows the idea of > merge sorting. We can store timers in RocksDB with the format > {{KEY_GROUP#TIMER#KEY}}. In this way, the timers under a key group are put > together and are sorted. > Then we can deploy an in-memory heap which keeps the first timer of each key > group to get the next timer to trigger. When a key group's first timer is > updated, we can efficiently update the heap. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
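The layout described in the issue above (timers sorted within each key group, plus an in-memory heap holding only the first timer of each group) can be sketched roughly as follows. This is an illustration under assumptions, not the code of the pull request: a `TreeSet` per key group stands in for the sorted RocksDB key range, and the names `KeyGroupTimerSketch`, `Timer`, `register`, `advanceTo` and `timersOf` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.TreeMap;
import java.util.TreeSet;

/** Illustration only: in-memory stand-in for the KEY_GROUP#TIMER#KEY layout. */
class KeyGroupTimerSketch {

    /** Hypothetical timer, ordered by timestamp and then key (the TIMER#KEY suffix). */
    static final class Timer implements Comparable<Timer> {
        final int keyGroup;
        final long timestamp;
        final String key;

        Timer(int keyGroup, long timestamp, String key) {
            this.keyGroup = keyGroup;
            this.timestamp = timestamp;
            this.key = key;
        }

        @Override
        public int compareTo(Timer other) {
            int byTime = Long.compare(timestamp, other.timestamp);
            return byTime != 0 ? byTime : key.compareTo(other.key);
        }
    }

    // keyGroup -> sorted timers of that group (assumption: replaces the RocksDB key range).
    private final TreeMap<Integer, TreeSet<Timer>> store = new TreeMap<>();
    // Holds exactly one entry per non-empty key group: that group's earliest timer.
    private final PriorityQueue<Timer> heads = new PriorityQueue<>();

    /** Adds a timer and updates the heap only if the group's first timer changed. */
    void register(Timer timer) {
        TreeSet<Timer> group = store.computeIfAbsent(timer.keyGroup, g -> new TreeSet<>());
        Timer oldHead = group.isEmpty() ? null : group.first();
        if (!group.add(timer)) {
            return; // same timestamp and key already registered in this group
        }
        if (group.first() != oldHead) {
            if (oldHead != null) {
                heads.remove(oldHead);
            }
            heads.add(group.first());
        }
    }

    /** Removes and returns all timers with timestamp <= time, in timestamp order. */
    List<Timer> advanceTo(long time) {
        List<Timer> fired = new ArrayList<>();
        while (!heads.isEmpty() && heads.peek().timestamp <= time) {
            Timer next = heads.poll();
            fired.add(next);
            TreeSet<Timer> group = store.get(next.keyGroup);
            group.remove(next);
            if (group.isEmpty()) {
                store.remove(next.keyGroup);
            } else {
                heads.add(group.first()); // the group's next timer becomes its head
            }
        }
        return fired;
    }

    /** Checkpoint-style access: all timers of one key group, already sorted. */
    List<Timer> timersOf(int keyGroup) {
        TreeSet<Timer> group = store.get(keyGroup);
        return group == null ? new ArrayList<Timer>() : new ArrayList<>(group);
    }
}
```

The heap never grows beyond the number of key groups, which is the point of the merge-sort idea: triggering reads timers in timestamp order across groups, while a checkpoint can still read one key group as a contiguous sorted range.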
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16186083#comment-16186083 ] ASF GitHub Bot commented on FLINK-7406: --- Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/4509 @NicoK , thanks for reviews and talking about this key point. I think I understand your point and agree with that. I will submit the modifications before Monday. > Implement Netty receiver incoming pipeline for credit-based > --- > > Key: FLINK-7406 > URL: https://issues.apache.org/jira/browse/FLINK-7406 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.4.0 > > > This is a part of work for credit-based network flow control. > Currently {{PartitionRequestClientHandler}} receives and reads > {{BufferResponse}} from producer. It will request buffer from {{BufferPool}} > for holding the message. If not got, the message is staged temporarily and > {{autoread}} for channel is set false. > For credit-based mode, {{PartitionRequestClientHandler}} can always get > buffer from {{RemoteInputChannel}} for reading messages from producer. > The related works are: > * Add the backlog of producer in {{BufferResponse}} message structure > * {{PartitionRequestClientHandler}} requests buffer from > {{RemoteInputChannel}} directly > * {{PartitionRequestClientHandler}} updates backlog for > {{RemoteInputChannel}}, and it may trigger requests of floating buffers from > {{BufferPool}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4509: [FLINK-7406][network] Implement Netty receiver incoming p...
Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/4509 @NicoK , thanks for reviews and talking about this key point. I think I understand your point and agree with that. I will submit the modifications before Monday. ---
[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16186049#comment-16186049 ] ASF GitHub Bot commented on FLINK-7416: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r141911428 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java --- @@ -37,20 +43,31 @@ import java.io.IOException; import java.net.SocketAddress; +import java.util.ArrayDeque; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicReference; +/** + * Channel handler to read {@link BufferResponse} and {@link ErrorResponse} messages from the + * producer, to write and flush {@link AddCredit} message for the producer. + */ class CreditBasedClientHandler extends ChannelInboundHandlerAdapter { private static final Logger LOG = LoggerFactory.getLogger(CreditBasedClientHandler.class); + /** Channels, which already requested partitions from the producers. */ private final ConcurrentMap inputChannels = new ConcurrentHashMap<>(); + /** Channels, which will notify the producers about unannounced credit. */ + private final ArrayDeque inputChannelsWithCredit = new ArrayDeque<>(); + private final AtomicReference channelError = new AtomicReference<>(); + private final ChannelFutureListener writeListener = new WriteAndFlushNextMessageIfPossibleListener(); + /** -* Set of cancelled partition requests. A request is cancelled iff an input channel is cleared +* Set of cancelled partition requests. 
A request is cancelled if an input channel is cleared --- End diff -- I guess, this "iff" was correct in the meaning of "if and only if" (a common abbreviation in math) - if you like to, you can replace it with the full form though if that is still the logic behind > Implement Netty receiver outgoing pipeline for credit-based > --- > > Key: FLINK-7416 > URL: https://issues.apache.org/jira/browse/FLINK-7416 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.4.0 > > > This is a part of work for credit-based network flow control. > The related works are : > * We define a new message called {{AddCredit}} to notify the incremental > credit during data shuffle. > * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the > channel is enqueued in the pipeline. > * Whenever the channel becomes writable, it takes the next {{InputChannel}} > and sends its unannounced credit. The credit is reset to zero after each sent. > * That way, messages are sent as often as the network has capacity and > contain as much credit as available for the channel at that point in time. > Otherwise, it would only add latency to the announcements and not increase > throughput. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
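The announcement scheme in the issue description above (enqueue a channel when its unannounced credit rises from zero, send and reset the credit once the connection is writable) can be sketched as below. This is a simplified, single-threaded illustration under assumptions: `CreditAnnouncementSketch`, `Channel`, `addCredit` and `onWritable` are hypothetical names, and the returned string merely stands in for an {{AddCredit}} message.

```java
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustration only: queueing channels whose unannounced credit became non-zero. */
class CreditAnnouncementSketch {

    /** Hypothetical stand-in for an input channel with an unannounced-credit counter. */
    static final class Channel {
        final String id;
        final AtomicInteger unannouncedCredit = new AtomicInteger();

        Channel(String id) {
            this.id = id;
        }
    }

    // Channels waiting to announce credit; assumed to be touched by one thread only.
    private final ArrayDeque<Channel> channelsWithCredit = new ArrayDeque<>();

    /** Adds credit (assumed > 0) and enqueues the channel only on the 0 -> positive transition. */
    void addCredit(Channel channel, int credit) {
        if (channel.unannouncedCredit.getAndAdd(credit) == 0) {
            channelsWithCredit.add(channel);
        }
    }

    /**
     * Called when the connection becomes writable. Takes the next channel, resets its
     * credit to zero and returns "id:credit" as a placeholder message, or null if idle.
     */
    String onWritable() {
        Channel next = channelsWithCredit.poll();
        if (next == null) {
            return null;
        }
        int credit = next.unannouncedCredit.getAndSet(0);
        return next.id + ":" + credit;
    }
}
```

Because a channel is enqueued at most once until its credit is sent, credit that accumulates while the connection is busy is batched into a single announcement instead of one message per buffer.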
[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r141911428 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java --- @@ -37,20 +43,31 @@ import java.io.IOException; import java.net.SocketAddress; +import java.util.ArrayDeque; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicReference; +/** + * Channel handler to read {@link BufferResponse} and {@link ErrorResponse} messages from the + * producer, to write and flush {@link AddCredit} message for the producer. + */ class CreditBasedClientHandler extends ChannelInboundHandlerAdapter { private static final Logger LOG = LoggerFactory.getLogger(CreditBasedClientHandler.class); + /** Channels, which already requested partitions from the producers. */ private final ConcurrentMap inputChannels = new ConcurrentHashMap<>(); + /** Channels, which will notify the producers about unannounced credit. */ + private final ArrayDeque inputChannelsWithCredit = new ArrayDeque<>(); + private final AtomicReference channelError = new AtomicReference<>(); + private final ChannelFutureListener writeListener = new WriteAndFlushNextMessageIfPossibleListener(); + /** -* Set of cancelled partition requests. A request is cancelled iff an input channel is cleared +* Set of cancelled partition requests. A request is cancelled if an input channel is cleared --- End diff -- I guess, this "iff" was correct in the meaning of "if and only if" (a common abbreviation in math) - if you like to, you can replace it with the full form though if that is still the logic behind ---
[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r141906648 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -374,6 +376,64 @@ public void testReleaseExclusiveBuffers() throws Exception { verify(inputGate, times(1)).returnExclusiveSegments(anyListOf(MemorySegment.class)); } + /** +* Tests {@link BufferPool#requestBuffer()}, verifying the remote input channel tries to request +* floating buffers once receiving the producer's backlog. +*/ + @Test + public void testRequestFloatingBuffersOnBuffer() throws Exception { + // Setup + final BufferPool bufferPool = mock(BufferPool.class); + when(bufferPool.requestBuffer()).thenReturn(TestBufferFactory.createBuffer()); + + final SingleInputGate inputGate = mock(SingleInputGate.class); + when(inputGate.getBufferPool()).thenReturn(bufferPool); + + final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate); + + // Receive the producer's backlog + inputChannel.onBuffer(TestBufferFactory.createBuffer(), 0, 10); + // Need to request 10 floating buffers from buffer pool + verify(bufferPool, times(10)).requestBuffer(); + + inputChannel.onBuffer(TestBufferFactory.createBuffer(), 1, 8); + // No need to request extra floating buffers from pool because + // there are already 10 available buffers in queue now + verify(bufferPool, times(10)).requestBuffer(); + + inputChannel.onBuffer(TestBufferFactory.createBuffer(), 2, 11); + // Need to request another floating buffer from pool + verify(bufferPool, times(11)).requestBuffer(); + } + + /** +* Tests {@link BufferPool#requestBuffer()}, verifying the remote input channel tries to request +* floating buffers once receiving the producer's backlog. And it requests from pool only once +* and registers as listener if there are currently no available buffers in the pool. 
+*/ + @Test + public void testWaitForFloatingBuffersOnBuffer() throws Exception { + // Setup + final BufferPool bufferPool = mock(BufferPool.class); + when(bufferPool.requestBuffer()).thenReturn(null); + when(bufferPool.addBufferListener(any(BufferListener.class))).thenReturn(true); --- End diff -- How about using the real `NetworkBufferPool#createBufferPool()` here with a limited set of buffers? Then you could start retrieving buffers as in the test method above and continue with verifying the expected behaviour in case the buffer limit was reached (no need for two test methods, I guess). I'd prefer this over a mock so that you can also verify the interaction with the real methods such as `addBufferListener()`. ---
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16186043#comment-16186043 ] ASF GitHub Bot commented on FLINK-7406: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r141896488 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -412,23 +468,35 @@ public void onBuffer(Buffer buffer, int sequenceNumber) { } } } + + if (success && backlog > 0) { + onSenderBacklog(backlog); + } + } finally { if (!success) { buffer.recycle(); } } } - public void onEmptyBuffer(int sequenceNumber) { + public void onEmptyBuffer(int sequenceNumber, int backlog) { + boolean success = false; + synchronized (receivedBuffers) { if (!isReleased.get()) { if (expectedSequenceNumber == sequenceNumber) { expectedSequenceNumber++; + success = true; } else { onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber)); } } } + + if (success && backlog > 0) { --- End diff -- same here > Implement Netty receiver incoming pipeline for credit-based > --- > > Key: FLINK-7406 > URL: https://issues.apache.org/jira/browse/FLINK-7406 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.4.0 > > > This is a part of work for credit-based network flow control. > Currently {{PartitionRequestClientHandler}} receives and reads > {{BufferResponse}} from producer. It will request buffer from {{BufferPool}} > for holding the message. If not got, the message is staged temporarily and > {{autoread}} for channel is set false. > For credit-based mode, {{PartitionRequestClientHandler}} can always get > buffer from {{RemoteInputChannel}} for reading messages from producer. 
> The related works are: > * Add the backlog of producer in {{BufferResponse}} message structure > * {{PartitionRequestClientHandler}} requests buffer from > {{RemoteInputChannel}} directly > * {{PartitionRequestClientHandler}} updates backlog for > {{RemoteInputChannel}}, and it may trigger requests of floating buffers from > {{BufferPool}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16186040#comment-16186040 ] ASF GitHub Bot commented on FLINK-7406: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r141890051 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -390,7 +390,63 @@ public BufferProvider getBufferProvider() throws IOException { return inputGate.getBufferProvider(); } - public void onBuffer(Buffer buffer, int sequenceNumber) { + /** +* Requests buffer from input channel directly for receiving network data. +* It should always return an available buffer in credit-based mode. +* +* @return The available buffer. +*/ + public Buffer requestBuffer() { + synchronized (availableBuffers) { + return availableBuffers.poll(); + } + } + + /** +* Receives the backlog from producer's buffer response. If the number of available +* buffers is less than the backlog length, it will request floating buffers from buffer +* pool, and then notify unannounced credits to the producer. +* +* @param backlog The number of unsent buffers in the producer's sub partition. +*/ + private void onSenderBacklog(int backlog) { + int numRequestedBuffers = 0; + + synchronized (availableBuffers) { + // Important: the isReleased check should be inside the synchronized block. + if (!isReleased.get()) { + senderBacklog.set(backlog); + + while (senderBacklog.get() > availableBuffers.size() && !isWaitingForFloatingBuffers.get()) { + Buffer buffer = null; + try { + buffer = inputGate.getBufferPool().requestBuffer(); + } catch (IOException ex) { --- End diff -- This only affects the two methods you mentioned which, themselves, are called by `PartitionRequestClientHandler`/`CreditBasedClientHandler` helper methods allowing `Throwable` - so it's actually not much. 
I'd prefer not wrapping the exception further here for simplicity in case of errors > Implement Netty receiver incoming pipeline for credit-based > --- > > Key: FLINK-7406 > URL: https://issues.apache.org/jira/browse/FLINK-7406 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.4.0 > > > This is a part of work for credit-based network flow control. > Currently {{PartitionRequestClientHandler}} receives and reads > {{BufferResponse}} from producer. It will request buffer from {{BufferPool}} > for holding the message. If not got, the message is staged temporarily and > {{autoread}} for channel is set false. > For credit-based mode, {{PartitionRequestClientHandler}} can always get > buffer from {{RemoteInputChannel}} for reading messages from producer. > The related works are: > * Add the backlog of producer in {{BufferResponse}} message structure > * {{PartitionRequestClientHandler}} requests buffer from > {{RemoteInputChannel}} directly > * {{PartitionRequestClientHandler}} updates backlog for > {{RemoteInputChannel}}, and it may trigger requests of floating buffers from > {{BufferPool}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
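The backlog handling discussed in this thread (request floating buffers while the sender's backlog exceeds the locally available buffers, and stop requesting once the pool is empty) can be sketched as follows. This is a simplified illustration without locking or listener callbacks; `BacklogSketch` and `Pool` are hypothetical stand-ins and not the Flink classes.

```java
import java.util.ArrayDeque;

/** Illustration only: backlog-driven requests for floating buffers. */
class BacklogSketch {

    /** Hypothetical pool with a fixed number of floating buffers. */
    static final class Pool {
        int remaining;
        int requests; // counts every call, successful or not

        Pool(int remaining) {
            this.remaining = remaining;
        }

        /** Returns a buffer placeholder, or null when the pool is exhausted. */
        Object requestBuffer() {
            requests++;
            if (remaining == 0) {
                return null;
            }
            remaining--;
            return new Object();
        }
    }

    private final ArrayDeque<Object> availableBuffers = new ArrayDeque<>();
    private final Pool pool;
    // Set once the pool returned null; real code would register a listener here.
    private boolean waitingForFloatingBuffers;

    BacklogSketch(Pool pool) {
        this.pool = pool;
    }

    /** Requests buffers until the backlog is covered; returns how many were obtained. */
    int onSenderBacklog(int backlog) {
        int numRequestedBuffers = 0;
        while (backlog > availableBuffers.size() && !waitingForFloatingBuffers) {
            Object buffer = pool.requestBuffer();
            if (buffer != null) {
                availableBuffers.add(buffer);
                numRequestedBuffers++;
            } else {
                waitingForFloatingBuffers = true; // stop polling an empty pool
            }
        }
        return numRequestedBuffers;
    }
}
```

With a pool of 11 buffers, backlogs of 10, 8 and 11 lead to 10, 0 and 1 new requests, the same sequence the test `testRequestFloatingBuffersOnBuffer` in this pull request verifies against a mocked pool.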
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16186046#comment-16186046 ] ASF GitHub Bot commented on FLINK-7406: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r141908388 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -374,6 +376,64 @@ public void testReleaseExclusiveBuffers() throws Exception { verify(inputGate, times(1)).returnExclusiveSegments(anyListOf(MemorySegment.class)); } + /** +* Tests {@link BufferPool#requestBuffer()}, verifying the remote input channel tries to request +* floating buffers once receiving the producer's backlog. +*/ + @Test + public void testRequestFloatingBuffersOnBuffer() throws Exception { + // Setup + final BufferPool bufferPool = mock(BufferPool.class); + when(bufferPool.requestBuffer()).thenReturn(TestBufferFactory.createBuffer()); + + final SingleInputGate inputGate = mock(SingleInputGate.class); + when(inputGate.getBufferPool()).thenReturn(bufferPool); + + final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate); + + // Receive the producer's backlog + inputChannel.onBuffer(TestBufferFactory.createBuffer(), 0, 10); + // Need to request 10 floating buffers from buffer pool + verify(bufferPool, times(10)).requestBuffer(); + + inputChannel.onBuffer(TestBufferFactory.createBuffer(), 1, 8); + // No need to request extra floating buffers from pool because + // there are already 10 available buffers in queue now + verify(bufferPool, times(10)).requestBuffer(); + + inputChannel.onBuffer(TestBufferFactory.createBuffer(), 2, 11); + // Need to request another floating buffer from pool + verify(bufferPool, times(11)).requestBuffer(); + } + + /** +* Tests {@link BufferPool#requestBuffer()}, verifying the remote input channel tries to request +* floating buffers once receiving the producer's backlog. 
And it requests from pool only once +* and registers as listener if there are currently no available buffers in the pool. +*/ + @Test + public void testWaitForFloatingBuffersOnBuffer() throws Exception { + // Setup + final BufferPool bufferPool = mock(BufferPool.class); + when(bufferPool.requestBuffer()).thenReturn(null); + when(bufferPool.addBufferListener(any(BufferListener.class))).thenReturn(true); + + final SingleInputGate inputGate = mock(SingleInputGate.class); + when(inputGate.getBufferPool()).thenReturn(bufferPool); + when(inputGate.getBufferProvider()).thenReturn(bufferPool); + + final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate); + + // Receive the producer's backlog + inputChannel.onBuffer(TestBufferFactory.createBuffer(), 0, 10); + // Request from pool only once if there are no available floating buffers + verify(bufferPool, times(1)).requestBuffer(); + + inputChannel.onBuffer(TestBufferFactory.createBuffer(), 1, 10); + // Already registers as listener to wait for notifications and will not request any more + verify(bufferPool, times(1)).requestBuffer(); + } + --- End diff -- actually, some more tests would be nice: - ensuring a fair distribution of buffers to `BufferListener`s - to verify that there is no race condition with two things running in parallel: `onSenderBacklog()` or `notifyBufferAvailable()` requesting buffers and some other thread recycling them (floating and/or exclusive ones). > Implement Netty receiver incoming pipeline for credit-based > --- > > Key: FLINK-7406 > URL: https://issues.apache.org/jira/browse/FLINK-7406 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.4.0 > > > This is a part of work for credit-based network flow control. > Currently {{PartitionRequestClientHandler}} receives and reads > {{BufferResponse}} from producer. It will request buffer from {{BufferPool}} > for holding the message. 
If not got, the message is staged temporarily and > {{autoread}} for channel is set false. > For credit-based mode, {{PartitionRequestClientHandler}} can
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16186038#comment-16186038 ] ASF GitHub Bot commented on FLINK-7406: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r141901569 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -209,6 +276,95 @@ public String toString() { } // + // Credit-based + // + + /** +* Enqueue this input channel in the pipeline for sending unannounced credits to producer. +*/ + void notifyCreditAvailable() { + //TODO in next PR + } + + /** +* Exclusive buffer is recycled to this input channel directly and it may trigger notify +* credit to producer. +* +* @param segment The exclusive segment of this channel. +*/ + @Override + public void recycle(MemorySegment segment) { + synchronized (availableBuffers) { + // Important: the isReleased check should be inside the synchronized block. + // that way the segment can also be returned to global pool after added into + // the available queue during releasing all resources. + if (isReleased.get()) { + try { + inputGate.returnExclusiveSegments(Arrays.asList(segment)); + return; + } catch (Throwable t) { + ExceptionUtils.rethrow(t); + } + } + availableBuffers.add(new Buffer(segment, this)); + } + + if (unannouncedCredit.getAndAdd(1) == 0) { + notifyCreditAvailable(); + } + } + + public int getNumberOfAvailableBuffers() { + synchronized (availableBuffers) { + return availableBuffers.size(); + } + } + + /** +* The Buffer pool notifies this channel of an available floating buffer. If the channel is released or +* currently does not need extra buffers, the buffer should be recycled to the buffer pool. Otherwise, +* the buffer will be added into the availableBuffers queue and the unannounced credit is +* increased by one. +* +* @param buffer Buffer that becomes available in buffer pool. 
+* @return True when this channel is waiting for more floating buffers, otherwise false. +*/ + @Override + public boolean notifyBufferAvailable(Buffer buffer) { + checkState(isWaitingForFloatingBuffers.get(), "This channel should be waiting for floating buffers."); + + synchronized (availableBuffers) { + // Important: the isReleased check should be inside the synchronized block. + if (isReleased.get() || availableBuffers.size() >= senderBacklog.get()) { + isWaitingForFloatingBuffers.set(false); + buffer.recycle(); + + return false; + } + + availableBuffers.add(buffer); + + if (unannouncedCredit.getAndAdd(1) == 0) { + notifyCreditAvailable(); + } --- End diff -- can we do this outside the `synchronized` block? > Implement Netty receiver incoming pipeline for credit-based > --- > > Key: FLINK-7406 > URL: https://issues.apache.org/jira/browse/FLINK-7406 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.4.0 > > > This is a part of work for credit-based network flow control. > Currently {{PartitionRequestClientHandler}} receives and reads > {{BufferResponse}} from producer. It will request buffer from {{BufferPool}} > for holding the message. If not got, the message is staged temporarily and > {{autoread}} for channel is set false. > For credit-based mode, {{PartitionRequestClientHandler}} can always get > buffer from {{RemoteInputChannel}} for reading messages from producer. > The related works are: > * Add the backlog of producer in {{BufferResponse}} message structure > * {{PartitionRequestClientHandler}}
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16186045#comment-16186045 ] ASF GitHub Bot commented on FLINK-7406: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r141896356 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -412,23 +468,35 @@ public void onBuffer(Buffer buffer, int sequenceNumber) { } } } + + if (success && backlog > 0) { --- End diff -- Shouldn't we call this for `success && backlog >= 0` to try to always have `initialCredit` (extra) buffers available (if there are enough buffers)? > Implement Netty receiver incoming pipeline for credit-based > --- > > Key: FLINK-7406 > URL: https://issues.apache.org/jira/browse/FLINK-7406 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.4.0 > > > This is a part of work for credit-based network flow control. > Currently {{PartitionRequestClientHandler}} receives and reads > {{BufferResponse}} from producer. It will request buffer from {{BufferPool}} > for holding the message. If not got, the message is staged temporarily and > {{autoread}} for channel is set false. > For credit-based mode, {{PartitionRequestClientHandler}} can always get > buffer from {{RemoteInputChannel}} for reading messages from producer. > The related works are: > * Add the backlog of producer in {{BufferResponse}} message structure > * {{PartitionRequestClientHandler}} requests buffer from > {{RemoteInputChannel}} directly > * {{PartitionRequestClientHandler}} updates backlog for > {{RemoteInputChannel}}, and it may trigger requests of floating buffers from > {{BufferPool}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r141901569 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -209,6 +276,95 @@ public String toString() { } // + // Credit-based + // + + /** +* Enqueue this input channel in the pipeline for sending unannounced credits to producer. +*/ + void notifyCreditAvailable() { + //TODO in next PR + } + + /** +* Exclusive buffer is recycled to this input channel directly and it may trigger notify +* credit to producer. +* +* @param segment The exclusive segment of this channel. +*/ + @Override + public void recycle(MemorySegment segment) { + synchronized (availableBuffers) { + // Important: the isReleased check should be inside the synchronized block. + // that way the segment can also be returned to global pool after added into + // the available queue during releasing all resources. + if (isReleased.get()) { + try { + inputGate.returnExclusiveSegments(Arrays.asList(segment)); + return; + } catch (Throwable t) { + ExceptionUtils.rethrow(t); + } + } + availableBuffers.add(new Buffer(segment, this)); + } + + if (unannouncedCredit.getAndAdd(1) == 0) { + notifyCreditAvailable(); + } + } + + public int getNumberOfAvailableBuffers() { + synchronized (availableBuffers) { + return availableBuffers.size(); + } + } + + /** +* The Buffer pool notifies this channel of an available floating buffer. If the channel is released or +* currently does not need extra buffers, the buffer should be recycled to the buffer pool. Otherwise, +* the buffer will be added into the availableBuffers queue and the unannounced credit is +* increased by one. +* +* @param buffer Buffer that becomes available in buffer pool. +* @return True when this channel is waiting for more floating buffers, otherwise false. 
+*/ + @Override + public boolean notifyBufferAvailable(Buffer buffer) { + checkState(isWaitingForFloatingBuffers.get(), "This channel should be waiting for floating buffers."); + + synchronized (availableBuffers) { + // Important: the isReleased check should be inside the synchronized block. + if (isReleased.get() || availableBuffers.size() >= senderBacklog.get()) { + isWaitingForFloatingBuffers.set(false); + buffer.recycle(); + + return false; + } + + availableBuffers.add(buffer); + + if (unannouncedCredit.getAndAdd(1) == 0) { + notifyCreditAvailable(); + } --- End diff -- can we do this outside the `synchronized` block? ---
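One way to read the question above: only the queue mutation needs the `availableBuffers` monitor, while the credit counter is already atomic, so the notification could be issued after the lock is left, as `recycle()` in the same diff already does. A minimal sketch under simplifying assumptions, not the PR's code: `Object` stands in for `Buffer`, the backlog is a fixed constructor argument, the release check is omitted, `notifyCreditAvailable()` only counts calls (the PR leaves it as a TODO), and the `true` return value is a placeholder for "still waiting".

```java
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicInteger;

public class NotifyOutsideLockSketch {
    private final ArrayDeque<Object> availableBuffers = new ArrayDeque<>();
    private final AtomicInteger unannouncedCredit = new AtomicInteger(0);
    // Hypothetical counter so the sketch can show how often the notification fires.
    private final AtomicInteger notifications = new AtomicInteger(0);
    private final int senderBacklog;

    public NotifyOutsideLockSketch(int senderBacklog) {
        this.senderBacklog = senderBacklog;
    }

    // Stand-in for the real method, which is not implemented in this PR.
    void notifyCreditAvailable() {
        notifications.incrementAndGet();
    }

    public boolean notifyBufferAvailable(Object buffer) {
        synchronized (availableBuffers) {
            if (availableBuffers.size() >= senderBacklog) {
                // The original recycles the buffer back to the pool here.
                return false;
            }
            availableBuffers.add(buffer);
        }

        // Outside the lock: the atomic counter does not need the monitor.
        if (unannouncedCredit.getAndAdd(1) == 0) {
            notifyCreditAvailable();
        }
        return true;
    }

    public int notificationCount() {
        return notifications.get();
    }
}
```

Only the transition of the unannounced credit from zero triggers a notification, so moving the call out of the lock does not change how often it fires.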
[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r141886467 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -390,7 +390,63 @@ public BufferProvider getBufferProvider() throws IOException { return inputGate.getBufferProvider(); } - public void onBuffer(Buffer buffer, int sequenceNumber) { + /** +* Requests buffer from input channel directly for receiving network data. +* It should always return an available buffer in credit-based mode. +* +* @return The available buffer. +*/ + public Buffer requestBuffer() { + synchronized (availableBuffers) { + return availableBuffers.poll(); + } + } + + /** +* Receives the backlog from producer's buffer response. If the number of available +* buffers is less than the backlog length, it will request floating buffers from buffer +* pool, and then notify unannounced credits to the producer. +* +* @param backlog The number of unsent buffers in the producer's sub partition. +*/ + private void onSenderBacklog(int backlog) { + int numRequestedBuffers = 0; + + synchronized (availableBuffers) { + // Important: the isReleased check should be inside the synchronized block. + if (!isReleased.get()) { + senderBacklog.set(backlog); + + while (senderBacklog.get() > availableBuffers.size() && !isWaitingForFloatingBuffers.get()) { --- End diff -- I was thinking about it a bit more and was talking to @StephanEwen about it, and we think, that it is actually fine to grab all resources we need at the moment. If there are not enough buffers at some point, the fair distribution will start when the buffers are recycled, i.e. via the callbacks of the new `BufferListener`. Since each channel always has its own exclusive buffers, we can guarantee that it always makes progress anyway! 
Additionally, we cannot really make a fair distribution from the start when receiving the first backlog (since we do not know all the other backlogs) unless we wait for some time, which we also do not want. I kind of like your idea of having a `numBuffersPerAllocation`. Let's keep this in mind and evaluate the current solution first to see whether we need this addition.

Regarding the formula (which I took from the network FLIP):
- from the FLIP with regard to the buffer distribution strategy: `Design rationale 2: Each channel always tries to maintain a credit of ‘backlog_length + initialCredit’. That means that each channel tries to build the receive window for its current backlog as much as possible from the floating buffers, and use the exclusive ‘initialCredit’ buffers as a means to grow the window.` That way we always have some buffers available immediately on the receiver side so the sender can continue sending new buffers immediately (as long as there are buffers available on the receiver) and we do not have to wait for the exclusive buffers to come back.
- Note that this would have to be changed in the various checks for `availableBuffers.size() >= senderBacklog.get()`, e.g. in `RemoteInputChannel#notifyBufferAvailable()`.
- Similarly, `RemoteInputChannel#recycle()` needs to be adapted, in case our exclusive buffers are in use and we requested `backlog_length + initialCredit - currentCredit` *floating* buffers, in order not to stack up `2*initialCredit` buffers once `backlog == 0` again. (+ a corresponding unit test)
- What do you mean by `backlog-currentCredit` not being very accurate? We guarantee that there are no more than `currentCredit` buffers on the wire (some already in the channel, some only announced) and, at the time the buffer was sent, `backlog` additional buffers were queued, so in order to send them, we always need `backlog-currentCredit` irrespective of how much credit is announced vs. being on the wire.
Or am I not seeing something here? ---
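The target from the FLIP quote, `backlog_length + initialCredit`, and the request size mentioned in the comment, `backlog_length + initialCredit - currentCredit`, can be written down as a small helper. This is only an illustrative sketch: the class and method names are hypothetical, and clamping at zero is an assumption for the case where the channel already holds more credit than the target.

```java
public class FloatingBufferMath {
    /**
     * Number of floating buffers a channel would still have to request to
     * reach a credit of backlog + initialCredit, given its current credit.
     * The clamp to zero is an assumption of this sketch.
     */
    public static int floatingBuffersToRequest(int backlog, int initialCredit, int currentCredit) {
        return Math.max(0, backlog + initialCredit - currentCredit);
    }
}
```

For example, with a backlog of 10 and all 2 exclusive buffers currently available, the channel would ask for 10 floating buffers; once the backlog drops to 0 it would ask for none.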
[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4509#discussion_r141865540

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---
    @@ -390,7 +390,63 @@ public BufferProvider getBufferProvider() throws IOException {
             return inputGate.getBufferProvider();
         }

    -    public void onBuffer(Buffer buffer, int sequenceNumber) {
    +    /**
    +     * Requests buffer from input channel directly for receiving network data.
    +     * It should always return an available buffer in credit-based mode.
    --- End diff --

    ...unless the channel has been released
---
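The remark above points at a corner case worth documenting: once the channel is released its queue is empty, so `poll()` yields `null`. A rough sketch of how that contract could be spelled out, with assumptions labeled: `Object` stands in for `Buffer`, `addBuffer` and `releaseAllResources` are simplified hypothetical helpers, and the locally declared `Nullable` annotation is only a stand-in for an annotation such as `javax.annotation.Nullable`, which is not part of the JDK.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayDeque;

public class RequestBufferSketch {
    /** Local stand-in for a nullability annotation (assumption of this sketch). */
    @Documented
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface Nullable {}

    private final ArrayDeque<Object> availableBuffers = new ArrayDeque<>();

    public void addBuffer(Object buffer) {
        synchronized (availableBuffers) {
            availableBuffers.add(buffer);
        }
    }

    /** Simplified release: drops all queued buffers. */
    public void releaseAllResources() {
        synchronized (availableBuffers) {
            availableBuffers.clear();
        }
    }

    /**
     * Returns an available buffer, or null if none is queued,
     * e.g. because the channel has been released.
     */
    @Nullable
    public Object requestBuffer() {
        synchronized (availableBuffers) {
            return availableBuffers.poll();
        }
    }
}
```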
[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r141890624 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -390,7 +390,63 @@ public BufferProvider getBufferProvider() throws IOException { return inputGate.getBufferProvider(); } - public void onBuffer(Buffer buffer, int sequenceNumber) { + /** +* Requests buffer from input channel directly for receiving network data. +* It should always return an available buffer in credit-based mode. +* +* @return The available buffer. +*/ + public Buffer requestBuffer() { + synchronized (availableBuffers) { + return availableBuffers.poll(); + } + } + + /** +* Receives the backlog from the producer's buffer response. If the number of available +* buffers is less than the backlog length, it will request floating buffers from the buffer +* pool, and then notify unannounced credits to the producer. +* +* @param backlog The number of unsent buffers in the producer's sub partition. +*/ + private void onSenderBacklog(int backlog) { + int numRequestedBuffers = 0; + + synchronized (availableBuffers) { + // Important: the isReleased check should be inside the synchronized block. + if (!isReleased.get()) { + senderBacklog.set(backlog); + + while (senderBacklog.get() > availableBuffers.size() && !isWaitingForFloatingBuffers.get()) { + Buffer buffer = null; + try { + buffer = inputGate.getBufferPool().requestBuffer(); + } catch (IOException ex) { + ExceptionUtils.rethrow(ex); + } + + if (buffer != null) { + availableBuffers.add(buffer); + numRequestedBuffers++; + continue; + } + + // If the channel has not got enough buffers, register it as listener to wait for more floating buffers. 
+ if (inputGate.getBufferProvider().addBufferListener(this)) { + if (!isWaitingForFloatingBuffers.compareAndSet(false, true)) { + throw new IllegalStateException("This channel should not be waiting for floating buffers."); + } --- End diff -- ...instead, here you can actively `break` out of the loop early without having to check the conditions again ---
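The suggestion above can be sketched as follows: once the channel has registered itself as a listener, the loop is left with `break` instead of relying on the `while` condition to observe the flag on the next iteration. This is a simplified illustration rather than the PR's code; `Pool` is a hypothetical stand-in for the buffer pool, `Object` stands in for `Buffer`, a plain boolean replaces the `AtomicBoolean`, and the release check and exception handling are left out.

```java
import java.util.ArrayDeque;

public class BacklogLoopSketch {
    /** Hypothetical stand-in for the floating buffer pool. */
    public interface Pool {
        /** Returns a buffer, or null if the pool is currently empty. */
        Object requestBuffer();

        /** Registers a listener; returns true if registration succeeded. */
        boolean addBufferListener(Object listener);
    }

    private final ArrayDeque<Object> availableBuffers = new ArrayDeque<>();
    private boolean isWaitingForFloatingBuffers = false;

    /** Requests floating buffers up to the backlog; returns how many were obtained. */
    public int onSenderBacklog(int backlog, Pool pool) {
        int numRequestedBuffers = 0;

        synchronized (availableBuffers) {
            while (backlog > availableBuffers.size() && !isWaitingForFloatingBuffers) {
                Object buffer = pool.requestBuffer();
                if (buffer != null) {
                    availableBuffers.add(buffer);
                    numRequestedBuffers++;
                    continue;
                }

                // Pool is empty: register as listener and stop requesting.
                if (pool.addBufferListener(this)) {
                    isWaitingForFloatingBuffers = true;
                    break; // leave the loop without re-evaluating its condition
                }
                // Registration failed (e.g. a buffer arrived meanwhile): retry.
            }
        }
        return numRequestedBuffers;
    }

    public boolean isWaiting() {
        synchronized (availableBuffers) {
            return isWaitingForFloatingBuffers;
        }
    }
}
```

The behaviour is the same as re-checking the flag in the loop condition; the `break` only makes the exit point explicit.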
[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4509#discussion_r141896356

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---
    @@ -412,23 +468,35 @@ public void onBuffer(Buffer buffer, int sequenceNumber) {
                 }
             }
         }
    +
    +    if (success && backlog > 0) {
    --- End diff --

    Shouldn't we call this for `success && backlog >= 0` to try to always have `initialCredit` (extra) buffers available (if there are enough buffers)?
---
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16186035#comment-16186035 ] ASF GitHub Bot commented on FLINK-7406: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r141890624 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -390,7 +390,63 @@ public BufferProvider getBufferProvider() throws IOException { return inputGate.getBufferProvider(); } - public void onBuffer(Buffer buffer, int sequenceNumber) { + /** +* Requests buffer from input channel directly for receiving network data. +* It should always return an available buffer in credit-based mode. +* +* @return The available buffer. +*/ + public Buffer requestBuffer() { + synchronized (availableBuffers) { + return availableBuffers.poll(); + } + } + + /** +* Receives the backlog from the producer's buffer response. If the number of available +* buffers is less than the backlog length, it will request floating buffers from the buffer +* pool, and then notify unannounced credits to the producer. +* +* @param backlog The number of unsent buffers in the producer's sub partition. +*/ + private void onSenderBacklog(int backlog) { + int numRequestedBuffers = 0; + + synchronized (availableBuffers) { + // Important: the isReleased check should be inside the synchronized block. + if (!isReleased.get()) { + senderBacklog.set(backlog); + + while (senderBacklog.get() > availableBuffers.size() && !isWaitingForFloatingBuffers.get()) { + Buffer buffer = null; + try { + buffer = inputGate.getBufferPool().requestBuffer(); + } catch (IOException ex) { + ExceptionUtils.rethrow(ex); + } + + if (buffer != null) { + availableBuffers.add(buffer); + numRequestedBuffers++; + continue; + } + + // If the channel has not got enough buffers, register it as listener to wait for more floating buffers. 
+ if (inputGate.getBufferProvider().addBufferListener(this)) { + if (!isWaitingForFloatingBuffers.compareAndSet(false, true)) { + throw new IllegalStateException("This channel should not be waiting for floating buffers."); + } --- End diff -- ...instead, here you can actively `break` out of the loop early without having to check the conditions again > Implement Netty receiver incoming pipeline for credit-based > --- > > Key: FLINK-7406 > URL: https://issues.apache.org/jira/browse/FLINK-7406 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.4.0 > > > This is a part of work for credit-based network flow control. > Currently {{PartitionRequestClientHandler}} receives and reads > {{BufferResponse}} from producer. It will request buffer from {{BufferPool}} > for holding the message. If not got, the message is staged temporarily and > {{autoread}} for channel is set false. > For credit-based mode, {{PartitionRequestClientHandler}} can always get > buffer from {{RemoteInputChannel}} for reading messages from producer. > The related works are: > * Add the backlog of producer in {{BufferResponse}} message structure > * {{PartitionRequestClientHandler}} requests buffer from > {{RemoteInputChannel}} directly > * {{PartitionRequestClientHandler}} updates backlog for > {{RemoteInputChannel}}, and it may trigger requests of floating buffers from > {{BufferPool}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16186037#comment-16186037 ] ASF GitHub Bot commented on FLINK-7406: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r141865540 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -390,7 +390,63 @@ public BufferProvider getBufferProvider() throws IOException { return inputGate.getBufferProvider(); } - public void onBuffer(Buffer buffer, int sequenceNumber) { + /** +* Requests buffer from input channel directly for receiving network data. +* It should always return an available buffer in credit-based mode. --- End diff -- ...unless the channel has been released > Implement Netty receiver incoming pipeline for credit-based > --- > > Key: FLINK-7406 > URL: https://issues.apache.org/jira/browse/FLINK-7406 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.4.0 > > > This is a part of work for credit-based network flow control. > Currently {{PartitionRequestClientHandler}} receives and reads > {{BufferResponse}} from producer. It will request buffer from {{BufferPool}} > for holding the message. If not got, the message is staged temporarily and > {{autoread}} for channel is set false. > For credit-based mode, {{PartitionRequestClientHandler}} can always get > buffer from {{RemoteInputChannel}} for reading messages from producer. > The related works are: > * Add the backlog of producer in {{BufferResponse}} message structure > * {{PartitionRequestClientHandler}} requests buffer from > {{RemoteInputChannel}} directly > * {{PartitionRequestClientHandler}} updates backlog for > {{RemoteInputChannel}}, and it may trigger requests of floating buffers from > {{BufferPool}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16186044#comment-16186044 ] ASF GitHub Bot commented on FLINK-7406: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r141865630 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -390,7 +390,63 @@ public BufferProvider getBufferProvider() throws IOException { return inputGate.getBufferProvider(); } - public void onBuffer(Buffer buffer, int sequenceNumber) { + /** +* Requests buffer from input channel directly for receiving network data. +* It should always return an available buffer in credit-based mode. +* +* @return The available buffer. +*/ + public Buffer requestBuffer() { + synchronized (availableBuffers) { + return availableBuffers.poll(); --- End diff -- can you mark this method `@Nullable`? > Implement Netty receiver incoming pipeline for credit-based > --- > > Key: FLINK-7406 > URL: https://issues.apache.org/jira/browse/FLINK-7406 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.4.0 > > > This is a part of work for credit-based network flow control. > Currently {{PartitionRequestClientHandler}} receives and reads > {{BufferResponse}} from producer. It will request buffer from {{BufferPool}} > for holding the message. If not got, the message is staged temporarily and > {{autoread}} for channel is set false. > For credit-based mode, {{PartitionRequestClientHandler}} can always get > buffer from {{RemoteInputChannel}} for reading messages from producer. 
> The related works are: > * Add the backlog of producer in {{BufferResponse}} message structure > * {{PartitionRequestClientHandler}} requests buffer from > {{RemoteInputChannel}} directly > * {{PartitionRequestClientHandler}} updates backlog for > {{RemoteInputChannel}}, and it may trigger requests of floating buffers from > {{BufferPool}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16186041#comment-16186041 ] ASF GitHub Bot commented on FLINK-7406: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r141902956 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -70,6 +79,21 @@ */ private int expectedSequenceNumber = 0; + /** The initial number of exclusive buffers assigned to this channel. */ + private int initialCredit; + + /** The current available buffers including both exclusive buffers and requested floating buffers. */ + private final ArrayDeque availableBuffers = new ArrayDeque<>(); + + /** The number of available buffers that have not been announced to the producer yet. */ + private final AtomicInteger unannouncedCredit = new AtomicInteger(0); + + /** The number of unsent buffers in the producer's sub partition. */ + private final AtomicInteger senderBacklog = new AtomicInteger(0); + + /** The tag indicates whether this channel is waiting for additional floating buffers from the buffer pool. */ + private final AtomicBoolean isWaitingForFloatingBuffers = new AtomicBoolean(false); --- End diff -- Now seeing this in action: do we really need a `AtomicBoolean`? Or is a `volatile boolean` enough? All uses except for `notifyBufferDestroyed()` (where only a safety-check uses the value) are actually under a `synchronized (availableBuffers)` block...in this case, you may also annotate the variable as `@GuardedBy("availableBuffers")` for documentation. > Implement Netty receiver incoming pipeline for credit-based > --- > > Key: FLINK-7406 > URL: https://issues.apache.org/jira/browse/FLINK-7406 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.4.0 > > > This is a part of work for credit-based network flow control. 
> Currently {{PartitionRequestClientHandler}} receives and reads > {{BufferResponse}} from producer. It will request buffer from {{BufferPool}} > for holding the message. If not got, the message is staged temporarily and > {{autoread}} for channel is set false. > For credit-based mode, {{PartitionRequestClientHandler}} can always get > buffer from {{RemoteInputChannel}} for reading messages from producer. > The related works are: > * Add the backlog of producer in {{BufferResponse}} message structure > * {{PartitionRequestClientHandler}} requests buffer from > {{RemoteInputChannel}} directly > * {{PartitionRequestClientHandler}} updates backlog for > {{RemoteInputChannel}}, and it may trigger requests of floating buffers from > {{BufferPool}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16186039#comment-16186039 ] ASF GitHub Bot commented on FLINK-7406: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r141886467 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -390,7 +390,63 @@ public BufferProvider getBufferProvider() throws IOException { return inputGate.getBufferProvider(); } - public void onBuffer(Buffer buffer, int sequenceNumber) { + /** +* Requests buffer from input channel directly for receiving network data. +* It should always return an available buffer in credit-based mode. +* +* @return The available buffer. +*/ + public Buffer requestBuffer() { + synchronized (availableBuffers) { + return availableBuffers.poll(); + } + } + + /** +* Receives the backlog from producer's buffer response. If the number of available +* buffers is less than the backlog length, it will request floating buffers from buffer +* pool, and then notify unannounced credits to the producer. +* +* @param backlog The number of unsent buffers in the producer's sub partition. +*/ + private void onSenderBacklog(int backlog) { + int numRequestedBuffers = 0; + + synchronized (availableBuffers) { + // Important: the isReleased check should be inside the synchronized block. + if (!isReleased.get()) { + senderBacklog.set(backlog); + + while (senderBacklog.get() > availableBuffers.size() && !isWaitingForFloatingBuffers.get()) { --- End diff -- I was thinking about it a bit more and was talking to @StephanEwen about it, and we think, that it is actually fine to grab all resources we need at the moment. If there are not enough buffers at some point, the fair distribution will start when the buffers are recycled, i.e. via the callbacks of the new `BufferListener`. 
Since each channel always has its own exclusive buffers, we can guarantee that it always makes progress anyway! Additionally, we cannot really make a fair distribution from the start when receiving the first backlog (since we do not know all the other backlogs) unless we wait for some time, which we also do not want. I kind of like your idea of having a `numBuffersPerAllocation`. Let's keep this in mind and evaluate the current solution first to see whether we need this addition.

Regarding the formula (which I took from the network FLIP):
- from the FLIP with regard to the buffer distribution strategy: `Design rationale 2: Each channel always tries to maintain a credit of ‘backlog_length + initialCredit’. That means that each channel tries to build the receive window for its current backlog as much as possible from the floating buffers, and use the exclusive ‘initialCredit’ buffers as a means to grow the window.` That way we always have some buffers available immediately on the receiver side so the sender can continue sending new buffers immediately (as long as there are buffers available on the receiver) and we do not have to wait for the exclusive buffers to come back.
- Note that this would have to be changed in the various checks for `availableBuffers.size() >= senderBacklog.get()`, e.g. in `RemoteInputChannel#notifyBufferAvailable()`.
- Similarly, `RemoteInputChannel#recycle()` needs to be adapted, in case our exclusive buffers are in use and we requested `backlog_length + initialCredit - currentCredit` *floating* buffers, in order not to stack up `2*initialCredit` buffers once `backlog == 0` again. (+ a corresponding unit test)
- What do you mean by `backlog-currentCredit` not being very accurate? We guarantee that there are no more than `currentCredit` buffers on the wire (some already in the channel, some only announced) and, at the time the buffer was sent, `backlog` additional buffers were queued, so in order to send them, we always need `backlog-currentCredit` irrespective of how much credit is announced vs. being on the wire.
Or am I not seeing something here?

> Implement Netty receiver incoming pipeline for credit-based
> ---
>
> Key: FLINK-7406
> URL: https://issues.apache.org/jira/browse/FLINK-7406
> Project: Flink
> Issue Type: Sub-task
> Components: Network
> Reporter: zhijiang
> Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is a part of
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16186042#comment-16186042 ] ASF GitHub Bot commented on FLINK-7406: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r141906648 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -374,6 +376,64 @@ public void testReleaseExclusiveBuffers() throws Exception { verify(inputGate, times(1)).returnExclusiveSegments(anyListOf(MemorySegment.class)); } + /** +* Tests {@link BufferPool#requestBuffer()}, verifying the remote input channel tries to request +* floating buffers once receiving the producer's backlog. +*/ + @Test + public void testRequestFloatingBuffersOnBuffer() throws Exception { + // Setup + final BufferPool bufferPool = mock(BufferPool.class); + when(bufferPool.requestBuffer()).thenReturn(TestBufferFactory.createBuffer()); + + final SingleInputGate inputGate = mock(SingleInputGate.class); + when(inputGate.getBufferPool()).thenReturn(bufferPool); + + final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate); + + // Receive the producer's backlog + inputChannel.onBuffer(TestBufferFactory.createBuffer(), 0, 10); + // Need to request 10 floating buffers from buffer pool + verify(bufferPool, times(10)).requestBuffer(); + + inputChannel.onBuffer(TestBufferFactory.createBuffer(), 1, 8); + // No need to request extra floating buffers from pool because + // there are already 10 available buffers in queue now + verify(bufferPool, times(10)).requestBuffer(); + + inputChannel.onBuffer(TestBufferFactory.createBuffer(), 2, 11); + // Need to request another floating buffer from pool + verify(bufferPool, times(11)).requestBuffer(); + } + + /** +* Tests {@link BufferPool#requestBuffer()}, verifying the remote input channel tries to request +* floating buffers once receiving the producer's backlog. 
And it requests from pool only once +* and registers as listener if there are currently no available buffers in the pool. +*/ + @Test + public void testWaitForFloatingBuffersOnBuffer() throws Exception { + // Setup + final BufferPool bufferPool = mock(BufferPool.class); + when(bufferPool.requestBuffer()).thenReturn(null); + when(bufferPool.addBufferListener(any(BufferListener.class))).thenReturn(true); --- End diff -- How about using the real `NetworkBufferPool#createBufferPool()` here with a limited set of buffers? Then you could start retrieving buffers as in the test method above and continue with verifying the expected behaviour in case the buffer limit was reached (no need for two test methods, I guess). I'd prefer this over a mock so that you can also verify the interaction with the real methods such as `addBufferListener()`. > Implement Netty receiver incoming pipeline for credit-based > --- > > Key: FLINK-7406 > URL: https://issues.apache.org/jira/browse/FLINK-7406 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.4.0 > > > This is a part of work for credit-based network flow control. > Currently {{PartitionRequestClientHandler}} receives and reads > {{BufferResponse}} from producer. It will request buffer from {{BufferPool}} > for holding the message. If not got, the message is staged temporarily and > {{autoread}} for channel is set false. > For credit-based mode, {{PartitionRequestClientHandler}} can always get > buffer from {{RemoteInputChannel}} for reading messages from producer. > The related works are: > * Add the backlog of producer in {{BufferResponse}} message structure > * {{PartitionRequestClientHandler}} requests buffer from > {{RemoteInputChannel}} directly > * {{PartitionRequestClientHandler}} updates backlog for > {{RemoteInputChannel}}, and it may trigger requests of floating buffers from > {{BufferPool}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16186036#comment-16186036 ] ASF GitHub Bot commented on FLINK-7406: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r141890343 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -390,7 +390,63 @@ public BufferProvider getBufferProvider() throws IOException { return inputGate.getBufferProvider(); } - public void onBuffer(Buffer buffer, int sequenceNumber) { + /** +* Requests buffer from input channel directly for receiving network data. +* It should always return an available buffer in credit-based mode. +* +* @return The available buffer. +*/ + public Buffer requestBuffer() { + synchronized (availableBuffers) { + return availableBuffers.poll(); + } + } + + /** +* Receives the backlog from the producer's buffer response. If the number of available +* buffers is less than the backlog length, it will request floating buffers from the buffer +* pool, and then notify unannounced credits to the producer. +* +* @param backlog The number of unsent buffers in the producer's sub partition. +*/ + private void onSenderBacklog(int backlog) { + int numRequestedBuffers = 0; + + synchronized (availableBuffers) { + // Important: the isReleased check should be inside the synchronized block. + if (!isReleased.get()) { + senderBacklog.set(backlog); + + while (senderBacklog.get() > availableBuffers.size() && !isWaitingForFloatingBuffers.get()) { + Buffer buffer = null; + try { + buffer = inputGate.getBufferPool().requestBuffer(); + } catch (IOException ex) { + ExceptionUtils.rethrow(ex); + } + + if (buffer != null) { + availableBuffers.add(buffer); + numRequestedBuffers++; + continue; --- End diff -- How about using the rest in an `else` statement, instead of `continue`ing here? This would be more intuitive, I guess. 
> Implement Netty receiver incoming pipeline for credit-based > --- > > Key: FLINK-7406 > URL: https://issues.apache.org/jira/browse/FLINK-7406 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.4.0 > > > This is a part of work for credit-based network flow control. > Currently {{PartitionRequestClientHandler}} receives and reads > {{BufferResponse}} from producer. It will request buffer from {{BufferPool}} > for holding the message. If not got, the message is staged temporarily and > {{autoread}} for channel is set false. > For credit-based mode, {{PartitionRequestClientHandler}} can always get > buffer from {{RemoteInputChannel}} for reading messages from producer. > The related works are: > * Add the backlog of producer in {{BufferResponse}} message structure > * {{PartitionRequestClientHandler}} requests buffer from > {{RemoteInputChannel}} directly > * {{PartitionRequestClientHandler}} updates backlog for > {{RemoteInputChannel}}, and it may trigger requests of floating buffers from > {{BufferPool}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
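The `else` variant suggested in the review comment above could look roughly like the following sketch. It is a simplified stand-in, not the code from the pull request: `BacklogLoopSketch`, `requestForBacklog` and the `Supplier`-based pool are hypothetical names, buffers are plain `Object`s, and because the quoted diff is cut off right after `continue`, the `else` branch here only sets a waiting flag as a placeholder for whatever the real method does next.

```java
import java.util.ArrayDeque;
import java.util.function.Supplier;

final class BacklogLoopSketch {

    /**
     * Requests buffers until the queue covers the backlog or the pool runs dry.
     * Returns how many buffers were obtained from the pool.
     */
    static int requestForBacklog(int backlog, ArrayDeque<Object> availableBuffers, Supplier<Object> pool) {
        int numRequestedBuffers = 0;
        boolean isWaitingForFloatingBuffers = false;

        synchronized (availableBuffers) {
            while (backlog > availableBuffers.size() && !isWaitingForFloatingBuffers) {
                Object buffer = pool.get(); // hypothetical pool: returns null when empty

                if (buffer != null) {
                    availableBuffers.add(buffer);
                    numRequestedBuffers++;
                } else {
                    // Placeholder for the "rest" of the loop body (assumption):
                    // stop requesting and wait for a notification instead.
                    isWaitingForFloatingBuffers = true;
                }
            }
        }
        return numRequestedBuffers;
    }
}
```

With both outcomes of the `null` check spelled out as branches, the loop body reads top to bottom without a jump, which is the readability gain the comment is after.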
[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r141902956 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -70,6 +79,21 @@ */ private int expectedSequenceNumber = 0; + /** The initial number of exclusive buffers assigned to this channel. */ + private int initialCredit; + + /** The current available buffers including both exclusive buffers and requested floating buffers. */ + private final ArrayDeque availableBuffers = new ArrayDeque<>(); + + /** The number of available buffers that have not been announced to the producer yet. */ + private final AtomicInteger unannouncedCredit = new AtomicInteger(0); + + /** The number of unsent buffers in the producer's sub partition. */ + private final AtomicInteger senderBacklog = new AtomicInteger(0); + + /** The tag indicates whether this channel is waiting for additional floating buffers from the buffer pool. */ + private final AtomicBoolean isWaitingForFloatingBuffers = new AtomicBoolean(false); --- End diff -- Now seeing this in action: do we really need a `AtomicBoolean`? Or is a `volatile boolean` enough? All uses except for `notifyBufferDestroyed()` (where only a safety-check uses the value) are actually under a `synchronized (availableBuffers)` block...in this case, you may also annotate the variable as `@GuardedBy("availableBuffers")` for documentation. ---
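A minimal sketch of the alternative raised in the comment above, assuming the flag is only ever written while holding the `availableBuffers` monitor. `WaitingFlagSketch` and its methods are hypothetical and not part of `RemoteInputChannel`; the `@GuardedBy` annotation is only mentioned in a comment so that the block compiles without extra dependencies.

```java
import java.util.ArrayDeque;

final class WaitingFlagSketch {

    private final ArrayDeque<Object> availableBuffers = new ArrayDeque<>();

    // Written only while holding the availableBuffers monitor. In the real class this
    // could additionally be documented with @GuardedBy("availableBuffers").
    // volatile (assumption) so that a lock-free safety check still sees the latest value.
    private volatile boolean isWaitingForFloatingBuffers;

    /** Returns true if this call switched the channel into the waiting state. */
    boolean startWaiting() {
        synchronized (availableBuffers) {
            if (isWaitingForFloatingBuffers) {
                return false;
            }
            isWaitingForFloatingBuffers = true;
            return true;
        }
    }

    /** Adds a buffer and leaves the waiting state, both under the same lock. */
    void bufferArrived(Object buffer) {
        synchronized (availableBuffers) {
            availableBuffers.add(buffer);
            isWaitingForFloatingBuffers = false;
        }
    }

    /** Lock-free read, comparable to a safety check outside the synchronized block. */
    boolean isWaiting() {
        return isWaitingForFloatingBuffers;
    }
}
```

Since every check-then-set on the flag already happens under the monitor, the atomic compare-and-set operations of `AtomicBoolean` are not needed in this sketch; only visibility for the unlocked read matters.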
[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r141865630 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -390,7 +390,63 @@ public BufferProvider getBufferProvider() throws IOException { return inputGate.getBufferProvider(); } - public void onBuffer(Buffer buffer, int sequenceNumber) { + /** +* Requests buffer from input channel directly for receiving network data. +* It should always return an available buffer in credit-based mode. +* +* @return The available buffer. +*/ + public Buffer requestBuffer() { + synchronized (availableBuffers) { + return availableBuffers.poll(); --- End diff -- can you mark this method `@Nullable`? ---
[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r141908388 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -374,6 +376,64 @@ public void testReleaseExclusiveBuffers() throws Exception { verify(inputGate, times(1)).returnExclusiveSegments(anyListOf(MemorySegment.class)); } + /** +* Tests {@link BufferPool#requestBuffer()}, verifying the remote input channel tries to request +* floating buffers once receiving the producer's backlog. +*/ + @Test + public void testRequestFloatingBuffersOnBuffer() throws Exception { + // Setup + final BufferPool bufferPool = mock(BufferPool.class); + when(bufferPool.requestBuffer()).thenReturn(TestBufferFactory.createBuffer()); + + final SingleInputGate inputGate = mock(SingleInputGate.class); + when(inputGate.getBufferPool()).thenReturn(bufferPool); + + final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate); + + // Receive the producer's backlog + inputChannel.onBuffer(TestBufferFactory.createBuffer(), 0, 10); + // Need to request 10 floating buffers from buffer pool + verify(bufferPool, times(10)).requestBuffer(); + + inputChannel.onBuffer(TestBufferFactory.createBuffer(), 1, 8); + // No need to request extra floating buffers from pool because + // there are already 10 available buffers in queue now + verify(bufferPool, times(10)).requestBuffer(); + + inputChannel.onBuffer(TestBufferFactory.createBuffer(), 2, 11); + // Need to request another floating buffer from pool + verify(bufferPool, times(11)).requestBuffer(); + } + + /** +* Tests {@link BufferPool#requestBuffer()}, verifying the remote input channel tries to request +* floating buffers once receiving the producer's backlog. And it requests from pool only once +* and registers as listener if there are currently no available buffers in the pool. 
+*/ + @Test + public void testWaitForFloatingBuffersOnBuffer() throws Exception { + // Setup + final BufferPool bufferPool = mock(BufferPool.class); + when(bufferPool.requestBuffer()).thenReturn(null); + when(bufferPool.addBufferListener(any(BufferListener.class))).thenReturn(true); + + final SingleInputGate inputGate = mock(SingleInputGate.class); + when(inputGate.getBufferPool()).thenReturn(bufferPool); + when(inputGate.getBufferProvider()).thenReturn(bufferPool); + + final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate); + + // Receive the producer's backlog + inputChannel.onBuffer(TestBufferFactory.createBuffer(), 0, 10); + // Request from pool only once if there are no available floating buffers + verify(bufferPool, times(1)).requestBuffer(); + + inputChannel.onBuffer(TestBufferFactory.createBuffer(), 1, 10); + // Already registers as listener to wait for notifications and will not request any more + verify(bufferPool, times(1)).requestBuffer(); + } + --- End diff -- actually, some more tests would be nice: - ensuring a fair distribution of buffers to `BufferListener`s - to verify that there is no race condition with two things running in parallel: `onSenderBacklog()` or `notifyBufferAvailable()` requesting buffers and some other thread recycling them (floating and/or exclusive ones). ---
[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r141896488 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -412,23 +468,35 @@ public void onBuffer(Buffer buffer, int sequenceNumber) { } } } + + if (success && backlog > 0) { + onSenderBacklog(backlog); + } + } finally { if (!success) { buffer.recycle(); } } } - public void onEmptyBuffer(int sequenceNumber) { + public void onEmptyBuffer(int sequenceNumber, int backlog) { + boolean success = false; + synchronized (receivedBuffers) { if (!isReleased.get()) { if (expectedSequenceNumber == sequenceNumber) { expectedSequenceNumber++; + success = true; } else { onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber)); } } } + + if (success && backlog > 0) { --- End diff -- same here ---
[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r141890051 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -390,7 +390,63 @@ public BufferProvider getBufferProvider() throws IOException { return inputGate.getBufferProvider(); } - public void onBuffer(Buffer buffer, int sequenceNumber) { + /** +* Requests buffer from input channel directly for receiving network data. +* It should always return an available buffer in credit-based mode. +* +* @return The available buffer. +*/ + public Buffer requestBuffer() { + synchronized (availableBuffers) { + return availableBuffers.poll(); + } + } + + /** +* Receives the backlog from producer's buffer response. If the number of available +* buffers is less than the backlog length, it will request floating buffers from buffer +* pool, and then notify unannounced credits to the producer. +* +* @param backlog The number of unsent buffers in the producer's sub partition. +*/ + private void onSenderBacklog(int backlog) { + int numRequestedBuffers = 0; + + synchronized (availableBuffers) { + // Important: the isReleased check should be inside the synchronized block. + if (!isReleased.get()) { + senderBacklog.set(backlog); + + while (senderBacklog.get() > availableBuffers.size() && !isWaitingForFloatingBuffers.get()) { + Buffer buffer = null; + try { + buffer = inputGate.getBufferPool().requestBuffer(); + } catch (IOException ex) { --- End diff -- This only affects the two methods you mentioned which, themselves, are called by `PartitionRequestClientHandler`/`CreditBasedClientHandler` helper methods allowing `Throwable` - so it's actually not much. I'd prefer not wrapping the exception further here for simplicity in case of errors ---
[jira] [Commented] (FLINK-7735) Improve date/time handling in publically-facing Expressions
[ https://issues.apache.org/jira/browse/FLINK-7735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16186007#comment-16186007 ]

Alexey Diomin commented on FLINK-7735:
--------------------------------------

Java 7 support will be dropped very soon (FLINK-7242). Maybe it makes sense to discuss using the Java 8 time API?

> Improve date/time handling in publically-facing Expressions
> -----------------------------------------------------------
>
> Key: FLINK-7735
> URL: https://issues.apache.org/jira/browse/FLINK-7735
> Project: Flink
> Issue Type: Wish
> Components: Table API & SQL
> Reporter: Kent Murra
> Priority: Minor
>
> I would like to discuss potential improvements to date/time/timestamp handling in Expressions. Since Flink now offers expression push down for table sources, which includes time-related functions, timezone handling is more visible to the end user.
>
> I think that the current usage of java.sql.Time, java.sql.Date, and java.sql.Timestamp is fairly ambiguous. We're taking a Date subclass in the constructor of Literal, and assuming that the year, month, day, and hour fields apply to UTC rather than the user's default timezone. Per that assumption, Flink is [adjusting the value of the epoch timestamp|https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala#L106] silently when converting to the RexLiteral. This provides correct behavior if the user assumes that the year/month/day/hour fields in the Date object are in the same timezone that the SQL statement assumes (which is UTC). However, if they work at all with the epoch timestamp (which is a public field) this can lead to incorrect results. Moreover, it's confusing if you're considering the time zones your data is in, requiring some amount of research to determine correct behavior.
>
> It would be ideal to:
> # Provide primitives that have time-zone information associated by default, thus disambiguating the times.
> # Properly document all TimeZone related assumptions in Expression literals.
> # Document in the web documentation that the TIMESTAMP calcite function will assume that the timestamp is in UTC.
> # Provide a timezone-based date parsing function in the SQL language.
>
> Regarding the primitives, since we have to support Java 7, we can't use the Java 8 time API. I'm guessing it'd be a decision between using Joda Time or making thin data structures that could easily be converted to various other time primitives.

--
This message was sent by Atlassian JIRA (v6.4.14#64029)
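To illustrate the ambiguity discussed in this issue, here is a small sketch using the Java 8 time API that the comment proposes. It is not Flink code: `EpochZoneSketch` and its methods are hypothetical helpers, and the example only shows that the same wall-clock fields map to different epoch values depending on the zone they are interpreted in.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;

final class EpochZoneSketch {

    /** Hour of day that the given epoch-millisecond value has in the given zone. */
    static int hourIn(long epochMillis, ZoneId zone) {
        return Instant.ofEpochMilli(epochMillis).atZone(zone).getHour();
    }

    /** Epoch milliseconds of a wall-clock time interpreted in an explicitly named zone. */
    static long toEpochMillis(LocalDateTime wallClock, ZoneId zone) {
        return ZonedDateTime.of(wallClock, zone).toInstant().toEpochMilli();
    }
}
```

For example, `toEpochMillis(LocalDateTime.of(1970, 1, 1, 0, 0), ZoneOffset.UTC)` is `0`, while the same fields interpreted at `ZoneOffset.ofHours(9)` give `-32400000`. A primitive that carries its zone removes the need to guess which of the two the user meant.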
[jira] [Commented] (FLINK-7700) State merging in RocksDB backend leaves old state
[ https://issues.apache.org/jira/browse/FLINK-7700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185950#comment-16185950 ]

ASF GitHub Bot commented on FLINK-7700:
---------------------------------------

GitHub user aljoscha opened a pull request:

    https://github.com/apache/flink/pull/4752

    [FLINK-7700] Fix RocksDB state merging on release-1.3

    This is only for running the tests, which I can't do on my own Travis since tests time out.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aljoscha/flink release-1.3

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4752.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #4752

commit 64858a7881d22fb665099e1b2cd8363e447ed572
Author: Aljoscha Krettek
Date: 2016-11-04T09:17:55Z

    [FLINK-5619] Add numStateEntries() method for all keyed backends

    This also adds a test for this in StateBackendTestBase

commit 31cd6db6a7d2f7d78aa05f6ff5fc82abbc79d042
Author: Aljoscha Krettek
Date: 2017-09-27T09:37:09Z

    [FLINK-7700] Fix RocksDB ListState merging

    Before, the merged state was not cleared.

commit 5e3d49cc05591bf396bfcf9e578f8e30ce436114
Author: Aljoscha Krettek
Date: 2017-09-27T09:38:13Z

    [FLINK-5619] Consolidate ListState Tests in StateBackendTestBase

commit e0bd8c060da01e5defe528964bf638868cbc13d5
Author: Aljoscha Krettek
Date: 2017-09-27T10:53:08Z

    [FLINK-7700] Fix RocksDB ReducingState merging

    Before, the merged state was not cleared.

commit 0ffca863a4b189675d2f15ee61f9d20fe49e85f0
Author: Aljoscha Krettek
Date: 2017-09-27T10:54:19Z

    [FLINK-5619] Consolidate ReducingState Tests in StateBackendTestBase

commit b0a7c091856aa2a99715a9fddcc801f660414cfd
Author: Aljoscha Krettek
Date: 2017-09-27T11:01:55Z

    [FLINK-7700] Fix RocksDB AggregatingState merging

    Before, the merged state was not cleared.

commit 835de46b5f63ffeee8e89a89d3f2e6a9af06efc4
Author: Aljoscha Krettek
Date: 2017-09-27T11:02:57Z

    [FLINK-5619] Consolidate AggregatingState Tests in StateBackendTestBase

> State merging in RocksDB backend leaves old state
> -------------------------------------------------
>
> Key: FLINK-7700
> URL: https://issues.apache.org/jira/browse/FLINK-7700
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing
> Affects Versions: 1.4.0, 1.3.2
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
> Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
> {{RocksDBAggregatingState.mergeNamespaces(...)}}, {{RocksDBReducingState.mergeNamespaces(...)}}, and {{RocksDBListState.mergeNamespaces(...)}} don't remove state from the old location when merging. This means that merged state will never be cleaned up and will accumulate.

--
This message was sent by Atlassian JIRA (v6.4.14#64029)
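The bug described in this issue can be illustrated with a small in-memory sketch. This is not the RocksDB backend code: `NamespaceMergeSketch` is a hypothetical class that keeps list state per namespace in a `HashMap`, and `numStateEntries()` only mirrors the name of the method from FLINK-5619. The point is that merging has to delete the source entries, not just copy them.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class NamespaceMergeSketch {

    private final Map<String, List<Integer>> stateByNamespace = new HashMap<>();

    void add(String namespace, int value) {
        stateByNamespace.computeIfAbsent(namespace, k -> new ArrayList<>()).add(value);
    }

    /** Moves the state of all source namespaces into the target namespace. */
    void mergeNamespaces(String target, Collection<String> sources) {
        for (String source : sources) {
            if (source.equals(target)) {
                continue; // never delete the target's own state
            }
            // remove() reads AND deletes the source entry; using get() here would
            // leave the old state behind, which is the leak described in the issue.
            List<Integer> moved = stateByNamespace.remove(source);
            if (moved != null) {
                stateByNamespace.computeIfAbsent(target, k -> new ArrayList<>()).addAll(moved);
            }
        }
    }

    List<Integer> get(String namespace) {
        return stateByNamespace.getOrDefault(namespace, new ArrayList<>());
    }

    /** Number of namespaces that currently hold state. */
    int numStateEntries() {
        return stateByNamespace.size();
    }
}
```

Counting entries before and after a merge, as a `numStateEntries()`-style check allows, is one way a test can detect state that was merged but never cleaned up.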
[GitHub] flink issue #3511: [Flink-5734] code generation for normalizedkey sorter
Github user ggevay commented on the issue: https://github.com/apache/flink/pull/3511 > IMHO, in addition to these changes, there are still some potential improvements we can do about the sorter, like deserialization when comparing the real records. Do you mean `NormalizedKeySorter.compareRecords`? This calls `comparator.compareSerialized`, which is currently implemented for most types by simply deserializing the records and then comparing the deserialized form. It would maybe bring some performance improvement if this had a real implementation, which would deserialize only the key fields (which are relevant to the comparison). The hard part here is how to handle nullable fields, which make the offset of individual fields unpredictable. So I think a simpler and better approach is to just make sure that most types have a good implementation of `putNormalizedKey`, and then `NormalizedKeySorter.compareRecords` would be called only rarely, so its performance wouldn't really matter. > We are working on a project to fully code generate the algorithm Flink runtime used, and borrowed lots of idea of this PR, thanks! I'm happy to hear that! Btw. I think a large potential in code-generation is to eliminate the overheads of the very many virtual function calls throughout the runtime, by making the calls monomorphic [1,2]. For example, there could be a custom `MapDriver` generated for each map UDF, and then this custom class would always call the same UDF, making the call monomorphic, and thus easily optimizable by the JVM [1,2]. (I think @greghogan was also thinking about this.) 
For example, the following virtual calls are on the per-record code path:
- drivers (such as `MapDriver`) calling `MutableObjectIterator.next`
- drivers calling the UDF
- drivers calling `output.collect`
- `ReusingDeserializationDelegate` or `NonReusingDeserializationDelegate` calling `serializer.deserialize`
- `SpillingResettableMutableObjectIterator` calling `serializer.serialize` and `serializer.deserialize`
- `PojoSerializer`, `TupleSerializer`, `RowSerializer`, etc. calling their field serializers
- `CountingCollector` calling `collector.collect`
- `RecordWriter` calling `channelSelector.selectChannels`
- `SerializationDelegate` calling `serializer.serialize`
- `StreamElementSerializer` calling methods on its `typeSerializer`
- `OutputEmitter` calling `comparator.hash`
- `ComparableKeySelector` calling `comparator.extractKeys`
- `assignToKeyGroup` calling `key.hashCode`

I think most of the above calls are megamorphic (especially in larger Flink jobs with many operators), which makes them slow [1,2]. They could be made monomorphic by code-generating custom versions of these classes, where the customization would be to fix the type of the targets of these calls. (I think this could probably be done independently of the Table API.)

Another potential for code-generation is customizing `OutputEmitter` for the number of channels: there are places where a modulo operation is performed, which is much faster if the divisor is a compile-time constant, since then the compiler uses such tricks as described in this paper: https://gmplib.org/~tege/divcnst-pldi94.pdf

And the same optimization could be made in `KeyGroupRangeAssignment`, where there are divisions by `maxParallelism`.

> we need more type information control and flexible code generating supports, so we choose to do it through the Table API & SQL. How do you see this approach?

Having more info about the types, UDFs, etc. of your program certainly can help.
Unfortunately I don't know too much about the Table API & SQL, but I have a few random thoughts: - Since you are generating the UDFs, it should be possible to make them have good object reuse behavior, without the users having to worry about the non-trivial object reuse rules. - For basic types (int, boolean, etc.) the generated code could use `IntValue`, `BooleanValue`, etc., facilitating more object reuse, again without making the users write ugly boilerplate (as they have to do in the DataSet/DataStream API when they want to use these classes). - Same thing with `StringValue` Btw. have you seen this PR for code generation for POJO serializers and comparators? https://github.com/apache/flink/pull/2211 It has some issues, so it is not as close to merging as this PR, but maybe I'll try to tie that up as well some time. [1] http://insightfullogic.com/2014/May/12/fast-and-megamorphic-what-influences-method-invoca/ [2] https://shipilev.net/blog/2015/black-magic-method-dispatch/ ---
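The idea of making the driver-to-UDF call monomorphic can be sketched by hand as follows. This is only an illustration under stated assumptions: `SpecializedDriverSketch`, `PlusOne` and `PlusOneDriver` are hypothetical stand-ins for a UDF and for a class that code generation might emit, not Flink classes, and whether the JIT actually optimizes the specialized call better depends on the JVM and the workload and is not measured here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class SpecializedDriverSketch {

    /** Generic driver: one shared call site whose target type varies from job to job. */
    static <I, O> List<O> runGeneric(List<I> input, Function<I, O> mapper) {
        List<O> out = new ArrayList<>(input.size());
        for (I record : input) {
            out.add(mapper.apply(record)); // potentially megamorphic call site
        }
        return out;
    }

    /** A concrete "UDF"; final, so a call through this type has a single possible target. */
    static final class PlusOne implements Function<Integer, Integer> {
        @Override
        public Integer apply(Integer x) {
            return x + 1;
        }
    }

    /** Hand-written stand-in for a generated driver that is specialized to PlusOne. */
    static final class PlusOneDriver {
        private final PlusOne udf = new PlusOne(); // field has the concrete type

        List<Integer> run(List<Integer> input) {
            List<Integer> out = new ArrayList<>(input.size());
            for (Integer record : input) {
                out.add(udf.apply(record)); // this call site only ever sees PlusOne
            }
            return out;
        }
    }
}
```

Both drivers produce the same output; the difference is only that the specialized one gives the JVM a call site with exactly one receiver type.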
[jira] [Commented] (FLINK-7736) Fix some of the alerts raised by lgtm.com
[ https://issues.apache.org/jira/browse/FLINK-7736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185921#comment-16185921 ]

Malcolm Taylor commented on FLINK-7736:
---------------------------------------

Identified 14 alerts to address:
1) dereferenced variable is always null, in TaskSlotTable
2-3) array index out of bounds, in KVStateRequestSerializer and Utils
4) inconsistent equals and hashCode, in ArchivedJson
5-6) close input, in JarListHandler and SocketTextStreamFunction
7) close output, in JarFileCreator
8) unused format argument, in YarnApplicationMasterRunner
9) useless type test, in GroupReduceNode
10-11) useless comparison, in TaskExecutor and FieldAccessor
12-14) Result of integer multiplication cast to long, in MemoryManager and twice in InPlaceMutableHashTable

> Fix some of the alerts raised by lgtm.com
> -----------------------------------------
>
> Key: FLINK-7736
> URL: https://issues.apache.org/jira/browse/FLINK-7736
> Project: Flink
> Issue Type: Improvement
> Reporter: Malcolm Taylor
> Assignee: Malcolm Taylor
>
> lgtm.com has identified a number of issues giving scope for improvement in the code: [https://lgtm.com/projects/g/apache/flink/alerts/?mode=list]
> This issue is to address some of the simpler ones. Some of these are quite clear bugs such as off-by-one errors. Others are areas where the code might be made clearer, such as use of a variable name which shadows another variable.

--
This message was sent by Atlassian JIRA (v6.4.14#64029)
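As an illustration of the alert class in items 12-14, the sketch below shows why an `int` multiplication whose result is only afterwards widened to `long` can be wrong. `MultiplicationCastSketch` and its parameter names are hypothetical; this is not the code from `MemoryManager` or `InPlaceMutableHashTable`.

```java
final class MultiplicationCastSketch {

    /** Buggy pattern: the product is computed in 32-bit int and may wrap before widening. */
    static long sizeOverflowing(int numPages, int pageSize) {
        long size = numPages * pageSize; // int * int, then implicit widening
        return size;
    }

    /** Fixed pattern: widen one operand first so the multiplication happens in long. */
    static long sizeWidened(int numPages, int pageSize) {
        return (long) numPages * pageSize;
    }
}
```

With `numPages = 100000` and `pageSize = 32768`, the true product 3276800000 exceeds `Integer.MAX_VALUE`, so `sizeOverflowing` returns a wrapped, negative value while `sizeWidened` returns 3276800000.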
[GitHub] flink pull request #4750: [FLINK-7710] [flip6] Add CheckpointStatisticsHandl...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4750#discussion_r141882415 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatistics.java --- @@ -0,0 +1,763 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus; +import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticsHandler; +import org.apache.flink.util.Preconditions; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Objects; + +/** + * Response of the {@link CheckpointStatisticsHandler}. 
+ */ +public class CheckpointStatistics implements ResponseBody { + + public static final String FIELD_NAME_COUNTS = "counts"; + + public static final String FIELD_NAME_SUMMARY = "summary"; + + public static final String FIELD_NAME_LATEST_CHECKPOINTS = "latest"; + + public static final String FIELD_NAME_HISTORY = "history"; + + @JsonProperty(FIELD_NAME_COUNTS) + private final Counts counts; + + @JsonProperty(FIELD_NAME_SUMMARY) + private final Summary summary; + + @JsonProperty(FIELD_NAME_LATEST_CHECKPOINTS) + private final LatestCheckpoints latestCheckpoints; + + @JsonProperty(FIELD_NAME_HISTORY) + private final List history; + + @JsonCreator + public CheckpointStatistics( + @JsonProperty(FIELD_NAME_COUNTS) Counts counts, + @JsonProperty(FIELD_NAME_SUMMARY) Summary summary, + @JsonProperty(FIELD_NAME_LATEST_CHECKPOINTS) LatestCheckpoints latestCheckpoints, + @JsonProperty(FIELD_NAME_HISTORY) List history) { + this.counts = Preconditions.checkNotNull(counts); + this.summary = Preconditions.checkNotNull(summary); + this.latestCheckpoints = Preconditions.checkNotNull(latestCheckpoints); + this.history = Preconditions.checkNotNull(history); + } + + public Counts getCounts() { + return counts; + } + + public Summary getSummary() { + return summary; + } + + public LatestCheckpoints getLatestCheckpoints() { + return latestCheckpoints; + } + + public List getHistory() { + return history; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CheckpointStatistics that = (CheckpointStatistics) o; + return Objects.equals(counts, that.counts) && + Objects.equals(summary, that.summary) && + Objects.equals(latestCheckpoints, that.latestCheckpoints) && + Objects.equals(history, that.history); + } + + @Override + public int hashCode() { + return Objects.hash(counts, summary, latestCheckpoints, history); + } + + // -- + // Inner classes + // -- + + /** +* Checkpoint counts. 
+*/ + public static final class Counts { + + public static final String FIELD_NAME_RESTORED_CHECKPOINTS = "restored"; + + public static final String FIELD_NAME_TOTAL_CHECKPOINTS = "total"; + + public static final String
[jira] [Commented] (FLINK-7710) Port CheckpointStatsHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185911#comment-16185911 ] ASF GitHub Bot commented on FLINK-7710: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4750#discussion_r141882415 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatistics.java --- @@ -0,0 +1,763 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus; +import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticsHandler; +import org.apache.flink.util.Preconditions; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Objects; + +/** + * Response of the {@link CheckpointStatisticsHandler}. 
+ */ +public class CheckpointStatistics implements ResponseBody { + + public static final String FIELD_NAME_COUNTS = "counts"; + + public static final String FIELD_NAME_SUMMARY = "summary"; + + public static final String FIELD_NAME_LATEST_CHECKPOINTS = "latest"; + + public static final String FIELD_NAME_HISTORY = "history"; + + @JsonProperty(FIELD_NAME_COUNTS) + private final Counts counts; + + @JsonProperty(FIELD_NAME_SUMMARY) + private final Summary summary; + + @JsonProperty(FIELD_NAME_LATEST_CHECKPOINTS) + private final LatestCheckpoints latestCheckpoints; + + @JsonProperty(FIELD_NAME_HISTORY) + private final List history; + + @JsonCreator + public CheckpointStatistics( + @JsonProperty(FIELD_NAME_COUNTS) Counts counts, + @JsonProperty(FIELD_NAME_SUMMARY) Summary summary, + @JsonProperty(FIELD_NAME_LATEST_CHECKPOINTS) LatestCheckpoints latestCheckpoints, + @JsonProperty(FIELD_NAME_HISTORY) List history) { + this.counts = Preconditions.checkNotNull(counts); + this.summary = Preconditions.checkNotNull(summary); + this.latestCheckpoints = Preconditions.checkNotNull(latestCheckpoints); + this.history = Preconditions.checkNotNull(history); + } + + public Counts getCounts() { + return counts; + } + + public Summary getSummary() { + return summary; + } + + public LatestCheckpoints getLatestCheckpoints() { + return latestCheckpoints; + } + + public List getHistory() { + return history; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CheckpointStatistics that = (CheckpointStatistics) o; + return Objects.equals(counts, that.counts) && + Objects.equals(summary, that.summary) && + Objects.equals(latestCheckpoints, that.latestCheckpoints) && + Objects.equals(history, that.history); + } + + @Override + public int hashCode() { + return Objects.hash(counts, summary, latestCheckpoints, history); + } + + // -- + // Inner classes + // -- + + /** +* Checkpoint counts. 
+*/ + public static final class Counts { + +
[jira] [Commented] (FLINK-7739) Improve Kafka*ITCase tests stability
[ https://issues.apache.org/jira/browse/FLINK-7739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185839#comment-16185839 ] ASF GitHub Bot commented on FLINK-7739: --- GitHub user pnowojski opened a pull request: https://github.com/apache/flink/pull/4751 [FLINK-7739][kafka-tests] Throttle down data producing thread Minor test improvement to avoid a busy loop. You can merge this pull request into a Git repository by running: $ git pull https://github.com/pnowojski/flink custom-partitioning Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4751.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4751 commit 77cefb87214a4b7462c5b65fafe86352fe3a7047 Author: Piotr Nowojski Date: 2017-09-28T14:59:09Z [FLINK-7739][kafka-tests] Throttle down data producing thread > Improve Kafka*ITCase tests stability > > > Key: FLINK-7739 > URL: https://issues.apache.org/jira/browse/FLINK-7739 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.3.2 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4751: [FLINK-7739][kafka-tests] Throttle down data produ...
GitHub user pnowojski opened a pull request: https://github.com/apache/flink/pull/4751 [FLINK-7739][kafka-tests] Throttle down data producing thread Minor test improvement to avoid a busy loop. You can merge this pull request into a Git repository by running: $ git pull https://github.com/pnowojski/flink custom-partitioning Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4751.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4751 commit 77cefb87214a4b7462c5b65fafe86352fe3a7047 Author: Piotr Nowojski Date: 2017-09-28T14:59:09Z [FLINK-7739][kafka-tests] Throttle down data producing thread ---
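The pull request above only states that the data-producing thread is throttled to avoid a busy loop; the patch itself is not shown here. As a rough illustration of the idea, the sketch below pauses between records instead of spinning. The class name `ThrottledProducer`, its fields, and the fixed pause are assumptions made for this example and are not taken from PR #4751.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical illustration of a throttled producer; not code from PR #4751. */
final class ThrottledProducer implements Runnable {

    private final BlockingQueue<Integer> sink;
    private final int recordCount;
    private final long pauseMillis;
    private volatile boolean running = true;

    ThrottledProducer(BlockingQueue<Integer> sink, int recordCount, long pauseMillis) {
        this.sink = sink;
        this.recordCount = recordCount;
        this.pauseMillis = pauseMillis;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; running && i < recordCount; i++) {
                sink.put(i);
                // Yield the CPU between records instead of spinning in a tight loop.
                Thread.sleep(pauseMillis);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the owning test can observe it.
            Thread.currentThread().interrupt();
        }
    }

    /** Asks the loop to stop after the current record. */
    void cancel() {
        running = false;
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        new ThrottledProducer(queue, 3, 10L).run();
        System.out.println("produced " + queue.size() + " records");
    }
}
```

A test would normally run the producer on its own thread and call `cancel()` during teardown; the pause length trades test runtime against CPU pressure on the build machine.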
[GitHub] flink pull request #4750: [FLINK-7710] [flip6] Add CheckpointStatisticsHandl...
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/4750 [FLINK-7710] [flip6] Add CheckpointStatisticsHandler for the new REST endpoint ## What is the purpose of the change This commit also makes the CheckpointStatsHistory object serializable by removing the CheckpointStatsHistoryIterable and replacing it with a static ArrayList. ## Verifying this change *(Please pick either of the following options)* This change added tests and can be verified as follows: - `CheckpointStatisticsTest` which tests the marshalling of `CheckpointStatistics` instances ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink portCheckpointStatsHandler Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4750.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4750 commit d6be27aac5d7ff022dc03874a8bb52b1c09d49de Author: Till Rohrmann Date: 2017-09-25T13:29:59Z [FLINK-7668] Add ExecutionGraphCache for ExecutionGraph based REST handlers The ExecutionGraphCache replaces the ExecutionGraphHolder. Unlike the latter, the former does not expect the AccessExecutionGraph to be the true ExecutionGraph. Instead it assumes it to be the ArchivedExecutionGraph. 
Therefore, it invalidates the cache entries after a given time-to-live period. This will trigger requesting the AccessExecutionGraph again and, thus, updating the ExecutionGraph information for the ExecutionGraph based REST handlers. In order to avoid memory leaks, the WebRuntimeMonitor now starts a periodic cleanup task which triggers ExecutionGraphCache.cleanup. This method releases all cache entries which have exceeded their time to live. Currently it is set to 20 * refreshInterval of the web gui. This closes #4728. commit 29f5d7e2484d52889230c559031cb7c427173927 Author: Till Rohrmann Date: 2017-09-26T16:39:15Z [FLINK-7695] [flip6] Add JobConfigHandler for new RestServerEndpoint commit e77fb7a5d78be540116051b9914cd70e61cf5df7 Author: Till Rohrmann Date: 2017-09-28T16:35:50Z [FLINK-7708] [flip6] Add CheckpointConfigHandler for new REST endpoint This commit implements the CheckpointConfigHandler which now returns a CheckpointConfigInfo object if checkpointing is enabled. If checkpointing is disabled for a job, it will return a 404 response. commit cd32570761c129da0bfff1fe0a4b4af6ae193c9c Author: Till Rohrmann Date: 2017-09-29T13:09:06Z [FLINK-7710] [flip6] Add CheckpointStatisticsHandler for the new REST endpoint This commit also makes the CheckpointStatsHistory object serializable by removing the CheckpointStatsHistoryIterable and replacing it with a static ArrayList. ---
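The FLINK-7668 commit message in this pull request describes a cache whose entries are invalidated after a time to live, plus a periodic cleanup that releases expired entries. A minimal sketch of that pattern follows. It is not the actual ExecutionGraphCache: the class `TtlCache`, its method signatures, and the explicit `nowMillis` parameter (used to keep the example deterministic) are assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical TTL cache; the names are illustrative, not Flink's API. */
final class TtlCache<K, V> {

    /** A cached value together with the timestamp at which it expires. */
    private static final class Entry<T> {
        final T value;
        final long expiresAtMillis;

        Entry(T value, long expiresAtMillis) {
            this.value = value;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final long timeToLiveMillis;
    private final Map<K, Entry<V>> entries = new ConcurrentHashMap<>();

    TtlCache(long timeToLiveMillis) {
        this.timeToLiveMillis = timeToLiveMillis;
    }

    /**
     * Returns the cached value, reloading it once its time to live has passed.
     * The check-then-put is not atomic; concurrent callers may load twice.
     */
    V get(K key, long nowMillis, Function<K, V> loader) {
        Entry<V> entry = entries.get(key);
        if (entry == null || nowMillis >= entry.expiresAtMillis) {
            entry = new Entry<>(loader.apply(key), nowMillis + timeToLiveMillis);
            entries.put(key, entry);
        }
        return entry.value;
    }

    /** Drops every expired entry; intended to be called by a periodic task. */
    void cleanup(long nowMillis) {
        entries.values().removeIf(entry -> nowMillis >= entry.expiresAtMillis);
    }

    int size() {
        return entries.size();
    }
}
```

Without the `cleanup` call, entries for jobs that are never requested again would stay in the map indefinitely, which is the memory leak the commit message refers to.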
[jira] [Commented] (FLINK-7710) Port CheckpointStatsHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185821#comment-16185821 ] ASF GitHub Bot commented on FLINK-7710: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/4750 [FLINK-7710] [flip6] Add CheckpointStatisticsHandler for the new REST endpoint ## What is the purpose of the change This commit also makes the CheckpointStatsHistory object serializable by removing the CheckpointStatsHistoryIterable and replacing it with a static ArrayList. ## Verifying this change *(Please pick either of the following options)* This change added tests and can be verified as follows: - `CheckpointStatisticsTest` which tests the marshalling of `CheckpointStatistics` instances ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink portCheckpointStatsHandler Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4750.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4750 commit d6be27aac5d7ff022dc03874a8bb52b1c09d49de Author: Till Rohrmann Date: 2017-09-25T13:29:59Z [FLINK-7668] Add ExecutionGraphCache for ExecutionGraph based REST handlers The ExecutionGraphCache replaces the ExecutionGraphHolder. 
Unlike the latter, the former does not expect the AccessExecutionGraph to be the true ExecutionGraph. Instead it assumes it to be the ArchivedExecutionGraph. Therefore, it invalidates the cache entries after a given time-to-live period. This will trigger requesting the AccessExecutionGraph again and, thus, updating the ExecutionGraph information for the ExecutionGraph based REST handlers. In order to avoid memory leaks, the WebRuntimeMonitor now starts a periodic cleanup task which triggers ExecutionGraphCache.cleanup. This method releases all cache entries which have exceeded their time to live. Currently it is set to 20 * refreshInterval of the web gui. This closes #4728. commit 29f5d7e2484d52889230c559031cb7c427173927 Author: Till Rohrmann Date: 2017-09-26T16:39:15Z [FLINK-7695] [flip6] Add JobConfigHandler for new RestServerEndpoint commit e77fb7a5d78be540116051b9914cd70e61cf5df7 Author: Till Rohrmann Date: 2017-09-28T16:35:50Z [FLINK-7708] [flip6] Add CheckpointConfigHandler for new REST endpoint This commit implements the CheckpointConfigHandler which now returns a CheckpointConfigInfo object if checkpointing is enabled. If checkpointing is disabled for a job, it will return a 404 response. commit cd32570761c129da0bfff1fe0a4b4af6ae193c9c Author: Till Rohrmann Date: 2017-09-29T13:09:06Z [FLINK-7710] [flip6] Add CheckpointStatisticsHandler for the new REST endpoint This commit also makes the CheckpointStatsHistory object serializable by removing the CheckpointStatsHistoryIterable and replacing it with a static ArrayList. > Port CheckpointStatsHandler to new REST endpoint > > > Key: FLINK-7710 > URL: https://issues.apache.org/jira/browse/FLINK-7710 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.4.0 > > > Port existing {{CheckpointStatsHandler}} to new REST endpoint. 
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint
[ https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185816#comment-16185816 ] ASF GitHub Bot commented on FLINK-7072: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4730 > Create RESTful cluster endpoint > --- > > Key: FLINK-7072 > URL: https://issues.apache.org/jira/browse/FLINK-7072 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Reporter: Till Rohrmann >Assignee: Chesnay Schepler > Labels: flip-6 > Fix For: 1.4.0 > > > In order to communicate with the cluster from the RESTful client, we have to > implement a RESTful cluster endpoint. The endpoint shall support the > following operations: > * List jobs (GET): Get list of all running jobs on the cluster > * Submit job (POST): Submit a job to the cluster (only supported in session > mode) > * Get job status (GET): Get the status of an executed job (and maybe the > JobExecutionResult) > * Lookup job leader (GET): Gets the JM leader for the given job > This endpoint will run in session mode alongside the dispatcher/session > runner and forward calls to this component which maintains a view on all > currently executed jobs. > In the per-job mode, the endpoint will return only the single running job and > the address of the JobManager alongside which it is running. Furthermore, it > won't accept job submissions. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4730: [hotfix] [REST] Various rest-related hotfixes
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4730 ---
[GitHub] flink issue #4730: [hotfix] [REST] Various rest-related hotfixes
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4730 Changes look good to me. Thanks a lot for your contribution @zentol. Merging this PR. ---
[GitHub] flink pull request #4749: [FLINK-7739][tests] Properly shutdown resources in...
GitHub user pnowojski opened a pull request: https://github.com/apache/flink/pull/4749 [FLINK-7739][tests] Properly shutdown resources in tests This is a fixup of tests, without touching the production code. You can merge this pull request into a Git repository by running: $ git pull https://github.com/pnowojski/flink kafka-test Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4749.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4749 commit 4e55087d091993a9552b5f732abab8a5c0a5a014 Author: Piotr Nowojski Date: 2017-09-29T12:45:28Z [FLINK-7739][kafka-tests] Shutdown NetworkFailureProxy commit a5bbc6fdf2a3e039739a0392dad3d61791f0c3fd Author: Piotr Nowojski Date: 2017-09-29T12:50:37Z [FLINK-7739][tests] Properly close flink mini cluster ---
[jira] [Commented] (FLINK-7739) Improve Kafka*ITCase tests stability
[ https://issues.apache.org/jira/browse/FLINK-7739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185796#comment-16185796 ] ASF GitHub Bot commented on FLINK-7739: --- GitHub user pnowojski opened a pull request: https://github.com/apache/flink/pull/4749 [FLINK-7739][tests] Properly shutdown resources in tests This is a fixup of tests, without touching the production code. You can merge this pull request into a Git repository by running: $ git pull https://github.com/pnowojski/flink kafka-test Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4749.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4749 commit 4e55087d091993a9552b5f732abab8a5c0a5a014 Author: Piotr Nowojski Date: 2017-09-29T12:45:28Z [FLINK-7739][kafka-tests] Shutdown NetworkFailureProxy commit a5bbc6fdf2a3e039739a0392dad3d61791f0c3fd Author: Piotr Nowojski Date: 2017-09-29T12:50:37Z [FLINK-7739][tests] Properly close flink mini cluster > Improve Kafka*ITCase tests stability > > > Key: FLINK-7739 > URL: https://issues.apache.org/jira/browse/FLINK-7739 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.3.2 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7739) Improve Kafka*ITCase tests stability
Piotr Nowojski created FLINK-7739: - Summary: Improve Kafka*ITCase tests stability Key: FLINK-7739 URL: https://issues.apache.org/jira/browse/FLINK-7739 Project: Flink Issue Type: Improvement Components: Kafka Connector Affects Versions: 1.3.2 Reporter: Piotr Nowojski Assignee: Piotr Nowojski -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7668) Add AccessExecutionGraph refresh interval to ExecutionGraphHolder
[ https://issues.apache.org/jira/browse/FLINK-7668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185685#comment-16185685 ] ASF GitHub Bot commented on FLINK-7668: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/4728 +1 from my side too. > Add AccessExecutionGraph refresh interval to ExecutionGraphHolder > - > > Key: FLINK-7668 > URL: https://issues.apache.org/jira/browse/FLINK-7668 > Project: Flink > Issue Type: Sub-task > Components: REST >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > > Once we support offline {{AccessExecutionGraph}} implementation (see > FLINK-7667) we should add a refresh interval to the {{ExecutionGraphHolder}} > after which the {{AccessExecutionGraph}} is retrieved again from the > {{JobMaster}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4728: [FLINK-7668] Add ExecutionGraphCache for ExecutionGraph b...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/4728 +1 from my side too. ---
[GitHub] flink pull request #4748: [hotfix][tests] Use G1GC for tests
GitHub user pnowojski opened a pull request: https://github.com/apache/flink/pull/4748 [hotfix][tests] Use G1GC for tests We use G1GC for running the TaskManager; I think we should also use it for tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/pnowojski/flink g1gc Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4748.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4748 commit e5aff977d8ee1efac4fb4c12c57431568618a7e1 Author: Piotr Nowojski Date: 2017-09-27T16:38:42Z [hotfix][tests] Use G1GC for tests ---
[jira] [Commented] (FLINK-7708) Port CheckpointConfigHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185663#comment-16185663 ] ASF GitHub Bot commented on FLINK-7708: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4744#discussion_r141836633 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigInfo.java --- @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler; +import org.apache.flink.util.Preconditions; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +/** + * Response class of the {@link CheckpointConfigHandler}. 
+ */ +public class CheckpointConfigInfo implements ResponseBody { + + public static final String FIELD_NAME_PROCESSING_MODE = "mode"; + + public static final String FIELD_NAME_CHECKPOINT_INTERVAL = "interval"; + + public static final String FIELD_NAME_CHECKPOINT_TIMEOUT = "timeout"; + + public static final String FIELD_NAME_CHECKPOINT_MIN_PAUSE = "min_pause"; + + public static final String FIELD_NAME_CHECKPOINT_MAX_CONCURRENT = "max_concurrent"; + + public static final String FIELD_NAME_EXTERNALIZED_CHECKPOINT_CONFIG = "externalization"; + --- End diff -- remove empty line (should fail checkstyle) > Port CheckpointConfigHandler to new REST endpoint > - > > Key: FLINK-7708 > URL: https://issues.apache.org/jira/browse/FLINK-7708 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.4.0 > > > Port existing {{CheckpointConfigHandler}} to new REST endpoint. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4744: [FLINK-7708] [flip6] Add CheckpointConfigHandler f...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4744#discussion_r141836633 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigInfo.java --- @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler; +import org.apache.flink.util.Preconditions; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +/** + * Response class of the {@link CheckpointConfigHandler}. 
+ */ +public class CheckpointConfigInfo implements ResponseBody { + + public static final String FIELD_NAME_PROCESSING_MODE = "mode"; + + public static final String FIELD_NAME_CHECKPOINT_INTERVAL = "interval"; + + public static final String FIELD_NAME_CHECKPOINT_TIMEOUT = "timeout"; + + public static final String FIELD_NAME_CHECKPOINT_MIN_PAUSE = "min_pause"; + + public static final String FIELD_NAME_CHECKPOINT_MAX_CONCURRENT = "max_concurrent"; + + public static final String FIELD_NAME_EXTERNALIZED_CHECKPOINT_CONFIG = "externalization"; + --- End diff -- remove empty line (should fail checkstyle) ---
[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185592#comment-16185592 ] ASF GitHub Bot commented on FLINK-6233: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/4625 Thanks for the update @xccui. I'll have a look in the next few days. > Support rowtime inner equi-join between two streams in the SQL API > -- > > Key: FLINK-6233 > URL: https://issues.apache.org/jira/browse/FLINK-6233 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: hongyuhong >Assignee: Xingcan Cui > > The goal of this issue is to add support for inner equi-joins on rowtime > streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT o.rowtime , o.productId, o.orderId, s.rowtime AS shipTime > FROM Orders AS o > JOIN Shipments AS s > ON o.orderId = s.orderId > AND o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR; > {code} > The following restrictions should initially apply: > * The join hint only supports inner joins > * The ON clause should include an equi-join condition > * The time condition {{o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL > '1' HOUR}} can only use a rowtime that is a system attribute. The time > condition only supports bounded time ranges like {{o.rowtime BETWEEN s.rowtime > - INTERVAL '1' HOUR AND s.rowtime + INTERVAL '1' HOUR}}, not > unbounded ones like {{o.rowtime s.rowtime}}, and must include the rowtime > attributes of both streams; {{o.rowtime between rowtime () and rowtime () + > 1}} should also not be supported. > A row-time stream join will not be able to handle late data, because > inserting a row into a sorted order would shift all other computations. > This would be too expensive to maintain. Therefore, we will throw an error if > a user tries to use a row-time stream join with late data handling. 
> This issue includes: > * Design of the DataStream operator to deal with stream join > * Translation from Calcite's RelNode representation (LogicalJoin). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
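The time condition in the issue description above, `o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR`, can be illustrated with a small batch-style sketch of the join semantics. It is not the streaming operator from PR #4625, which works on state caches and timers; the class `TimeBoundedJoinSketch`, the row layout `{key, rowtime}`, and the nested-loop evaluation are assumptions chosen only to make the bounds explicit.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the join semantics; not the operator from PR #4625. */
final class TimeBoundedJoinSketch {

    static final long HOUR_MILLIS = 60L * 60L * 1000L;

    /** True if leftTime lies in [rightTime + lowerBound, rightTime + upperBound]. */
    static boolean withinBounds(long leftTime, long rightTime, long lowerBound, long upperBound) {
        // SQL BETWEEN is inclusive on both ends.
        return leftTime >= rightTime + lowerBound && leftTime <= rightTime + upperBound;
    }

    /**
     * Joins rows of the form {key, rowtime} on equal keys and the time bound.
     * Each result row is {key, leftTime, rightTime}.
     */
    static List<long[]> join(long[][] left, long[][] right, long lowerBound, long upperBound) {
        List<long[]> result = new ArrayList<>();
        for (long[] l : left) {
            for (long[] r : right) {
                boolean sameKey = l[0] == r[0]; // the equi-join condition
                if (sameKey && withinBounds(l[1], r[1], lowerBound, upperBound)) {
                    result.add(new long[] {l[0], l[1], r[1]});
                }
            }
        }
        return result;
    }
}
```

For the query in the description, `left` would hold the orders, `right` the shipments, with `lowerBound = 0` and `upperBound = HOUR_MILLIS`. Because both bounds are finite, a streaming implementation only needs to retain rows of the opposite stream that can still fall inside the window, which is why unbounded conditions are excluded.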
[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185591#comment-16185591 ] ASF GitHub Bot commented on FLINK-6233: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r141831721 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinRightCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. 
val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** -* Process records from the left stream. -* -* @param cRowValue the input record -* @param ctx the context to register timer or get current time -* @param out the collector for outputting results -* +* Process rows from the left stream. */ override def processElement1( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - rightRelativeSize +val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** -* Process records from the right stream. -* -* @param cRowValue the input record -* @param ctx the context to get current time -* @param out the collector for outputting results -* +* Process rows from the right stream. 
*/ override def processElement2( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForRightStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - leftRelativeSize +val oppositeUpperBound: Long = rowTime + rightRelativeSize
[GitHub] flink issue #4625: [FLINK-6233] [table] Support time-bounded stream inner jo...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/4625 Thanks for the update @xccui. I'll have a look in the next days. ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r141831721 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinRightCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. 
val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** -* Process records from the left stream. -* -* @param cRowValue the input record -* @param ctx the context to register timer or get current time -* @param out the collector for outputting results -* +* Process rows from the left stream. */ override def processElement1( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - rightRelativeSize +val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** -* Process records from the right stream. -* -* @param cRowValue the input record -* @param ctx the context to get current time -* @param out the collector for outputting results -* +* Process rows from the right stream. 
*/ override def processElement2( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForRightStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - leftRelativeSize +val oppositeUpperBound: Long = rowTime + rightRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, rightOperatorTime, + oppositeLowerBound, + oppositeUpperBound, leftOperatorTime,
[GitHub] flink pull request #4747: [FLINK-7728] [DataStream] Flush StatusWatermarkVal...
GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/4747 [FLINK-7728] [DataStream] Flush StatusWatermarkValve once all inputs become idle ## What is the purpose of the change This PR is based on #4738. Only the last three commits are relevant. Prior to this PR, once all inputs of the `StatusWatermarkValve` become idle, we only emit the `StreamStatus.IDLE` marker, and check nothing else. This makes the watermark advancement behaviour inconsistent in the case that all inputs become idle, depending on the order in which they become idle. This PR fixes this by "flushing" the max watermark across all channels once all inputs become idle. At a high level, what this means for downstream operators is that all inputs have become idle and will temporarily cease to advance their watermarks, so they can safely advance their event time to whatever the largest watermark is. This PR also includes changes to `StatusWatermarkValveTest` to cover the above-mentioned case, as well as reworking the unit tests to be more fine-grained with small enough test case scope, so that we have a better overview of what cases are covered. ## Brief change log - d11d98d preliminary cleanup of `StatusWatermarkValveTest#BufferedValveOutputHandler`. Previous implementation was overly complex and could not provide test behaviors that we required. - 0e386da main change for this PR. Includes a new test in `StatusWatermarkValve` to cover the missing case. That test does not pass without this change. - b977c18 refactor unit tests to be more finely scoped. ## Verifying this change `StatusWatermarkValveTest`, `OneInputStreamTaskTest`, `TwoInputStreamTaskTest` should be able to verify this change. 
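A rough illustration of the behaviour this PR describes, as a minimal sketch and not the actual `StatusWatermarkValve` code. The class `ValveSketch`, its methods, and the string-based `output` list are hypothetical names introduced only for this example. It also assumes that "aligned" can be approximated by "not idle" and that the flushed watermark is emitted before the idle marker.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified model of a watermark valve; NOT Flink's StatusWatermarkValve. */
class ValveSketch {
    private final long[] channelWatermarks;
    private final boolean[] channelIdle;
    private long lastOutputWatermark = Long.MIN_VALUE;

    /** Everything emitted downstream, in order: watermark values and the "IDLE" marker. */
    final List<String> output = new ArrayList<>();

    ValveSketch(int numChannels) {
        channelWatermarks = new long[numChannels];
        channelIdle = new boolean[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    void inputWatermark(long watermark, int channel) {
        // Ignore watermarks from idle channels and watermarks that do not advance the channel.
        if (channelIdle[channel] || watermark <= channelWatermarks[channel]) {
            return;
        }
        channelWatermarks[channel] = watermark;
        emitMinAcrossActiveChannels();
    }

    void markIdle(int channel) {
        channelIdle[channel] = true;
        if (allChannelsIdle()) {
            // The "flush": once every input is idle, advance to the max watermark seen on any channel.
            long max = Long.MIN_VALUE;
            for (long w : channelWatermarks) {
                max = Math.max(max, w);
            }
            emitIfAdvanced(max);
            output.add("IDLE");
        } else {
            // Some inputs are still active: the remaining active channels decide the new minimum.
            emitMinAcrossActiveChannels();
        }
    }

    private void emitMinAcrossActiveChannels() {
        long min = Long.MAX_VALUE;
        boolean hasActiveChannel = false;
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (!channelIdle[i]) {
                hasActiveChannel = true;
                min = Math.min(min, channelWatermarks[i]);
            }
        }
        // Only emit if the minimum was really aggregated from at least one active channel.
        if (hasActiveChannel) {
            emitIfAdvanced(min);
        }
    }

    private void emitIfAdvanced(long watermark) {
        if (watermark > lastOutputWatermark) {
            lastOutputWatermark = watermark;
            output.add(String.valueOf(watermark));
        }
    }

    private boolean allChannelsIdle() {
        for (boolean idle : channelIdle) {
            if (!idle) {
                return false;
            }
        }
        return true;
    }
}
```

With two channels at watermarks 10 and 5, this sketch emits the same sequence whichever channel becomes idle first. Without the flush branch, the run in which the channel at 10 goes idle first would stay at 5.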
## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): **YES** - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? n/a You can merge this pull request into a Git repository by running: $ git pull https://github.com/tzulitai/flink FLINK-7728 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4747.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4747 commit a9ce90aaee99374513941270b804de2f5564c47e Author: Tzu-Li (Gordon) Tai Date: 2017-09-27T18:05:21Z [FLINK-7721] [DataStream] Only emit new min watermark iff it was aggregated from watermark-aligned inputs Prior to this commit, in the calculation of the new min watermark in StatusWatermarkValve#findAndOutputNewMinWatermarkAcrossAlignedChannels(), there is no verification that the calculated new min watermark really is aggregated from some aligned channel. In the corner case where all input channels are currently not aligned but actually some are active, we would then incorrectly determine that the final aggregation is Long.MAX_VALUE and emit that. This commit fixes this by only emitting the aggregated watermark iff it was really calculated from some aligned input channel (as well as the already existing constraint that it needs to be larger than the last emitted watermark). This change should also safely cover the case that a Long.MAX_VALUE was genuinely aggregated from the input channels. 
commit d11d98dda4723760e7d9fb3b9680a1e9daca3705 Author: Tzu-Li (Gordon) Tai Date: 2017-09-28T12:11:22Z [FLINK-7728] [DataStream] Simplify BufferedValveOutputHandler used in StatusWatermarkValveTest The previous implementation was overly complicated. Having separate buffers for the StreamStatus and Watermarks is not required for our tests. Also, that design doesn't allow checking the order StreamStatus / Watermarks are emitted from a single input to the valve. This commit reworks it by buffering both StreamStatus and Watermarks in a shared queue. commit 0e386dab2fc19df78276ce203ed8f38792145759 Author: Tzu-Li (Gordon) Tai Date: 2017-09-28T14:49:22Z [FLINK-7728] [DataStream] Flush max watermark across all inputs once all become idle Prior to this commit, once all inputs of the StatusWatermarkValve become idle, we only emit the StreamStatus.IDLE marker, and check nothing else. This makes the watermark advancement
[jira] [Commented] (FLINK-7728) StatusWatermarkValve has different min watermark advancement behavior depending on the ordering inputs become idle
[ https://issues.apache.org/jira/browse/FLINK-7728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185574#comment-16185574 ] ASF GitHub Bot commented on FLINK-7728: --- GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/4747 [FLINK-7728] [DataStream] Flush StatusWatermarkValve once all inputs become idle ## What is the purpose of the change This PR is based on #4738. Only the last three commits are relevant. Prior to this PR, once all inputs of the `StatusWatermarkValve` become idle, we only emit the `StreamStatus.IDLE` marker, and check nothing else. This makes the watermark advancement behaviour inconsistent in the case that all inputs become idle, depending on the order in which they become idle. This PR fixes this by "flushing" the max watermark across all channels once all inputs become idle. At a high level, what this means for downstream operators is that all inputs have become idle and will temporarily cease to advance their watermarks, so they can safely advance their event time to whatever the largest watermark is. This PR also includes changes to `StatusWatermarkValveTest` to cover the above-mentioned case, as well as reworking the unit tests to be more fine-grained with small enough test case scope, so that we have a better overview of what cases are covered. ## Brief change log - d11d98d preliminary cleanup of `StatusWatermarkValveTest#BufferedValveOutputHandler`. Previous implementation was overly complex and could not provide test behaviors that we required. - 0e386da main change for this PR. Includes a new test in `StatusWatermarkValve` to cover the missing case. That test does not pass without this change. - b977c18 refactor unit tests to be more finely scoped. ## Verifying this change `StatusWatermarkValveTest`, `OneInputStreamTaskTest`, `TwoInputStreamTaskTest` should be able to verify this change. 
## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): **YES** - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? n/a You can merge this pull request into a Git repository by running: $ git pull https://github.com/tzulitai/flink FLINK-7728 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4747.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4747 commit a9ce90aaee99374513941270b804de2f5564c47e Author: Tzu-Li (Gordon) Tai Date: 2017-09-27T18:05:21Z [FLINK-7721] [DataStream] Only emit new min watermark iff it was aggregated from watermark-aligned inputs Prior to this commit, in the calculation of the new min watermark in StatusWatermarkValve#findAndOutputNewMinWatermarkAcrossAlignedChannels(), there is no verification that the calculated new min watermark really is aggregated from some aligned channel. In the corner case where all input channels are currently not aligned but actually some are active, we would then incorrectly determine that the final aggregation is Long.MAX_VALUE and emit that. This commit fixes this by only emitting the aggregated watermark iff it was really calculated from some aligned input channel (as well as the already existing constraint that it needs to be larger than the last emitted watermark). This change should also safely cover the case that a Long.MAX_VALUE was genuinely aggregated from the input channels. 
commit d11d98dda4723760e7d9fb3b9680a1e9daca3705 Author: Tzu-Li (Gordon) Tai Date: 2017-09-28T12:11:22Z [FLINK-7728] [DataStream] Simplify BufferedValveOutputHandler used in StatusWatermarkValveTest The previous implementation was overly complicated. Having separate buffers for the StreamStatus and Watermarks is not required for our tests. Also, that design doesn't allow checking the order StreamStatus / Watermarks are emitted from a single input to the valve. This commit reworks it by buffering both StreamStatus and Watermarks in a shared queue. commit 0e386dab2fc19df78276ce203ed8f38792145759 Author: Tzu-Li (Gordon) Tai Date: 2017-09-28T14:49:22Z [FLINK-7728] [DataStream] Flush max watermark
[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...
Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/4729#discussion_r141822004 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java --- @@ -228,7 +234,14 @@ public void startNewWorker(ResourceProfile resourceProfile) { @Override public void stopWorker(ResourceID resourceID) { --- End diff -- Do you mean something like MesosWorkerStore.Worker in MesosResourceManager, which has both the taskId (used as resourceID) and other context info for the Mesos worker? ---
[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling
[ https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185544#comment-16185544 ] ASF GitHub Bot commented on FLINK-7076: --- Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/4729#discussion_r141822004 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java --- @@ -228,7 +234,14 @@ public void startNewWorker(ResourceProfile resourceProfile) { @Override public void stopWorker(ResourceID resourceID) { --- End diff -- Do you mean something like MesosWorkerStore.Worker in MesosResourceManager, which has both the taskId (used as resourceID) and other context info for the Mesos worker? > Implement container release to support dynamic scaling > -- > > Key: FLINK-7076 > URL: https://issues.apache.org/jira/browse/FLINK-7076 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Reporter: Till Rohrmann >Assignee: Shuyi Chen > Labels: flip-6 > > In order to support dynamic scaling, the {{YarnResourceManager}} has to be > able to dynamically free containers. We have to implement the > {{YarnResourceManager#stopWorker}} method. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling
[ https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185532#comment-16185532 ] ASF GitHub Bot commented on FLINK-7076: --- Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/4729#discussion_r141820967 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java --- @@ -228,7 +234,14 @@ public void startNewWorker(ResourceProfile resourceProfile) { @Override public void stopWorker(ResourceID resourceID) { - // TODO: Implement to stop the worker + ContainerId containerId = containerIdMap.remove(resourceID.getResourceIdString()); + if (containerId != null) { + log.info("Stopping container {}.", containerId.toString()); + resourceManagerClient.releaseAssignedContainer(containerId); --- End diff -- The ContainerId for YARN can only be generated using ContainerId.newInstance, which requires the application attempt id and the container Id. The resourceID only contains the container Id information, so it's not sufficient to rebuild the YARN ContainerId object that the releaseAssignedContainer method takes. > Implement container release to support dynamic scaling > -- > > Key: FLINK-7076 > URL: https://issues.apache.org/jira/browse/FLINK-7076 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Reporter: Till Rohrmann >Assignee: Shuyi Chen > Labels: flip-6 > > In order to support dynamic scaling, the {{YarnResourceManager}} has to be > able to dynamically free containers. We have to implement the > {{YarnResourceManager#stopWorker}} method. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...
Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/4729#discussion_r141820967 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java --- @@ -228,7 +234,14 @@ public void startNewWorker(ResourceProfile resourceProfile) { @Override public void stopWorker(ResourceID resourceID) { - // TODO: Implement to stop the worker + ContainerId containerId = containerIdMap.remove(resourceID.getResourceIdString()); + if (containerId != null) { + log.info("Stopping container {}.", containerId.toString()); + resourceManagerClient.releaseAssignedContainer(containerId); --- End diff -- The ContainerId for YARN can only be generated using ContainerId.newInstance, which requires the application attempt id and the container Id. The resourceID only contains the container Id information, so it's not sufficient to rebuild the YARN ContainerId object that the releaseAssignedContainer method takes. ---
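The bookkeeping argued for in the comment above can be sketched without YARN on the classpath. This is only an illustration under stated assumptions: `FakeContainerId` and `WorkerRegistrySketch` are hypothetical stand-ins, and the `released` list stands in for the call to `resourceManagerClient.releaseAssignedContainer(containerId)` shown in the diff. The idea is to remember the full container id object at allocation time, keyed by the resource id string, because the string alone is not enough to rebuild it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for YARN's ContainerId: an application attempt id plus a container number. */
final class FakeContainerId {
    final String applicationAttemptId;
    final long containerNumber;

    FakeContainerId(String applicationAttemptId, long containerNumber) {
        this.applicationAttemptId = applicationAttemptId;
        this.containerNumber = containerNumber;
    }

    @Override
    public String toString() {
        return "container_" + applicationAttemptId + "_" + containerNumber;
    }
}

/** Hypothetical registry mirroring the containerIdMap idea from the diff above. */
class WorkerRegistrySketch {
    // Resource id string -> full container id object, recorded when the container is allocated.
    private final Map<String, FakeContainerId> containerIdMap = new HashMap<>();

    /** Stands in for the containers handed to releaseAssignedContainer in the real code. */
    final List<FakeContainerId> released = new ArrayList<>();

    void onContainerAllocated(FakeContainerId containerId) {
        containerIdMap.put(containerId.toString(), containerId);
    }

    /** Returns true if a container was known for this resource id and has been released. */
    boolean stopWorker(String resourceIdString) {
        FakeContainerId containerId = containerIdMap.remove(resourceIdString);
        if (containerId == null) {
            return false; // unknown or already stopped worker; nothing to release
        }
        released.add(containerId);
        return true;
    }
}
```

Removing the entry before releasing makes a repeated `stopWorker` call for the same resource a no-op in this sketch.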
[jira] [Commented] (FLINK-7668) Add AccessExecutionGraph refresh interval to ExecutionGraphHolder
[ https://issues.apache.org/jira/browse/FLINK-7668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185519#comment-16185519 ] ASF GitHub Bot commented on FLINK-7668: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/4728 Thanks for addressing / replying to my comments. LGTM, +1 from me > Add AccessExecutionGraph refresh interval to ExecutionGraphHolder > - > > Key: FLINK-7668 > URL: https://issues.apache.org/jira/browse/FLINK-7668 > Project: Flink > Issue Type: Sub-task > Components: REST >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > > Once we support offline {{AccessExecutionGraph}} implementation (see > FLINK-7667) we should add a refresh interval to the {{ExecutionGraphHolder}} > after which the {{AccessExecutionGraph}} is retrieved again from the > {{JobMaster}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
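The refresh interval described in FLINK-7668 amounts to a cache whose entries expire after a time-to-live. The following is a minimal sketch of that idea, not the `ExecutionGraphCache` from the PR: `TtlFutureCacheSketch`, its constructor parameters, and the injected clock are hypothetical names chosen for this example, and the loader function stands in for a request such as `jobManagerGateway.requestJob(...)`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.LongSupplier;

/** Hypothetical TTL cache of futures; a sketch of the refresh-interval idea only. */
class TtlFutureCacheSketch<K, V> {

    private static final class Entry<V> {
        final CompletableFuture<V> future;
        final long expiresAtMillis;

        Entry(CompletableFuture<V> future, long expiresAtMillis) {
            this.future = future;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final ConcurrentHashMap<K, Entry<V>> entries = new ConcurrentHashMap<>();
    private final long timeToLiveMillis;
    private final LongSupplier clockMillis;

    TtlFutureCacheSketch(long timeToLiveMillis, LongSupplier clockMillis) {
        this.timeToLiveMillis = timeToLiveMillis;
        this.clockMillis = clockMillis;
    }

    /**
     * Returns the cached future for the key, or asks the loader again if there is no entry,
     * the entry has outlived its TTL, or the cached request failed.
     */
    CompletableFuture<V> get(K key, Function<K, CompletableFuture<V>> loader) {
        final long now = clockMillis.getAsLong();
        Entry<V> entry = entries.compute(key, (k, old) -> {
            boolean stale = old == null
                || now >= old.expiresAtMillis
                || old.future.isCompletedExceptionally();
            return stale ? new Entry<>(loader.apply(k), now + timeToLiveMillis) : old;
        });
        return entry.future;
    }
}
```

Passing the clock in as a `LongSupplier` is a design choice of this sketch: it lets a test advance time explicitly instead of sleeping until the TTL has passed.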
[GitHub] flink issue #4728: [FLINK-7668] Add ExecutionGraphCache for ExecutionGraph b...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/4728 Thanks for addressing / replying to my comments. LGTM, +1 from me ---
[GitHub] flink pull request #4728: [FLINK-7668] Add ExecutionGraphCache for Execution...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4728#discussion_r141817520 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java --- @@ -0,0 +1,357 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.api.common.ArchivedExecutionConfig; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.ErrorInfo; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobmanager.JobManager; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; +import org.apache.flink.runtime.messages.JobNotFoundException; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.Function; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Tests for the {@link ExecutionGraphCache}. + */ +public class ExecutionGraphCacheTest extends TestLogger { + + /** +* Tests that we can cache AccessExecutionGraphs over multiple accesses. 
+*/ + @Test + public void testExecutionGraphCaching() throws Exception { + final Time timeout = Time.milliseconds(100L); + final Time timeToLive = Time.hours(1L); + final JobID jobId = new JobID(); + final AccessExecutionGraph accessExecutionGraph = mock(AccessExecutionGraph.class); + + final JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class); + when(jobManagerGateway.requestJob(eq(jobId), any(Time.class))).thenReturn(CompletableFuture.completedFuture(accessExecutionGraph)); + + try (ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(timeout, timeToLive)) { + CompletableFuture accessExecutionGraphFuture = executionGraphCache.getExecutionGraph(jobId, jobManagerGateway); + + assertEquals(accessExecutionGraph, accessExecutionGraphFuture.get()); + + CompletableFuture accessExecutionGraphFuture2 = executionGraphCache.getExecutionGraph(jobId, jobManagerGateway); + + assertEquals(accessExecutionGraph, accessExecutionGraphFuture2.get()); + + // verify that we only issued a single request to the gateway + verify(jobManagerGateway, times(1)).requestJob(eq(jobId), any(Time.class)); + } + } + + /** +* Tests that an AccessExecutionGraph is invalidated after its TTL expired. +*/ + @Test + public void testExecutionGraphEntryInvalidation() throws Exception { + final Time timeout = Time.milliseconds(100L); + final Time timeToLive =
[jira] [Commented] (FLINK-7668) Add AccessExecutionGraph refresh interval to ExecutionGraphHolder
[ https://issues.apache.org/jira/browse/FLINK-7668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185518#comment-16185518 ] ASF GitHub Bot commented on FLINK-7668: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4728#discussion_r141817520 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java --- @@ -0,0 +1,357 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.api.common.ArchivedExecutionConfig; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.ErrorInfo; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobmanager.JobManager; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; +import org.apache.flink.runtime.messages.JobNotFoundException; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.Function; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Tests for the {@link ExecutionGraphCache}. + */ +public class ExecutionGraphCacheTest extends TestLogger { + + /** +* Tests that we can cache AccessExecutionGraphs over multiple accesses. 
+*/ + @Test + public void testExecutionGraphCaching() throws Exception { + final Time timeout = Time.milliseconds(100L); + final Time timeToLive = Time.hours(1L); + final JobID jobId = new JobID(); + final AccessExecutionGraph accessExecutionGraph = mock(AccessExecutionGraph.class); + + final JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class); + when(jobManagerGateway.requestJob(eq(jobId), any(Time.class))).thenReturn(CompletableFuture.completedFuture(accessExecutionGraph)); + + try (ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(timeout, timeToLive)) { + CompletableFuture accessExecutionGraphFuture = executionGraphCache.getExecutionGraph(jobId, jobManagerGateway); + + assertEquals(accessExecutionGraph, accessExecutionGraphFuture.get()); + + CompletableFuture accessExecutionGraphFuture2 = executionGraphCache.getExecutionGraph(jobId, jobManagerGateway); + + assertEquals(accessExecutionGraph, accessExecutionGraphFuture2.get()); + + // verify that we only issued a single request to the gateway + verify(jobManagerGateway, times(1)).requestJob(eq(jobId), any(Time.class)); + } + } + + /** +* Tests that an AccessExecutionGraph is invalidated after its
[jira] [Assigned] (FLINK-7710) Port CheckpointStatsHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7710?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reassigned FLINK-7710: Assignee: Till Rohrmann > Port CheckpointStatsHandler to new REST endpoint > > > Key: FLINK-7710 > URL: https://issues.apache.org/jira/browse/FLINK-7710 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.4.0 > > > Port existing {{CheckpointStatsHandler}} to new REST endpoint. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7668) Add AccessExecutionGraph refresh interval to ExecutionGraphHolder
[ https://issues.apache.org/jira/browse/FLINK-7668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185507#comment-16185507 ] ASF GitHub Bot commented on FLINK-7668: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4728 I've rebased the code onto the latest master. Are all PR comments resolved @zentol and @tzulitai? > Add AccessExecutionGraph refresh interval to ExecutionGraphHolder > - > > Key: FLINK-7668 > URL: https://issues.apache.org/jira/browse/FLINK-7668 > Project: Flink > Issue Type: Sub-task > Components: REST >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > > Once we support offline {{AccessExecutionGraph}} implementation (see > FLINK-7667) we should add a refresh interval to the {{ExecutionGraphHolder}} > after which the {{AccessExecutionGraph}} is retrieved again from the > {{JobMaster}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4728: [FLINK-7668] Add ExecutionGraphCache for ExecutionGraph b...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4728 I've rebased the code onto the latest master. Are all PR comments resolved @zentol and @tzulitai? ---
[jira] [Commented] (FLINK-7667) Add serializable AccessExecutionGraph implementation
[ https://issues.apache.org/jira/browse/FLINK-7667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185506#comment-16185506 ] ASF GitHub Bot commented on FLINK-7667: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4727 > Add serializable AccessExecutionGraph implementation > > > Key: FLINK-7667 > URL: https://issues.apache.org/jira/browse/FLINK-7667 > Project: Flink > Issue Type: Sub-task > Components: REST >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.4.0 > > > In order to decouple the REST endpoint from the {{JobMaster}} we should have > an "offline" implementation of the {{AccessExecutionGraph}} which is > serializable and can be sent to remote peers. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4727: [FLINK-7667] [flip6] Use ArchivedExecutionGraph as...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4727 ---
[jira] [Closed] (FLINK-7667) Add serializable AccessExecutionGraph implementation
[ https://issues.apache.org/jira/browse/FLINK-7667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-7667. Resolution: Fixed Fix Version/s: 1.4.0 Fixed via 2dd557fad4a0a205a3e163fa918507d34c933c6a > Add serializable AccessExecutionGraph implementation > > > Key: FLINK-7667 > URL: https://issues.apache.org/jira/browse/FLINK-7667 > Project: Flink > Issue Type: Sub-task > Components: REST >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.4.0 > > > In order to decouple the REST endpoint from the {{JobMaster}} we should have > an "offline" implementation of the {{AccessExecutionGraph}} which is > serializable and can be sent to remote peers. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7667) Add serializable AccessExecutionGraph implementation
[ https://issues.apache.org/jira/browse/FLINK-7667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185503#comment-16185503 ] ASF GitHub Bot commented on FLINK-7667: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4727 Merging this PR. > Add serializable AccessExecutionGraph implementation > > > Key: FLINK-7667 > URL: https://issues.apache.org/jira/browse/FLINK-7667 > Project: Flink > Issue Type: Sub-task > Components: REST >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > > In order to decouple the REST endpoint from the {{JobMaster}} we should have > an "offline" implementation of the {{AccessExecutionGraph}} which is > serializable and can be sent to remote peers. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4727: [FLINK-7667] [flip6] Use ArchivedExecutionGraph as serial...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4727 Merging this PR. ---
[jira] [Commented] (FLINK-7648) Port TaskManagersHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185502#comment-16185502 ] Till Rohrmann commented on FLINK-7648: -- In order to wait on the completion you can do
{code}
@Test
public void testStandaloneSessionCluster() throws ExecutionException, InterruptedException {
    Configuration configuration = new Configuration();
    configuration.setString(JobManagerOptions.ADDRESS, "localhost");

    StandaloneSessionClusterEntrypoint standaloneSessionClusterEntrypoint = new StandaloneSessionClusterEntrypoint(configuration);
    standaloneSessionClusterEntrypoint.startCluster();

    standaloneSessionClusterEntrypoint.getTerminationFuture().get();
}
{code}
> Port TaskManagersHandler to new REST endpoint > - > > Key: FLINK-7648 > URL: https://issues.apache.org/jira/browse/FLINK-7648 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Bowen Li > Labels: flip-6 > Fix For: 1.4.0 > > > Port existing {{TaskManagersHandler}} to the new REST endpoint -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7541) Redistribute operator state using OperatorID
[ https://issues.apache.org/jira/browse/FLINK-7541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski closed FLINK-7541. - There are still parts of the Flink code that rely on operator order. > Redistribute operator state using OperatorID > > > Key: FLINK-7541 > URL: https://issues.apache.org/jira/browse/FLINK-7541 > Project: Flink > Issue Type: Improvement >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > Fix For: 1.4.0 > > > Currently StateAssignmentOperation relies heavily on the order of new and old > operators in the task. It should be changed to rely more on > OperatorID. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (FLINK-7541) Redistribute operator state using OperatorID
[ https://issues.apache.org/jira/browse/FLINK-7541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski resolved FLINK-7541. --- Resolution: Fixed Fix Version/s: 1.4.0 > Redistribute operator state using OperatorID > > > Key: FLINK-7541 > URL: https://issues.apache.org/jira/browse/FLINK-7541 > Project: Flink > Issue Type: Improvement >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > Fix For: 1.4.0 > > > Currently StateAssignmentOperation relies heavily on the order of new and old > operators in the task. It should be changed to rely more on > OperatorID. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
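To illustrate the difference FLINK-7541 describes, here is a minimal sketch in plain Java. It is not the code of StateAssignmentOperation; the class `StateAssignmentSketch`, its two methods, and the use of plain strings for operator IDs and state are all assumptions made for the example. It contrasts assigning old state to new operators by position with assigning it by operator ID.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: order-based versus ID-based state assignment. */
final class StateAssignmentSketch {

    /** Order-based: the i-th old state is handed to the i-th new operator. */
    static Map<String, String> assignByOrder(List<String> oldStatesInOrder, List<String> newOperatorIds) {
        Map<String, String> result = new LinkedHashMap<>();
        int n = Math.min(oldStatesInOrder.size(), newOperatorIds.size());
        for (int i = 0; i < n; i++) {
            result.put(newOperatorIds.get(i), oldStatesInOrder.get(i));
        }
        return result;
    }

    /** ID-based: each new operator looks up the old state stored under its own ID. */
    static Map<String, String> assignById(Map<String, String> oldStateById, List<String> newOperatorIds) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String id : newOperatorIds) {
            String state = oldStateById.get(id);
            if (state != null) {
                result.put(id, state);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Old task: operator "map" followed by operator "filter".
        Map<String, String> oldStateById = new LinkedHashMap<>();
        oldStateById.put("map", "map-state");
        oldStateById.put("filter", "filter-state");
        List<String> oldStatesInOrder = new ArrayList<>(oldStateById.values());

        // New task: the same operators, listed in the opposite order.
        List<String> newOperatorIds = Arrays.asList("filter", "map");

        System.out.println("by order: " + assignByOrder(oldStatesInOrder, newOperatorIds));
        System.out.println("by id:    " + assignById(oldStateById, newOperatorIds));
    }
}
```

With the reordered operators in `main`, the order-based variant hands the state of "map" to "filter" and vice versa, while the ID-based variant is unaffected by the ordering.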
[jira] [Commented] (FLINK-7694) Port JobMetricsHandler to new REST handler
[ https://issues.apache.org/jira/browse/FLINK-7694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185498#comment-16185498 ] Till Rohrmann commented on FLINK-7694: -- Yes, this will make sure that the web gui will also understand the response from the new {{JobMetricsOverviewHandler}}. > Port JobMetricsHandler to new REST handler > -- > > Key: FLINK-7694 > URL: https://issues.apache.org/jira/browse/FLINK-7694 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Reporter: Bowen Li >Assignee: Bowen Li > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7683) Add method to iterate over all of the existing keys in a statebackend
[ https://issues.apache.org/jira/browse/FLINK-7683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski closed FLINK-7683. - > Add method to iterate over all of the existing keys in a statebackend > - > > Key: FLINK-7683 > URL: https://issues.apache.org/jira/browse/FLINK-7683 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > Fix For: 1.4.0 > > > This is required to preserve backward compatibility while > changing the state definition of a keyed state operator (to do so, the operator must > iterate over all of the existing keys and rewrite them into a new state > variable). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
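The migration pattern mentioned in the FLINK-7683 description (iterate over all existing keys, rewrite each value into a new state variable) can be sketched roughly as follows. This is a toy in-memory model, not Flink's state backend API: `KeyedBackendSketch`, `keysOf` and `migrate` are hypothetical names chosen for the example, and values are stored as plain `Object` to keep it short.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical in-memory stand-in for a keyed state backend. */
final class KeyedBackendSketch {
    // state name -> (key -> value)
    private final Map<String, Map<String, Object>> states = new HashMap<>();

    void put(String stateName, String key, Object value) {
        states.computeIfAbsent(stateName, name -> new HashMap<>()).put(key, value);
    }

    Object get(String stateName, String key) {
        Map<String, Object> state = states.get(stateName);
        return state == null ? null : state.get(key);
    }

    /** Returns a snapshot of all keys that currently have a value in the given state. */
    List<String> keysOf(String stateName) {
        Map<String, Object> state = states.get(stateName);
        return state == null ? new ArrayList<>() : new ArrayList<>(state.keySet());
    }

    /** Rewrites every value of the old state into the new state, then drops the old state. */
    void migrate(String oldState, String newState, Function<Object, Object> convert) {
        for (String key : keysOf(oldState)) {
            put(newState, key, convert.apply(get(oldState, key)));
        }
        states.remove(oldState);
    }

    public static void main(String[] args) {
        KeyedBackendSketch backend = new KeyedBackendSketch();
        backend.put("count", "user-a", 1);
        backend.put("count", "user-b", 2);

        // Change the state definition: Integer counts become Long counts under a new name.
        backend.migrate("count", "count-v2", v -> Long.valueOf(((Integer) v).longValue()));

        System.out.println("migrated keys: " + backend.keysOf("count-v2").size());
    }
}
```

Without a way to enumerate the keys (here `keysOf`), the operator could only migrate a key when a record for that key happened to arrive, which is why the issue asks for an explicit iteration method.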
[jira] [Commented] (FLINK-7695) Port JobConfigHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185497#comment-16185497 ] ASF GitHub Bot commented on FLINK-7695: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4737 Thanks for the review @zentol and @yew1eb. I have to run this PR through Travis once more to see if everything passes. Once this is done and the PRs it depends on are merged, I'll merge this one as well. > Port JobConfigHandler to new REST endpoint > -- > > Key: FLINK-7695 > URL: https://issues.apache.org/jira/browse/FLINK-7695 > Project: Flink > Issue Type: Sub-task > Components: REST >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Labels: flip-6 > > Port the {{JobConfigHandler}} to the new {{RestServerEndpoint}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (FLINK-7683) Add method to iterate over all of the existing keys in a statebackend
[ https://issues.apache.org/jira/browse/FLINK-7683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski resolved FLINK-7683. --- Resolution: Fixed Fix Version/s: 1.4.0 > Add method to iterate over all of the existing keys in a statebackend > - > > Key: FLINK-7683 > URL: https://issues.apache.org/jira/browse/FLINK-7683 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > Fix For: 1.4.0 > > > This is required to preserve backward compatibility while > changing the state definition of a keyed state operator (to do so, the operator must > iterate over all of the existing keys and rewrite them into a new state > variable). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4737: [FLINK-7695] [flip6] Add JobConfigHandler for new RestSer...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4737 Thanks for the review @zentol and @yew1eb. I have to run this PR through Travis once more to see if everything passes. Once this is done and the PRs it depends on are merged, I'll merge this one as well. ---
[jira] [Commented] (FLINK-7704) Port JobPlanHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185493#comment-16185493 ] Till Rohrmann commented on FLINK-7704: -- Hi [~haizhou], great to hear that you want to help with Flip-6 :-) All help is highly welcome! Go ahead and do the port of the {{JobPlanHandler}}. > Port JobPlanHandler to new REST endpoint > > > Key: FLINK-7704 > URL: https://issues.apache.org/jira/browse/FLINK-7704 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Hai Zhou UTC+8 > Labels: flip-6 > Fix For: 1.4.0 > > > Port existing {{JobPlanHandler}} to new REST endpoint. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7695) Port JobConfigHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185473#comment-16185473 ] ASF GitHub Bot commented on FLINK-7695: --- Github user yew1eb commented on the issue: https://github.com/apache/flink/pull/4737 Hey @tillrohrmann, will you merge this PR? I want to fix the sibling issues (FLINK-7704, FLINK-7705, FLINK-7706). :beers: Best, Hai Zhou > Port JobConfigHandler to new REST endpoint > -- > > Key: FLINK-7695 > URL: https://issues.apache.org/jira/browse/FLINK-7695 > Project: Flink > Issue Type: Sub-task > Components: REST >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Labels: flip-6 > > Port the {{JobConfigHandler}} to the new {{RestServerEndpoint}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4737: [FLINK-7695] [flip6] Add JobConfigHandler for new RestSer...
Github user yew1eb commented on the issue: https://github.com/apache/flink/pull/4737 Hey @tillrohrmann, will you merge this PR? I want to fix the sibling issues (FLINK-7704, FLINK-7705, FLINK-7706). :beers: Best, Hai Zhou ---