[GitHub] flink issue #3511: [FLINK-5734] Code generation for NormalizedKeySorter

2017-09-29 Thread KurtYoung
Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/3511
  
> So I think a simpler and better approach is to just make sure that most 
types have a good implementation of putNormalizedKey, and then 
NormalizedKeySorter.compareRecords would be called only rarely, so its 
performance wouldn't really matter.

You are right when the sort keys are simple numeric types, but not with 
strings, which may be the most popular choice in some ETL and data warehouse 
pipelines. But I agree that code generation can't help with this situation, so 
we investigated some binary data formats to represent our records and modified 
the interfaces of TypeSerializer & TypeComparator for ser/de. We don't have to 
consume the input/output view byte by byte, but have the ability to randomly 
access the underlying data, i.e. the MemorySegment. It acts like Spark's 
UnsafeRow: https://reviewable.io/reviews/apache/spark/5725, so we can eliminate 
most of the deserialization cost, such as `read byte[]` and then 
`new String(byte[])`. We combined this approach with some code generation to 
eliminate the virtual function calls of the TypeComparator and saw a 10x 
performance improvement when sorting on strings. 
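
A minimal Java sketch of the idea, assuming a hypothetical binary layout in 
which a string sort key sits as UTF-8 bytes at a known offset: keys can be 
ordered by an unsigned byte-wise comparison (which for UTF-8 agrees with 
code-point order), so no `new String(byte[])` is ever materialized.

    // Hypothetical sketch: compares two string sort keys directly on their
    // serialized UTF-8 bytes, as they would sit in a MemorySegment-backed
    // binary row, avoiding deserialization into java.lang.String.
    public final class BinaryStringKeyComparator {

        /** Unsigned lexicographic comparison; for UTF-8 this matches code-point order. */
        public static int compareUtf8(byte[] a, int offA, int lenA,
                                      byte[] b, int offB, int lenB) {
            int minLen = Math.min(lenA, lenB);
            for (int i = 0; i < minLen; i++) {
                int cmp = (a[offA + i] & 0xFF) - (b[offB + i] & 0xFF);
                if (cmp != 0) {
                    return cmp;
                }
            }
            return lenA - lenB;
        }

        public static void main(String[] args) {
            byte[] x = "apple".getBytes(java.nio.charset.StandardCharsets.UTF_8);
            byte[] y = "apricot".getBytes(java.nio.charset.StandardCharsets.UTF_8);
            // Prints a negative number: "apple" < "apricot", decided on raw bytes.
            System.out.println(compareUtf8(x, 0, x.length, y, 0, y.length));
        }
    }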

> I think a large potential in code-generation is to eliminate the 
overheads of the very many virtual function calls throughout the runtime

Totally agreed. After we finish the code generation work and the ser/de 
improvements, we will investigate this further. Good to see that you have a 
list of all the megamorphic calls. BTW, we are actually translating batch jobs 
into the streaming runtime; I think there will be a lot in common. 

Having more control over type information, and code-generating the whole 
operator, has a lot of benefits; it can also help make most of the calls 
monomorphic (see the sketch after this list), such as:
- full control over object reuse
- comparators
- generating hash codes
- potential improvements to algorithms that only need to deal with fixed-length 
data
- directly using primitive variables when dealing with simple types
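
For illustration, here is a hand-written stand-in for what generated code 
might look like for a record whose sort key is a fixed-layout (int, long) 
pair. The class and offsets are hypothetical; the point is that the generated 
comparator reads primitives directly and makes no virtual TypeComparator 
calls:

    // Hypothetical generated comparator for a fixed-length binary record whose
    // key is (int at offset 0, long at offset 4); fully primitive, monomorphic.
    public final class GeneratedIntLongComparator {

        public static int compare(java.nio.ByteBuffer first, int off1,
                                  java.nio.ByteBuffer second, int off2) {
            int k1 = first.getInt(off1);
            int k2 = second.getInt(off2);
            if (k1 != k2) {
                return Integer.compare(k1, k2);
            }
            long s1 = first.getLong(off1 + 4);
            long s2 = second.getLong(off2 + 4);
            return Long.compare(s1, s2);
        }
    }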

And you are right that this is orthogonal to the runtime improvements; we see 
the boundary as the Operator. The framework should provide the most efficient 
environment for operators to run in, and we will code-generate the most 
efficient operators to live in it. 

> Btw. have you seen this PR for code generation for POJO serializers and 
comparators? #2211

I haven't seen it yet; I'll find some time to check it out.


---


[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...

2017-09-29 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r141993041
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
 ---
@@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin(
 // Initialize the data caches.
 val leftListTypeInfo: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](leftType)
 val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
-  new MapStateDescriptor[Long, JList[Row]](timeIndicator + 
"InnerJoinLeftCache",
-BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
leftListTypeInfo)
+  new MapStateDescriptor[Long, JList[Row]](
+timeIndicator + "InnerJoinLeftCache",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+leftListTypeInfo)
 leftCache = getRuntimeContext.getMapState(leftStateDescriptor)
 
 val rightListTypeInfo: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](rightType)
 val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
-  new MapStateDescriptor[Long, JList[Row]](timeIndicator + 
"InnerJoinRightCache",
-BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
rightListTypeInfo)
+  new MapStateDescriptor[Long, JList[Row]](
+timeIndicator + "InnerJoinRightCache",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+rightListTypeInfo)
 rightCache = getRuntimeContext.getMapState(rightStateDescriptor)
 
 // Initialize the timer states.
 val leftTimerStateDesc: ValueStateDescriptor[Long] =
-  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinLeftTimerState",
-classOf[Long])
+  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinLeftTimerState", classOf[Long])
 leftTimerState = getRuntimeContext.getState(leftTimerStateDesc)
 
 val rightTimerStateDesc: ValueStateDescriptor[Long] =
-  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinRightTimerState",
-classOf[Long])
+  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinRightTimerState", classOf[Long])
 rightTimerState = getRuntimeContext.getState(rightTimerStateDesc)
   }
 
   /**
-* Process records from the left stream.
-*
-* @param cRowValue the input record
-* @param ctx   the context to register timer or get current time
-* @param out   the collector for outputting results
-*
+* Process rows from the left stream.
 */
   override def processElement1(
-cRowValue: CRow,
-ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-out: Collector[CRow]): Unit = {
-val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true)
-getCurrentOperatorTime(ctx)
+  cRowValue: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow]): Unit = {
+updateOperatorTime(ctx)
+val rowTime: Long = getTimeForLeftStream(ctx, cRowValue)
+val oppositeLowerBound: Long = rowTime - rightRelativeSize
+val oppositeUpperBound: Long = rowTime + leftRelativeSize
 processElement(
   cRowValue,
-  timeForRecord,
+  rowTime,
   ctx,
   out,
   leftOperatorTime,
+  oppositeLowerBound,
+  oppositeUpperBound,
   rightOperatorTime,
   rightTimerState,
   leftCache,
   rightCache,
-  true
+  leftRow = true
 )
   }
 
   /**
-* Process records from the right stream.
-*
-* @param cRowValue the input record
-* @param ctx   the context to get current time
-* @param out   the collector for outputting results
-*
+* Process rows from the right stream.
 */
   override def processElement2(
-cRowValue: CRow,
-ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-out: Collector[CRow]): Unit = {
-val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false)
-getCurrentOperatorTime(ctx)
+  cRowValue: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow]): Unit = {
+updateOperatorTime(ctx)
+val rowTime: Long = getTimeForRightStream(ctx, cRowValue)
+val oppositeLowerBound: Long = rowTime - leftRelativeSize
+val oppositeUpperBound: Long =  rowTime + rightRelativeSize
 processElement(
   cRowValue,
-  timeForRecord,
+  rowTime,
   ctx,
   out,
   rightOperatorTime,
+  oppositeLowerBound,
+  oppositeUpperBound,
   leftOperatorTime,
  

[jira] [Commented] (FLINK-7648) Port TaskManagersHandler to new REST endpoint

2017-09-29 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16186631#comment-16186631
 ] 

Bowen Li commented on FLINK-7648:
-

[~till.rohrmann] I successfully started the cluster in a unit test. But the 
browser shows {"errors":["Not found."]} when I visit 
{{http://127.0.0.1:9067/}}, and there's no UI. Is this expected, or did I do 
something wrong? 

BTW, how do I run a job in such a {{StandaloneSessionCluster}}? I'm porting 
{{JobMetricsHandler}}, and it requires a running job to be able to visit that 
UI component.

Sorry for the back-and-forth questions... I'm still learning about the new 
server architecture.

> Port TaskManagersHandler to new REST endpoint
> -
>
> Key: FLINK-7648
> URL: https://issues.apache.org/jira/browse/FLINK-7648
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Bowen Li
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{TaskManagersHandler}} to the new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-29 Thread Shuyi Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16186602#comment-16186602
 ] 

Shuyi Chen commented on FLINK-7491:
---

[~fhueske] can you help take another look at the PR? I've addressed your 
comments. Much appreciated.

> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7740) Add parameter support in CassandraInputFormat

2017-09-29 Thread Bin Wang (JIRA)
Bin Wang created FLINK-7740:
---

 Summary: Add parameter support in CassandraInputFormat
 Key: FLINK-7740
 URL: https://issues.apache.org/jira/browse/FLINK-7740
 Project: Flink
  Issue Type: Improvement
Reporter: Bin Wang
Priority: Minor


I suggest adding a small improvement to CassandraInputFormat: it currently 
supports only a plain CQL string as input. I think adding parameter support is 
good for security and also makes the CQL string simpler when there is an IN 
clause or a parameter value is very long.
e.g. "SELECT col0, col1, col2 from keyspace.table0 where col0 in ? and col1>?"
vs "SELECT col0, col1, col2 from keyspace.table0 where col0 in ('v0', 
'v1','v2',...,'vn') and col1 > 12345678901234"
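
A rough sketch of what parameter support could look like, assuming the 
DataStax Java driver that CassandraInputFormat builds on; the helper class 
below is illustrative only, not the actual Flink API:

    import com.datastax.driver.core.BoundStatement;
    import com.datastax.driver.core.PreparedStatement;
    import com.datastax.driver.core.ResultSet;
    import com.datastax.driver.core.Session;

    import java.util.Arrays;

    // Illustrative helper, not the actual CassandraInputFormat API.
    final class ParameterizedCqlReader {

        static ResultSet query(Session session) {
            PreparedStatement ps = session.prepare(
                "SELECT col0, col1, col2 FROM keyspace.table0 WHERE col0 IN ? AND col1 > ?");
            // Values travel as bound parameters instead of being spliced into the CQL string.
            BoundStatement bound = ps.bind(Arrays.asList("v0", "v1", "v2"), 12345678901234L);
            return session.execute(bound);
        }
    }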



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16186260#comment-16186260
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r141945843
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
 ---
@@ -235,14 +240,15 @@ void releaseBuffer() {
ByteBuf write(ByteBufAllocator allocator) throws IOException {
checkNotNull(buffer, "No buffer instance to serialize.");
 
-   int length = 16 + 4 + 1 + 4 + buffer.getSize();
+   int length = 16 + 4 + 4 + 1 + 4 + buffer.getSize();
--- End diff --

Can you please add a comment here explaining the reasoning behind the math?
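
For what it's worth, a commented version might look like the sketch below. 
The field breakdown is my reading of the message layout (receiver ID, 
sequence number, the newly added backlog, the isBuffer flag, and the buffer 
size) and should be double-checked against the rest of write():

    // Assumed breakdown of the header length; verify against the actual fields.
    int length = 16                  // receiver ID (an InputChannelID, two longs)
               + 4                   // sequence number
               + 4                   // backlog (the field added in this PR)
               + 1                   // isBuffer flag
               + 4                   // size of the following buffer
               + buffer.getSize();   // the buffer payload itself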


> Implement Netty receiver incoming pipeline for credit-based
> ---
>
> Key: FLINK-7406
> URL: https://issues.apache.org/jira/browse/FLINK-7406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control.
> Currently {{PartitionRequestClientHandler}} receives and reads 
> {{BufferResponse}} from producer. It will request buffer from {{BufferPool}} 
> for holding the message. If not got, the message is staged temporarily and 
> {{autoread}} for channel is set false.
> For credit-based mode, {{PartitionRequestClientHandler}} can always get 
> buffer from {{RemoteInputChannel}} for reading messages from producer.
> The related works are:
> * Add the backlog of producer in {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffer from 
> {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates backlog for 
> {{RemoteInputChannel}}, and it may trigger requests of floating buffers from 
> {{BufferPool}}
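
A hedged sketch of the backlog handling described above; numAvailableBuffers() 
and addFloatingBuffer() are hypothetical helper names, while requestBuffer() 
and addBufferListener() are the BufferPool calls discussed in this thread:

    // Illustrative handling of a producer backlog announcement (names assumed).
    void onSenderBacklog(int backlog) {
        // Try to have `backlog` buffers available, requesting floating buffers
        // from the local BufferPool for whatever the exclusive buffers can't cover.
        while (numAvailableBuffers() < backlog) {
            Buffer buffer = bufferPool.requestBuffer();
            if (buffer == null) {
                // Pool exhausted: register once and wait for recycled buffers.
                bufferPool.addBufferListener(this);
                break;
            }
            addFloatingBuffer(buffer);
        }
    }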



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)



[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16186258#comment-16186258
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r141945644
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
 ---
@@ -131,6 +136,63 @@ public void recycle(MemorySegment segment) {
availableMemorySegments.add(segment);
}
 
+   public List<MemorySegment> requestMemorySegments(int numRequiredBuffers) throws IOException {
+   checkArgument(numRequiredBuffers > 0, "The number of required buffers should be larger than 0.");
+
+   synchronized (factoryLock) {
+   if (isDestroyed) {
+   throw new IllegalStateException("Network buffer pool has already been destroyed.");
+   }
+
+   if (numTotalRequiredBuffers + numRequiredBuffers > totalNumberOfMemorySegments) {
+   throw new IOException(String.format("Insufficient number of network buffers: " +
--- End diff --

How about adding a `FlinkNetworkException` or `FlinkNetworkIOException` for 
all network/netty related exceptions? That can help users better identify root 
causes of problems.
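
A minimal sketch of such a type, assuming it would extend IOException so that 
existing throws clauses keep compiling; the name just mirrors the suggestion 
above:

    import java.io.IOException;

    /** Hypothetical marker type for network/Netty-related I/O failures. */
    public class FlinkNetworkIOException extends IOException {

        private static final long serialVersionUID = 1L;

        public FlinkNetworkIOException(String message) {
            super(message);
        }

        public FlinkNetworkIOException(String message, Throwable cause) {
            super(message, cause);
        }
    }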


> Implement Netty receiver incoming pipeline for credit-based
> ---
>
> Key: FLINK-7406
> URL: https://issues.apache.org/jira/browse/FLINK-7406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control.
> Currently {{PartitionRequestClientHandler}} receives and reads 
> {{BufferResponse}} from producer. It will request buffer from {{BufferPool}} 
> for holding the message. If not got, the message is staged temporarily and 
> {{autoread}} for channel is set false.
> For credit-based mode, {{PartitionRequestClientHandler}} can always get 
> buffer from {{RemoteInputChannel}} for reading messages from producer.
> The related works are:
> * Add the backlog of producer in {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffer from 
> {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates backlog for 
> {{RemoteInputChannel}}, and it may trigger requests of floating buffers from 
> {{BufferPool}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

2017-09-29 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3359#discussion_r141942758
  
--- Diff: 
flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java
 ---
@@ -0,0 +1,797 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.api.operators;
+
+
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.KeyContext;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.DBOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.StringAppendOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * {@link InternalTimerService} that stores timers in RocksDB.
+ */
+public class RocksDBInternalTimerService<K, N> extends InternalTimerService<K, N> {
+
+   private static Logger LOG = LoggerFactory.getLogger(RocksDBInternalTimerService.class);
+   
+   /** The data base where stores all timers */
+   private final RocksDB db;
+   
+   /** The path where the rocksdb locates */
+   private final Path dbPath;
+
+   /**
+* The in-memory heaps backed by rocksdb to retrieve the next timer to 
trigger. Each
+* partition's leader is stored in the heap. When the timers in a 
partition is changed, we
+* will change the partition's leader and update the heap accordingly.
+*/
+   private final int numPartitions;
+   private final PersistentTimerHeap eventTimeHeap;
+   private final PersistentTimerHeap processingTimeHeap;
+   
+   private static int MAX_PARTITIONS = (1 << 16);
+
+   public RocksDBInternalTimerService(
+   int totalKeyGroups,
+   KeyGroupRange keyGroupRange,
+   KeyContext keyContext,
+   ProcessingTimeService processingTimeService,
+   Path dbPath) {
+
+   super(totalKeyGroups, keyGroupRange, keyContext, 
processingTimeService);
+   
+   this.dbPath = dbPath;
+   
+   try {
+   FileSystem fileSystem = this.dbPath.getFileSystem();
+   if (fileSystem.exists(this.dbPath)) {
+   fileSystem.delete(this.dbPath, true);
+   }
+   
+   fileSystem.mkdirs(dbPath);
+   } catch (IOException e) {
+   throw new RuntimeException("Error while creating directory for rocksdb timer service.", e);
+   }
+
+   ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions()
+   

[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

2017-09-29 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3359#discussion_r141944569
  
--- Diff: 
flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java
 ---
@@ -0,0 +1,797 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.api.operators;
+
+
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.KeyContext;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.DBOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.StringAppendOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * {@link InternalTimerService} that stores timers in RocksDB.
+ */
+public class RocksDBInternalTimerService<K, N> extends InternalTimerService<K, N> {
+
+   private static Logger LOG = LoggerFactory.getLogger(RocksDBInternalTimerService.class);
+   
+   /** The data base where stores all timers */
+   private final RocksDB db;
+   
+   /** The path where the rocksdb locates */
+   private final Path dbPath;
+
+   /**
+* The in-memory heaps backed by rocksdb to retrieve the next timer to 
trigger. Each
+* partition's leader is stored in the heap. When the timers in a 
partition is changed, we
+* will change the partition's leader and update the heap accordingly.
+*/
+   private final int numPartitions;
+   private final PersistentTimerHeap eventTimeHeap;
+   private final PersistentTimerHeap processingTimeHeap;
+   
+   private static int MAX_PARTITIONS = (1 << 16);
+
+   public RocksDBInternalTimerService(
+   int totalKeyGroups,
+   KeyGroupRange keyGroupRange,
+   KeyContext keyContext,
+   ProcessingTimeService processingTimeService,
+   Path dbPath) {
+
+   super(totalKeyGroups, keyGroupRange, keyContext, 
processingTimeService);
+   
+   this.dbPath = dbPath;
+   
+   try {
+   FileSystem fileSystem = this.dbPath.getFileSystem();
+   if (fileSystem.exists(this.dbPath)) {
+   fileSystem.delete(this.dbPath, true);
+   }
+   
+   fileSystem.mkdirs(dbPath);
+   } catch (IOException e) {
+   throw new RuntimeException("Error while creating directory for rocksdb timer service.", e);
+   }
+
+   ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions()
+   

[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

2017-09-29 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3359#discussion_r141944634
  
--- Diff: 
flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java
 ---
@@ -0,0 +1,797 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.api.operators;
+
+
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.KeyContext;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.DBOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.StringAppendOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * {@link InternalTimerService} that stores timers in RocksDB.
+ */
+public class RocksDBInternalTimerService<K, N> extends InternalTimerService<K, N> {
+
+   private static Logger LOG = LoggerFactory.getLogger(RocksDBInternalTimerService.class);
+   
+   /** The data base where stores all timers */
+   private final RocksDB db;
+   
+   /** The path where the rocksdb locates */
+   private final Path dbPath;
+
+   /**
+* The in-memory heaps backed by rocksdb to retrieve the next timer to 
trigger. Each
+* partition's leader is stored in the heap. When the timers in a 
partition is changed, we
+* will change the partition's leader and update the heap accordingly.
+*/
+   private final int numPartitions;
+   private final PersistentTimerHeap eventTimeHeap;
+   private final PersistentTimerHeap processingTimeHeap;
+   
+   private static int MAX_PARTITIONS = (1 << 16);
+
+   public RocksDBInternalTimerService(
+   int totalKeyGroups,
+   KeyGroupRange keyGroupRange,
+   KeyContext keyContext,
+   ProcessingTimeService processingTimeService,
+   Path dbPath) {
+
+   super(totalKeyGroups, keyGroupRange, keyContext, 
processingTimeService);
+   
+   this.dbPath = dbPath;
+   
+   try {
+   FileSystem fileSystem = this.dbPath.getFileSystem();
+   if (fileSystem.exists(this.dbPath)) {
+   fileSystem.delete(this.dbPath, true);
+   }
+   
+   fileSystem.mkdirs(dbPath);
+   } catch (IOException e) {
+   throw new RuntimeException("Error while creating directory for rocksdb timer service.", e);
+   }
+
+   ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions()
+   

[jira] [Commented] (FLINK-5544) Implement Internal Timer Service in RocksDB

2017-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16186202#comment-16186202
 ] 

ASF GitHub Bot commented on FLINK-5544:
---

Github user EXPjbucher commented on the issue:

https://github.com/apache/flink/pull/3359
  
Hey, I was actually looking into this today and was wondering what the status 
of it is. We have exactly this case, where lots of timers are causing high 
memory use (most of which don't need to be in RAM at the same time).


> Implement Internal Timer Service in RocksDB
> ---
>
> Key: FLINK-5544
> URL: https://issues.apache.org/jira/browse/FLINK-5544
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> Now the only implementation of internal timer service is 
> HeapInternalTimerService which stores all timers in memory. In the cases 
> where the number of keys is very large, the timer service will cost too much 
> memory. An implementation which stores timers in RocksDB seems well suited 
> to these cases.
> It might be a little challenging to implement a RocksDB timer service because 
> the timers are accessed in different ways. When timers are triggered, we need 
> to access timers in the order of timestamp. But when performing checkpoints, 
> we must have a method to obtain all timers of a given key group.
> A good implementation, as suggested by [~StephanEwen], follows the idea of 
> merge sorting. We can store timers in RocksDB with the format 
> {{KEY_GROUP#TIMER#KEY}}. In this way, the timers under a key group are put 
> together and are sorted. 
> Then we can deploy an in-memory heap which keeps the first timer of each key 
> group to get the next timer to trigger. When a key group's first timer is 
> updated, we can efficiently update the heap.
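
For illustration, a sketch of the KEY_GROUP#TIMER#KEY byte layout described 
above: writing the key group and timestamp big-endian makes RocksDB's default 
lexicographic byte order sort timers by key group first and timestamp second 
(assuming non-negative timestamps). The helper below is hypothetical, not the 
PR's code:

    import java.nio.ByteBuffer;

    // Illustrative encoding of the KEY_GROUP#TIMER#KEY layout described above.
    final class TimerKeyEncoding {

        static byte[] encode(int keyGroup, long timestamp, byte[] serializedKey) {
            // ByteBuffer writes big-endian by default, which preserves ordering.
            ByteBuffer buf = ByteBuffer.allocate(4 + 8 + serializedKey.length);
            buf.putInt(keyGroup);     // timers of a key group stay contiguous
            buf.putLong(timestamp);   // sorted by timestamp within a group
            buf.put(serializedKey);
            return buf.array();
        }
    }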



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16186083#comment-16186083
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4509
  
@NicoK , thanks for the review and for talking through this key point.

I think I understand your point and agree with it. I will submit the 
modifications before Monday.


> Implement Netty receiver incoming pipeline for credit-based
> ---
>
> Key: FLINK-7406
> URL: https://issues.apache.org/jira/browse/FLINK-7406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control.
> Currently {{PartitionRequestClientHandler}} receives and reads 
> {{BufferResponse}} from producer. It will request buffer from {{BufferPool}} 
> for holding the message. If not got, the message is staged temporarily and 
> {{autoread}} for channel is set false.
> For credit-based mode, {{PartitionRequestClientHandler}} can always get 
> buffer from {{RemoteInputChannel}} for reading messages from producer.
> The related works are:
> * Add the backlog of producer in {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffer from 
> {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates backlog for 
> {{RemoteInputChannel}}, and it may trigger requests of floating buffers from 
> {{BufferPool}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)



[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16186049#comment-16186049
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r141911428
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java
 ---
@@ -37,20 +43,31 @@
 
 import java.io.IOException;
 import java.net.SocketAddress;
+import java.util.ArrayDeque;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReference;
 
+/**
+ * Channel handler to read {@link BufferResponse} and {@link ErrorResponse} messages from the
+ * producer, to write and flush {@link AddCredit} message for the producer.
+ */
 class CreditBasedClientHandler extends ChannelInboundHandlerAdapter {
 
private static final Logger LOG = LoggerFactory.getLogger(CreditBasedClientHandler.class);

+   /** Channels, which already requested partitions from the producers. */
private final ConcurrentMap<InputChannelID, RemoteInputChannel> inputChannels = new ConcurrentHashMap<>();

+   /** Channels, which will notify the producers about unannounced credit. */
+   private final ArrayDeque<RemoteInputChannel> inputChannelsWithCredit = new ArrayDeque<>();
+
private final AtomicReference<Throwable> channelError = new AtomicReference<>();

+   private final ChannelFutureListener writeListener = new WriteAndFlushNextMessageIfPossibleListener();

/**
-* Set of cancelled partition requests. A request is cancelled iff an input channel is cleared
+* Set of cancelled partition requests. A request is cancelled if an input channel is cleared
--- End diff --

I guess, this "iff" was correct in the meaning of "if and only if" (a 
common abbreviation in math) - if you like to, you can replace it with the full 
form though if that is still the logic behind


> Implement Netty receiver outgoing pipeline for credit-based
> ---
>
> Key: FLINK-7416
> URL: https://issues.apache.org/jira/browse/FLINK-7416
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control.
> The related works are :
> *  We define a new message called {{AddCredit}} to notify the incremental 
> credit during data shuffle. 
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the 
> channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}} 
> and sends its unannounced credit. The credit is reset to zero after each sent.
> * That way, messages are sent as often as the network has capacity and 
> contain as much credit as available for the channel at that point in time. 
> Otherwise, it would only add latency to the announcements and not increase 
> throughput.
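
A hedged model of the zero-to-positive rule from the list above (all names 
illustrative, not the actual handler code): a channel is enqueued only when 
its unannounced credit leaves zero, and the credit is reset when the 
announcement is sent.

    import java.util.ArrayDeque;

    // Illustrative model of the credit-announcement rule described above.
    final class CreditAnnouncer {

        static final class Channel {
            int unannouncedCredit;
        }

        private final ArrayDeque<Channel> channelsWithCredit = new ArrayDeque<>();

        void addCredit(Channel ch, int credit) {
            ch.unannouncedCredit += credit;
            // Enqueue only on the 0 -> positive transition.
            if (ch.unannouncedCredit == credit) {
                channelsWithCredit.add(ch);
            }
        }

        /** Called whenever the network channel becomes writable again. */
        int pollNextAnnouncement() {
            Channel ch = channelsWithCredit.poll();
            if (ch == null) {
                return 0;  // nothing to announce
            }
            int credit = ch.unannouncedCredit;
            ch.unannouncedCredit = 0;  // reset after each send
            return credit;
        }
    }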



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)



[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2017-09-29 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r141906648
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -374,6 +376,64 @@ public void testReleaseExclusiveBuffers() throws 
Exception {
verify(inputGate, 
times(1)).returnExclusiveSegments(anyListOf(MemorySegment.class));
}
 
+   /**
+* Tests {@link BufferPool#requestBuffer()}, verifying the remote input 
channel tries to request
+* floating buffers once receiving the producer's backlog.
+*/
+   @Test
+   public void testRequestFloatingBuffersOnBuffer() throws Exception {
+   // Setup
+   final BufferPool bufferPool = mock(BufferPool.class);
+   
when(bufferPool.requestBuffer()).thenReturn(TestBufferFactory.createBuffer());
+
+   final SingleInputGate inputGate = mock(SingleInputGate.class);
+   when(inputGate.getBufferPool()).thenReturn(bufferPool);
+
+   final RemoteInputChannel inputChannel = 
createRemoteInputChannel(inputGate);
+
+   // Receive the producer's backlog
+   inputChannel.onBuffer(TestBufferFactory.createBuffer(), 0, 10);
+   // Need to request 10 floating buffers from buffer pool
+   verify(bufferPool, times(10)).requestBuffer();
+
+   inputChannel.onBuffer(TestBufferFactory.createBuffer(), 1, 8);
+   // No need to request extra floating buffers from pool because
+   // there are already 10 available buffers in queue now
+   verify(bufferPool, times(10)).requestBuffer();
+
+   inputChannel.onBuffer(TestBufferFactory.createBuffer(), 2, 11);
+   // Need to request another floating buffer from pool
+   verify(bufferPool, times(11)).requestBuffer();
+   }
+
+   /**
+* Tests {@link BufferPool#requestBuffer()}, verifying the remote input 
channel tries to request
+* floating buffers once receiving the producer's backlog. And it 
requests from pool only once
+* and registers as listener if there are currently no available 
buffers in the pool.
+*/
+   @Test
+   public void testWaitForFloatingBuffersOnBuffer() throws Exception {
+   // Setup
+   final BufferPool bufferPool = mock(BufferPool.class);
+   when(bufferPool.requestBuffer()).thenReturn(null);
+   
when(bufferPool.addBufferListener(any(BufferListener.class))).thenReturn(true);
--- End diff --

How about using the real `NetworkBufferPool#createBufferPool()` here with a 
limited set of buffers? Then you could start retrieving buffers as in the test 
method above and continue with verifying the expected behaviour in case the 
buffer limit was reached (no need for two test methods, I guess). I'd prefer 
this over a mock so that you can also verify the interaction with the real 
methods such as `addBufferListener()`.
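
To make the mock-vs-real-pool trade-off concrete, here is a self-contained toy
version of such a test (a hand-rolled bounded pool standing in for the real
`NetworkBufferPool`/`BufferPool`; all names are illustrative and the actual
Flink signatures may differ):

    import java.util.ArrayList;
    import java.util.List;

    // Toy bounded pool: hands out up to `capacity` buffers, then queues listeners.
    class ToyPool {
        private int available;
        private final List<Runnable> listeners = new ArrayList<>();

        ToyPool(int capacity) { this.available = capacity; }

        // Returns a "buffer" (here just a token), or null when exhausted.
        Object requestBuffer() {
            if (available > 0) { available--; return new Object(); }
            return null;
        }

        boolean addBufferListener(Runnable listener) { return listeners.add(listener); }

        int registeredListeners() { return listeners.size(); }
    }

    public class PoolBehaviourSketch {
        public static void main(String[] args) {
            ToyPool pool = new ToyPool(2);

            // Drain the pool as the channel would while catching up with a backlog of 3.
            int obtained = 0;
            for (int i = 0; i < 3; i++) {
                if (pool.requestBuffer() != null) {
                    obtained++;
                } else {
                    // Limit reached: register once and wait instead of polling.
                    pool.addBufferListener(() -> { /* take the recycled buffer */ });
                    break;
                }
            }

            // Both behaviours verified against the same (real) pool instance.
            System.out.println(obtained == 2 && pool.registeredListeners() == 1);
        }
    }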


---


[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16186043#comment-16186043
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r141896488
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -412,23 +468,35 @@ public void onBuffer(Buffer buffer, int 
sequenceNumber) {
}
}
}
+
+   if (success && backlog > 0) {
+   onSenderBacklog(backlog);
+   }
+
} finally {
if (!success) {
buffer.recycle();
}
}
}
 
-   public void onEmptyBuffer(int sequenceNumber) {
+   public void onEmptyBuffer(int sequenceNumber, int backlog) {
+   boolean success = false;
+
synchronized (receivedBuffers) {
if (!isReleased.get()) {
if (expectedSequenceNumber == sequenceNumber) {
expectedSequenceNumber++;
+   success = true;
} else {
onError(new 
BufferReorderingException(expectedSequenceNumber, sequenceNumber));
}
}
}
+
+   if (success && backlog > 0) {
--- End diff --

same here


> Implement Netty receiver incoming pipeline for credit-based
> ---
>
> Key: FLINK-7406
> URL: https://issues.apache.org/jira/browse/FLINK-7406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control.
> Currently {{PartitionRequestClientHandler}} receives and reads
> {{BufferResponse}} from the producer. It will request a buffer from
> {{BufferPool}} to hold the message. If none is available, the message is
> staged temporarily and {{autoread}} for the channel is set to false.
> For credit-based mode, {{PartitionRequestClientHandler}} can always get
> a buffer from {{RemoteInputChannel}} for reading messages from the producer.
> The related changes are:
> * Add the backlog of producer in {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffer from 
> {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates backlog for 
> {{RemoteInputChannel}}, and it may trigger requests of floating buffers from 
> {{BufferPool}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16186040#comment-16186040
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r141890051
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -390,7 +390,63 @@ public BufferProvider getBufferProvider() throws 
IOException {
return inputGate.getBufferProvider();
}
 
-   public void onBuffer(Buffer buffer, int sequenceNumber) {
+   /**
+* Requests buffer from input channel directly for receiving network 
data.
+* It should always return an available buffer in credit-based mode.
+*
+* @return The available buffer.
+*/
+   public Buffer requestBuffer() {
+   synchronized (availableBuffers) {
+   return availableBuffers.poll();
+   }
+   }
+
+   /**
+* Receives the backlog from producer's buffer response. If the number 
of available
+* buffers is less than the backlog length, it will request floating 
buffers from buffer
+* pool, and then notify unannounced credits to the producer.
+*
+* @param backlog The number of unsent buffers in the producer's sub 
partition.
+*/
+   private void onSenderBacklog(int backlog) {
+   int numRequestedBuffers = 0;
+
+   synchronized (availableBuffers) {
+   // Important: the isReleased check should be inside the 
synchronized block.
+   if (!isReleased.get()) {
+   senderBacklog.set(backlog);
+
+   while (senderBacklog.get() > 
availableBuffers.size() && !isWaitingForFloatingBuffers.get()) {
+   Buffer buffer = null;
+   try {
+   buffer = 
inputGate.getBufferPool().requestBuffer();
+   } catch (IOException ex) {
--- End diff --

This only affects the two methods you mentioned, which themselves are called
by `PartitionRequestClientHandler`/`CreditBasedClientHandler` helper methods
that allow `Throwable` - so it's actually not much. I'd prefer not to wrap
the exception further here, for simplicity in case of errors.


> Implement Netty receiver incoming pipeline for credit-based
> ---
>
> Key: FLINK-7406
> URL: https://issues.apache.org/jira/browse/FLINK-7406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control.
> Currently {{PartitionRequestClientHandler}} receives and reads
> {{BufferResponse}} from the producer. It will request a buffer from
> {{BufferPool}} to hold the message. If none is available, the message is
> staged temporarily and {{autoread}} for the channel is set to false.
> For credit-based mode, {{PartitionRequestClientHandler}} can always get
> a buffer from {{RemoteInputChannel}} for reading messages from the producer.
> The related changes are:
> * Add the backlog of producer in {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffer from 
> {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates backlog for 
> {{RemoteInputChannel}}, and it may trigger requests of floating buffers from 
> {{BufferPool}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16186046#comment-16186046
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r141908388
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -374,6 +376,64 @@ public void testReleaseExclusiveBuffers() throws 
Exception {
verify(inputGate, 
times(1)).returnExclusiveSegments(anyListOf(MemorySegment.class));
}
 
+   /**
+* Tests {@link BufferPool#requestBuffer()}, verifying the remote input 
channel tries to request
+* floating buffers once receiving the producer's backlog.
+*/
+   @Test
+   public void testRequestFloatingBuffersOnBuffer() throws Exception {
+   // Setup
+   final BufferPool bufferPool = mock(BufferPool.class);
+   
when(bufferPool.requestBuffer()).thenReturn(TestBufferFactory.createBuffer());
+
+   final SingleInputGate inputGate = mock(SingleInputGate.class);
+   when(inputGate.getBufferPool()).thenReturn(bufferPool);
+
+   final RemoteInputChannel inputChannel = 
createRemoteInputChannel(inputGate);
+
+   // Receive the producer's backlog
+   inputChannel.onBuffer(TestBufferFactory.createBuffer(), 0, 10);
+   // Need to request 10 floating buffers from buffer pool
+   verify(bufferPool, times(10)).requestBuffer();
+
+   inputChannel.onBuffer(TestBufferFactory.createBuffer(), 1, 8);
+   // No need to request extra floating buffers from pool because
+   // there are already 10 available buffers in queue now
+   verify(bufferPool, times(10)).requestBuffer();
+
+   inputChannel.onBuffer(TestBufferFactory.createBuffer(), 2, 11);
+   // Need to request another floating buffer from pool
+   verify(bufferPool, times(11)).requestBuffer();
+   }
+
+   /**
+* Tests {@link BufferPool#requestBuffer()}, verifying the remote input 
channel tries to request
+* floating buffers once receiving the producer's backlog. And it 
requests from pool only once
+* and registers as listener if there are currently no available 
buffers in the pool.
+*/
+   @Test
+   public void testWaitForFloatingBuffersOnBuffer() throws Exception {
+   // Setup
+   final BufferPool bufferPool = mock(BufferPool.class);
+   when(bufferPool.requestBuffer()).thenReturn(null);
+   
when(bufferPool.addBufferListener(any(BufferListener.class))).thenReturn(true);
+
+   final SingleInputGate inputGate = mock(SingleInputGate.class);
+   when(inputGate.getBufferPool()).thenReturn(bufferPool);
+   when(inputGate.getBufferProvider()).thenReturn(bufferPool);
+
+   final RemoteInputChannel inputChannel = 
createRemoteInputChannel(inputGate);
+
+   // Receive the producer's backlog
+   inputChannel.onBuffer(TestBufferFactory.createBuffer(), 0, 10);
+   // Request from pool only once if there are no available 
floating buffers
+   verify(bufferPool, times(1)).requestBuffer();
+
+   inputChannel.onBuffer(TestBufferFactory.createBuffer(), 1, 10);
+   // Already registers as listener to wait for notifications and 
will not request any more
+   verify(bufferPool, times(1)).requestBuffer();
+   }
+
--- End diff --

actually, some more tests would be nice:

- ensuring a fair distribution of buffers to `BufferListener`s
- to verify that there is no race condition with two things running in 
parallel: `onSenderBacklog()` or `notifyBufferAvailable()` requesting buffers 
and some other thread recycling them (floating and/or exclusive ones).
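
A hedged skeleton of the suggested race test (the two runnables are
placeholders for the real fixtures, e.g. a task driving `onSenderBacklog()`
and a task recycling floating and/or exclusive buffers):

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class RaceSketch {
        public static void run(Runnable requestTask, Runnable recycleTask) throws Exception {
            final int rounds = 10_000;
            final CountDownLatch start = new CountDownLatch(1);
            ExecutorService executor = Executors.newFixedThreadPool(2);

            executor.submit(() -> { await(start); for (int i = 0; i < rounds; i++) requestTask.run(); });
            executor.submit(() -> { await(start); for (int i = 0; i < rounds; i++) recycleTask.run(); });

            start.countDown();  // release both tasks at the same time
            executor.shutdown();
            executor.awaitTermination(30, TimeUnit.SECONDS);
            // ... then assert the invariants, e.g. available + in-flight == total buffers
        }

        private static void await(CountDownLatch latch) {
            try { latch.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
    }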


> Implement Netty receiver incoming pipeline for credit-based
> ---
>
> Key: FLINK-7406
> URL: https://issues.apache.org/jira/browse/FLINK-7406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control.
> Currently {{PartitionRequestClientHandler}} receives and reads
> {{BufferResponse}} from the producer. It will request a buffer from
> {{BufferPool}} to hold the message. If none is available, the message is
> staged temporarily and {{autoread}} for the channel is set to false.
> For credit-based mode, {{PartitionRequestClientHandler}} can always get
> a buffer from {{RemoteInputChannel}} for reading messages from the producer.

[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16186038#comment-16186038
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r141901569
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -209,6 +276,95 @@ public String toString() {
}
 
// 

+   // Credit-based
+   // 

+
+   /**
+* Enqueue this input channel in the pipeline for sending unannounced 
credits to producer.
+*/
+   void notifyCreditAvailable() {
+   //TODO in next PR
+   }
+
+   /**
+* Exclusive buffer is recycled to this input channel directly and it 
may trigger notify
+* credit to producer.
+*
+* @param segment The exclusive segment of this channel.
+*/
+   @Override
+   public void recycle(MemorySegment segment) {
+   synchronized (availableBuffers) {
+   // Important: the isReleased check should be inside the 
synchronized block.
+   // that way the segment can also be returned to global 
pool after added into
+   // the available queue during releasing all resources.
+   if (isReleased.get()) {
+   try {
+   
inputGate.returnExclusiveSegments(Arrays.asList(segment));
+   return;
+   } catch (Throwable t) {
+   ExceptionUtils.rethrow(t);
+   }
+   }
+   availableBuffers.add(new Buffer(segment, this));
+   }
+
+   if (unannouncedCredit.getAndAdd(1) == 0) {
+   notifyCreditAvailable();
+   }
+   }
+
+   public int getNumberOfAvailableBuffers() {
+   synchronized (availableBuffers) {
+   return availableBuffers.size();
+   }
+   }
+
+   /**
+* The Buffer pool notifies this channel of an available floating 
buffer. If the channel is released or
+* currently does not need extra buffers, the buffer should be recycled 
to the buffer pool. Otherwise,
+* the buffer will be added into the availableBuffers queue 
and the unannounced credit is
+* increased by one.
+*
+* @param buffer Buffer that becomes available in buffer pool.
+* @return True when this channel is waiting for more floating buffers, 
otherwise false.
+*/
+   @Override
+   public boolean notifyBufferAvailable(Buffer buffer) {
+   checkState(isWaitingForFloatingBuffers.get(), "This channel 
should be waiting for floating buffers.");
+
+   synchronized (availableBuffers) {
+   // Important: the isReleased check should be inside the 
synchronized block.
+   if (isReleased.get() || availableBuffers.size() >= 
senderBacklog.get()) {
+   isWaitingForFloatingBuffers.set(false);
+   buffer.recycle();
+
+   return false;
+   }
+
+   availableBuffers.add(buffer);
+
+   if (unannouncedCredit.getAndAdd(1) == 0) {
+   notifyCreditAvailable();
+   }
--- End diff --

can we do this outside the `synchronized` block?
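
One way to do that (a sketch, not necessarily the final PR code): record the
0 -> 1 credit transition inside the lock and fire the notification after
releasing it, so listeners are never called while holding `availableBuffers`:

    @Override
    public boolean notifyBufferAvailable(Buffer buffer) {
        checkState(isWaitingForFloatingBuffers.get(), "This channel should be waiting for floating buffers.");

        boolean needsNotification;
        synchronized (availableBuffers) {
            if (isReleased.get() || availableBuffers.size() >= senderBacklog.get()) {
                isWaitingForFloatingBuffers.set(false);
                buffer.recycle();
                return false;
            }

            availableBuffers.add(buffer);
            // Remember the transition; notify only after leaving the lock.
            needsNotification = (unannouncedCredit.getAndAdd(1) == 0);
        }

        if (needsNotification) {
            notifyCreditAvailable();
        }
        return true;
    }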


> Implement Netty receiver incoming pipeline for credit-based
> ---
>
> Key: FLINK-7406
> URL: https://issues.apache.org/jira/browse/FLINK-7406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control.
> Currently {{PartitionRequestClientHandler}} receives and reads
> {{BufferResponse}} from the producer. It will request a buffer from
> {{BufferPool}} to hold the message. If none is available, the message is
> staged temporarily and {{autoread}} for the channel is set to false.
> For credit-based mode, {{PartitionRequestClientHandler}} can always get
> a buffer from {{RemoteInputChannel}} for reading messages from the producer.
> The related changes are:
> * Add the backlog of producer in {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffer from
> {{RemoteInputChannel}} directly

[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16186045#comment-16186045
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r141896356
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -412,23 +468,35 @@ public void onBuffer(Buffer buffer, int 
sequenceNumber) {
}
}
}
+
+   if (success && backlog > 0) {
--- End diff --

Shouldn't we call this for `success && backlog >= 0` to try to always have 
`initialCredit` (extra) buffers available (if there are enough buffers)?


> Implement Netty receiver incoming pipeline for credit-based
> ---
>
> Key: FLINK-7406
> URL: https://issues.apache.org/jira/browse/FLINK-7406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control.
> Currently {{PartitionRequestClientHandler}} receives and reads
> {{BufferResponse}} from the producer. It will request a buffer from
> {{BufferPool}} to hold the message. If none is available, the message is
> staged temporarily and {{autoread}} for the channel is set to false.
> For credit-based mode, {{PartitionRequestClientHandler}} can always get
> a buffer from {{RemoteInputChannel}} for reading messages from the producer.
> The related changes are:
> * Add the backlog of producer in {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffer from 
> {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates backlog for 
> {{RemoteInputChannel}}, and it may trigger requests of floating buffers from 
> {{BufferPool}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2017-09-29 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r141901569
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -209,6 +276,95 @@ public String toString() {
}
 
// 

+   // Credit-based
+   // 

+
+   /**
+* Enqueue this input channel in the pipeline for sending unannounced 
credits to producer.
+*/
+   void notifyCreditAvailable() {
+   //TODO in next PR
+   }
+
+   /**
+* Exclusive buffer is recycled to this input channel directly and it 
may trigger notify
+* credit to producer.
+*
+* @param segment The exclusive segment of this channel.
+*/
+   @Override
+   public void recycle(MemorySegment segment) {
+   synchronized (availableBuffers) {
+   // Important: the isReleased check should be inside the 
synchronized block.
+   // that way the segment can also be returned to global 
pool after added into
+   // the available queue during releasing all resources.
+   if (isReleased.get()) {
+   try {
+   
inputGate.returnExclusiveSegments(Arrays.asList(segment));
+   return;
+   } catch (Throwable t) {
+   ExceptionUtils.rethrow(t);
+   }
+   }
+   availableBuffers.add(new Buffer(segment, this));
+   }
+
+   if (unannouncedCredit.getAndAdd(1) == 0) {
+   notifyCreditAvailable();
+   }
+   }
+
+   public int getNumberOfAvailableBuffers() {
+   synchronized (availableBuffers) {
+   return availableBuffers.size();
+   }
+   }
+
+   /**
+* The Buffer pool notifies this channel of an available floating 
buffer. If the channel is released or
+* currently does not need extra buffers, the buffer should be recycled 
to the buffer pool. Otherwise,
+* the buffer will be added into the availableBuffers queue 
and the unannounced credit is
+* increased by one.
+*
+* @param buffer Buffer that becomes available in buffer pool.
+* @return True when this channel is waiting for more floating buffers, 
otherwise false.
+*/
+   @Override
+   public boolean notifyBufferAvailable(Buffer buffer) {
+   checkState(isWaitingForFloatingBuffers.get(), "This channel 
should be waiting for floating buffers.");
+
+   synchronized (availableBuffers) {
+   // Important: the isReleased check should be inside the 
synchronized block.
+   if (isReleased.get() || availableBuffers.size() >= 
senderBacklog.get()) {
+   isWaitingForFloatingBuffers.set(false);
+   buffer.recycle();
+
+   return false;
+   }
+
+   availableBuffers.add(buffer);
+
+   if (unannouncedCredit.getAndAdd(1) == 0) {
+   notifyCreditAvailable();
+   }
--- End diff --

can we do this outside the `synchronized` block?


---


[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2017-09-29 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r141886467
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -390,7 +390,63 @@ public BufferProvider getBufferProvider() throws 
IOException {
return inputGate.getBufferProvider();
}
 
-   public void onBuffer(Buffer buffer, int sequenceNumber) {
+   /**
+* Requests buffer from input channel directly for receiving network 
data.
+* It should always return an available buffer in credit-based mode.
+*
+* @return The available buffer.
+*/
+   public Buffer requestBuffer() {
+   synchronized (availableBuffers) {
+   return availableBuffers.poll();
+   }
+   }
+
+   /**
+* Receives the backlog from producer's buffer response. If the number 
of available
+* buffers is less than the backlog length, it will request floating 
buffers from buffer
+* pool, and then notify unannounced credits to the producer.
+*
+* @param backlog The number of unsent buffers in the producer's sub 
partition.
+*/
+   private void onSenderBacklog(int backlog) {
+   int numRequestedBuffers = 0;
+
+   synchronized (availableBuffers) {
+   // Important: the isReleased check should be inside the 
synchronized block.
+   if (!isReleased.get()) {
+   senderBacklog.set(backlog);
+
+   while (senderBacklog.get() > 
availableBuffers.size() && !isWaitingForFloatingBuffers.get()) {
--- End diff --

I was thinking about it a bit more and was talking to @StephanEwen about 
it, and we think that it is actually fine to grab all resources we need at the 
moment. If there are not enough buffers at some point, the fair distribution 
will start when the buffers are recycled, i.e. via the callbacks of the new 
`BufferListener`. Since each channel always has its own exclusive buffers, we 
can guarantee that it always makes progress anyway! Additionally, we cannot 
really make a fair distribution from the start when receiving the first backlog 
(since we do not know all the other backlogs) unless we wait for some time,
which we also do not want.

I kind of like your idea of having a `numBuffersPerAllocation`. Let's keep 
this in our heads and evaluate the current solution first to see whether we 
need this addition.

Regarding the formula (which I took from the network FLIP):
- from the FLIP with regards to the buffer distribution strategy: `Design 
rationale 2: Each channel always tries to maintain a credit of 
‘backlog_length + initialCredit’. That means that each channel tries to 
build the receive window for its current backlog as much as possible from the 
floating buffers, and use the exclusive ‘initialCredit’ buffers as a means 
to grow the window.` That way we always have some buffers available immediately 
on the receiver side so the sender can continue sending new buffers immediately 
(as long as there are buffers available on the receiver) and we do not have to 
wait for the exclusive buffers to come back.
  - Note that this would have to be changed in the various checks for 
`availableBuffers.size() >= senderBacklog.get()`, e.g. in 
`RemoteInputChannel#notifyBufferAvailable()`.
  - Similarly, `RemoteInputChannel#recycle()` needs to be adapted, in case 
our exclusive buffers are in use and we requested `backlog_length + 
initialCredit - currentCredit` *floating* buffers in order not to stack up 
`2*initialCredit` buffers once `backlog == 0` again. (+ a corresponding unit test)
- what do you mean by `backlog-currentCredit` not being very accurate? We
guarantee that there are no more than `currentCredit` buffers on the wire
(some already in the channel, some only announced) and, at the time the
buffer was sent, `backlog` additional buffers were queued, so in order to
send them we always need `backlog-currentCredit`, irrespective of how much
credit is announced vs. on the wire. Or am I missing something here?
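
For a quick sanity check of the arithmetic (the numbers below are illustrative
assumptions, not taken from the PR):

    public class CreditMath {
        public static void main(String[] args) {
            int initialCredit = 2;   // exclusive buffers per channel
            int backlog = 5;         // buffers queued at the sender
            int currentCredit = 3;   // credit the receiver already holds/announced

            // Target credit per channel, per the FLIP: backlog + initialCredit.
            int targetCredit = backlog + initialCredit;     // = 7
            // Floating buffers still to request:
            int toRequest = targetCredit - currentCredit;   // = 4
            System.out.println(targetCredit + " " + toRequest);
        }
    }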


---


[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2017-09-29 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r141865540
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -390,7 +390,63 @@ public BufferProvider getBufferProvider() throws 
IOException {
return inputGate.getBufferProvider();
}
 
-   public void onBuffer(Buffer buffer, int sequenceNumber) {
+   /**
+* Requests buffer from input channel directly for receiving network 
data.
+* It should always return an available buffer in credit-based mode.
--- End diff --

...unless the channel has been released


---


[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2017-09-29 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r141890624
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -390,7 +390,63 @@ public BufferProvider getBufferProvider() throws 
IOException {
return inputGate.getBufferProvider();
}
 
-   public void onBuffer(Buffer buffer, int sequenceNumber) {
+   /**
+* Requests buffer from input channel directly for receiving network 
data.
+* It should always return an available buffer in credit-based mode.
+*
+* @return The available buffer.
+*/
+   public Buffer requestBuffer() {
+   synchronized (availableBuffers) {
+   return availableBuffers.poll();
+   }
+   }
+
+   /**
+* Receives the backlog from the producer's buffer response. If the 
number of available
+* buffers is less than the backlog length, it will request floating 
buffers from the buffer
+* pool, and then notify unannounced credits to the producer.
+*
+* @param backlog The number of unsent buffers in the producer's sub 
partition.
+*/
+   private void onSenderBacklog(int backlog) {
+   int numRequestedBuffers = 0;
+
+   synchronized (availableBuffers) {
+   // Important: the isReleased check should be inside the 
synchronized block.
+   if (!isReleased.get()) {
+   senderBacklog.set(backlog);
+
+   while (senderBacklog.get() > 
availableBuffers.size() && !isWaitingForFloatingBuffers.get()) {
+   Buffer buffer = null;
+   try {
+   buffer = 
inputGate.getBufferPool().requestBuffer();
+   } catch (IOException ex) {
+   ExceptionUtils.rethrow(ex);
+   }
+
+   if (buffer != null) {
+   availableBuffers.add(buffer);
+   numRequestedBuffers++;
+   continue;
+   }
+
+   // If the channel has not got enough 
buffers, register it as listener to wait for more floating buffers.
+   if 
(inputGate.getBufferProvider().addBufferListener(this)) {
+   if 
(!isWaitingForFloatingBuffers.compareAndSet(false, true)) {
+   throw new 
IllegalStateException("This channel should not be waiting for floating 
buffers.");
+   }
--- End diff --

...instead, here you can actively `break` out of the loop early without 
having to check the conditions again
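
Putting both suggestions together (the `else` branch proposed elsewhere in
this review and the early `break` here), the loop could look roughly like
this - a sketch, not the final PR code; with the `break`, the
`isWaitingForFloatingBuffers` check can also be dropped from the loop
condition:

    private void onSenderBacklog(int backlog) {
        int numRequestedBuffers = 0;

        synchronized (availableBuffers) {
            // Important: the isReleased check should be inside the synchronized block.
            if (!isReleased.get()) {
                senderBacklog.set(backlog);

                while (senderBacklog.get() > availableBuffers.size()) {
                    Buffer buffer = null;
                    try {
                        buffer = inputGate.getBufferPool().requestBuffer();
                    } catch (IOException ex) {
                        ExceptionUtils.rethrow(ex);
                    }

                    if (buffer != null) {
                        availableBuffers.add(buffer);
                        numRequestedBuffers++;
                    } else if (inputGate.getBufferProvider().addBufferListener(this)) {
                        // Not enough floating buffers: register once, then stop polling.
                        if (!isWaitingForFloatingBuffers.compareAndSet(false, true)) {
                            throw new IllegalStateException(
                                "This channel should not be waiting for floating buffers.");
                        }
                        break;
                    }
                }
            }
        }
        // ... announce numRequestedBuffers as unannounced credit (elided)
    }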


---


[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2017-09-29 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r141896356
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -412,23 +468,35 @@ public void onBuffer(Buffer buffer, int 
sequenceNumber) {
}
}
}
+
+   if (success && backlog > 0) {
--- End diff --

Shouldn't we call this for `success && backlog >= 0` to try to always have 
`initialCredit` (extra) buffers available (if there are enough buffers)?


---


[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16186035#comment-16186035
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r141890624
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -390,7 +390,63 @@ public BufferProvider getBufferProvider() throws 
IOException {
return inputGate.getBufferProvider();
}
 
-   public void onBuffer(Buffer buffer, int sequenceNumber) {
+   /**
+* Requests buffer from input channel directly for receiving network 
data.
+* It should always return an available buffer in credit-based mode.
+*
+* @return The available buffer.
+*/
+   public Buffer requestBuffer() {
+   synchronized (availableBuffers) {
+   return availableBuffers.poll();
+   }
+   }
+
+   /**
+* Receives the backlog from the producer's buffer response. If the 
number of available
+* buffers is less than the backlog length, it will request floating 
buffers from the buffer
+* pool, and then notify unannounced credits to the producer.
+*
+* @param backlog The number of unsent buffers in the producer's sub 
partition.
+*/
+   private void onSenderBacklog(int backlog) {
+   int numRequestedBuffers = 0;
+
+   synchronized (availableBuffers) {
+   // Important: the isReleased check should be inside the 
synchronized block.
+   if (!isReleased.get()) {
+   senderBacklog.set(backlog);
+
+   while (senderBacklog.get() > 
availableBuffers.size() && !isWaitingForFloatingBuffers.get()) {
+   Buffer buffer = null;
+   try {
+   buffer = 
inputGate.getBufferPool().requestBuffer();
+   } catch (IOException ex) {
+   ExceptionUtils.rethrow(ex);
+   }
+
+   if (buffer != null) {
+   availableBuffers.add(buffer);
+   numRequestedBuffers++;
+   continue;
+   }
+
+   // If the channel has not got enough 
buffers, register it as listener to wait for more floating buffers.
+   if 
(inputGate.getBufferProvider().addBufferListener(this)) {
+   if 
(!isWaitingForFloatingBuffers.compareAndSet(false, true)) {
+   throw new 
IllegalStateException("This channel should not be waiting for floating 
buffers.");
+   }
--- End diff --

...instead, here you can actively `break` out of the loop early without 
having to check the conditions again


> Implement Netty receiver incoming pipeline for credit-based
> ---
>
> Key: FLINK-7406
> URL: https://issues.apache.org/jira/browse/FLINK-7406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control.
> Currently {{PartitionRequestClientHandler}} receives and reads
> {{BufferResponse}} from the producer. It will request a buffer from
> {{BufferPool}} to hold the message. If none is available, the message is
> staged temporarily and {{autoread}} for the channel is set to false.
> For credit-based mode, {{PartitionRequestClientHandler}} can always get
> a buffer from {{RemoteInputChannel}} for reading messages from the producer.
> The related changes are:
> * Add the backlog of producer in {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffer from 
> {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates backlog for 
> {{RemoteInputChannel}}, and it may trigger requests of floating buffers from 
> {{BufferPool}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16186037#comment-16186037
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r141865540
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -390,7 +390,63 @@ public BufferProvider getBufferProvider() throws 
IOException {
return inputGate.getBufferProvider();
}
 
-   public void onBuffer(Buffer buffer, int sequenceNumber) {
+   /**
+* Requests buffer from input channel directly for receiving network 
data.
+* It should always return an available buffer in credit-based mode.
--- End diff --

...unless the channel has been released


> Implement Netty receiver incoming pipeline for credit-based
> ---
>
> Key: FLINK-7406
> URL: https://issues.apache.org/jira/browse/FLINK-7406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control.
> Currently {{PartitionRequestClientHandler}} receives and reads
> {{BufferResponse}} from the producer. It will request a buffer from
> {{BufferPool}} to hold the message. If none is available, the message is
> staged temporarily and {{autoread}} for the channel is set to false.
> For credit-based mode, {{PartitionRequestClientHandler}} can always get
> a buffer from {{RemoteInputChannel}} for reading messages from the producer.
> The related changes are:
> * Add the backlog of producer in {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffer from 
> {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates backlog for 
> {{RemoteInputChannel}}, and it may trigger requests of floating buffers from 
> {{BufferPool}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16186044#comment-16186044
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r141865630
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -390,7 +390,63 @@ public BufferProvider getBufferProvider() throws 
IOException {
return inputGate.getBufferProvider();
}
 
-   public void onBuffer(Buffer buffer, int sequenceNumber) {
+   /**
+* Requests buffer from input channel directly for receiving network 
data.
+* It should always return an available buffer in credit-based mode.
+*
+* @return The available buffer.
+*/
+   public Buffer requestBuffer() {
+   synchronized (availableBuffers) {
+   return availableBuffers.poll();
--- End diff --

can you mark this method `@Nullable`?
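
That is, roughly (assuming JSR-305's `javax.annotation.Nullable` is on the
classpath):

    import javax.annotation.Nullable;

    @Nullable
    public Buffer requestBuffer() {
        synchronized (availableBuffers) {
            return availableBuffers.poll();  // null once the queue is empty
        }
    }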


> Implement Netty receiver incoming pipeline for credit-based
> ---
>
> Key: FLINK-7406
> URL: https://issues.apache.org/jira/browse/FLINK-7406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control.
> Currently {{PartitionRequestClientHandler}} receives and reads
> {{BufferResponse}} from the producer. It will request a buffer from
> {{BufferPool}} to hold the message. If none is available, the message is
> staged temporarily and {{autoread}} for the channel is set to false.
> For credit-based mode, {{PartitionRequestClientHandler}} can always get
> a buffer from {{RemoteInputChannel}} for reading messages from the producer.
> The related changes are:
> * Add the backlog of producer in {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffer from 
> {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates backlog for 
> {{RemoteInputChannel}}, and it may trigger requests of floating buffers from 
> {{BufferPool}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16186041#comment-16186041
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r141902956
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -70,6 +79,21 @@
 */
private int expectedSequenceNumber = 0;
 
+   /** The initial number of exclusive buffers assigned to this channel. */
+   private int initialCredit;
+
+   /** The current available buffers including both exclusive buffers and 
requested floating buffers. */
+   private final ArrayDeque<Buffer> availableBuffers = new ArrayDeque<>();
+
+   /** The number of available buffers that have not been announced to the 
producer yet. */
+   private final AtomicInteger unannouncedCredit = new AtomicInteger(0);
+
+   /** The number of unsent buffers in the producer's sub partition. */
+   private final AtomicInteger senderBacklog = new AtomicInteger(0);
+
+   /** The tag indicates whether this channel is waiting for additional 
floating buffers from the buffer pool. */
+   private final AtomicBoolean isWaitingForFloatingBuffers = new 
AtomicBoolean(false);
--- End diff --

Now seeing this in action: do we really need an `AtomicBoolean`? Or is a
`volatile boolean` enough? All uses except for `notifyBufferDestroyed()` (where
only a safety check uses the value) are actually under a `synchronized
(availableBuffers)` block. In that case, you may also annotate the variable as
`@GuardedBy("availableBuffers")` for documentation.
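
A sketch of that simplification (assuming all mutations stay under the
`availableBuffers` lock):

    import javax.annotation.concurrent.GuardedBy;

    /** Whether this channel is waiting for additional floating buffers. */
    @GuardedBy("availableBuffers")
    private boolean isWaitingForFloatingBuffers;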


> Implement Netty receiver incoming pipeline for credit-based
> ---
>
> Key: FLINK-7406
> URL: https://issues.apache.org/jira/browse/FLINK-7406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control.
> Currently {{PartitionRequestClientHandler}} receives and reads
> {{BufferResponse}} from the producer. It will request a buffer from
> {{BufferPool}} to hold the message. If none is available, the message is
> staged temporarily and {{autoread}} for the channel is set to false.
> For credit-based mode, {{PartitionRequestClientHandler}} can always get
> a buffer from {{RemoteInputChannel}} for reading messages from the producer.
> The related changes are:
> * Add the backlog of producer in {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffer from 
> {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates backlog for 
> {{RemoteInputChannel}}, and it may trigger requests of floating buffers from 
> {{BufferPool}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16186039#comment-16186039
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r141886467
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -390,7 +390,63 @@ public BufferProvider getBufferProvider() throws 
IOException {
return inputGate.getBufferProvider();
}
 
-   public void onBuffer(Buffer buffer, int sequenceNumber) {
+   /**
+* Requests buffer from input channel directly for receiving network 
data.
+* It should always return an available buffer in credit-based mode.
+*
+* @return The available buffer.
+*/
+   public Buffer requestBuffer() {
+   synchronized (availableBuffers) {
+   return availableBuffers.poll();
+   }
+   }
+
+   /**
+* Receives the backlog from producer's buffer response. If the number 
of available
+* buffers is less than the backlog length, it will request floating 
buffers from buffer
+* pool, and then notify unannounced credits to the producer.
+*
+* @param backlog The number of unsent buffers in the producer's sub 
partition.
+*/
+   private void onSenderBacklog(int backlog) {
+   int numRequestedBuffers = 0;
+
+   synchronized (availableBuffers) {
+   // Important: the isReleased check should be inside the 
synchronized block.
+   if (!isReleased.get()) {
+   senderBacklog.set(backlog);
+
+   while (senderBacklog.get() > 
availableBuffers.size() && !isWaitingForFloatingBuffers.get()) {
--- End diff --

I was thinking about it a bit more and was talking to @StephanEwen about 
it, and we think that it is actually fine to grab all resources we need at the 
moment. If there are not enough buffers at some point, the fair distribution 
will start when the buffers are recycled, i.e. via the callbacks of the new 
`BufferListener`. Since each channel always has its own exclusive buffers, we 
can guarantee that it always makes progress anyway! Additionally, we cannot 
really make a fair distribution from the start when receiving the first backlog 
(since we do not know all the other backlogs) unless we wait for some time,
which we also do not want.

I kind of like your idea of having a `numBuffersPerAllocation`. Let's keep 
this in our heads and evaluate the current solution first to see whether we 
need this addition.

Regarding the formula (which I took from the network FLIP):
- from the FLIP with regards to the buffer distribution strategy: `Design 
rationale 2: Each channel always tries to maintain a credit of ‘backlog_length 
+ initialCredit’. That means that each channel tries to build the receive 
window for its current backlog as much as possible from the floating buffers, 
and use the exclusive ‘initialCredit’ buffers as a means to grow the window.` 
That way we always have some buffers available immediately on the receiver side 
so the sender can continue sending new buffers immediately (as long as there 
are buffers available on the receiver) and we do not have to wait for the 
exclusive buffers to come back.
  - Note that this would have to be changed in the various checks for 
`availableBuffers.size() >= senderBacklog.get()`, e.g. in 
`RemoteInputChannel#notifyBufferAvailable()`.
  - Similarly, `RemoteInputChannel#recycle()` needs to be adapted, in case 
our exclusive buffers are in use and we requested `backlog_length + 
initialCredit - currentCredit` *floating* buffers in order not to stack up 
`2*initialCredit` buffers once `backlog == 0` again. (+ a corresponding unit test)
- what do you mean by `backlog-currentCredit` not being very accurate? We
guarantee that there are no more than `currentCredit` buffers on the wire
(some already in the channel, some only announced) and, at the time the
buffer was sent, `backlog` additional buffers were queued, so in order to
send them we always need `backlog-currentCredit`, irrespective of how much
credit is announced vs. on the wire. Or am I missing something here?


> Implement Netty receiver incoming pipeline for credit-based
> ---
>
> Key: FLINK-7406
> URL: https://issues.apache.org/jira/browse/FLINK-7406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control.

[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16186042#comment-16186042
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r141906648
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -374,6 +376,64 @@ public void testReleaseExclusiveBuffers() throws 
Exception {
verify(inputGate, 
times(1)).returnExclusiveSegments(anyListOf(MemorySegment.class));
}
 
+   /**
+* Tests {@link BufferPool#requestBuffer()}, verifying the remote input 
channel tries to request
+* floating buffers once receiving the producer's backlog.
+*/
+   @Test
+   public void testRequestFloatingBuffersOnBuffer() throws Exception {
+   // Setup
+   final BufferPool bufferPool = mock(BufferPool.class);
+   
when(bufferPool.requestBuffer()).thenReturn(TestBufferFactory.createBuffer());
+
+   final SingleInputGate inputGate = mock(SingleInputGate.class);
+   when(inputGate.getBufferPool()).thenReturn(bufferPool);
+
+   final RemoteInputChannel inputChannel = 
createRemoteInputChannel(inputGate);
+
+   // Receive the producer's backlog
+   inputChannel.onBuffer(TestBufferFactory.createBuffer(), 0, 10);
+   // Need to request 10 floating buffers from buffer pool
+   verify(bufferPool, times(10)).requestBuffer();
+
+   inputChannel.onBuffer(TestBufferFactory.createBuffer(), 1, 8);
+   // No need to request extra floating buffers from pool because
+   // there are already 10 available buffers in queue now
+   verify(bufferPool, times(10)).requestBuffer();
+
+   inputChannel.onBuffer(TestBufferFactory.createBuffer(), 2, 11);
+   // Need to request another floating buffer from pool
+   verify(bufferPool, times(11)).requestBuffer();
+   }
+
+   /**
+* Tests {@link BufferPool#requestBuffer()}, verifying the remote input 
channel tries to request
+* floating buffers once receiving the producer's backlog. And it 
requests from pool only once
+* and registers as listener if there are currently no available 
buffers in the pool.
+*/
+   @Test
+   public void testWaitForFloatingBuffersOnBuffer() throws Exception {
+   // Setup
+   final BufferPool bufferPool = mock(BufferPool.class);
+   when(bufferPool.requestBuffer()).thenReturn(null);
+   
when(bufferPool.addBufferListener(any(BufferListener.class))).thenReturn(true);
--- End diff --

How about using the real `NetworkBufferPool#createBufferPool()` here with a 
limited set of buffers? Then you could start retrieving buffers as in the test 
method above and continue with verifying the expected behaviour in case the 
buffer limit was reached (no need for two test methods, I guess). I'd prefer 
this over a mock so that you can also verify the interaction with the real 
methods such as `addBufferListener()`.


> Implement Netty receiver incoming pipeline for credit-based
> ---
>
> Key: FLINK-7406
> URL: https://issues.apache.org/jira/browse/FLINK-7406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control.
> Currently {{PartitionRequestClientHandler}} receives and reads
> {{BufferResponse}} from the producer. It will request a buffer from
> {{BufferPool}} to hold the message. If none is available, the message is
> staged temporarily and {{autoread}} for the channel is set to false.
> For credit-based mode, {{PartitionRequestClientHandler}} can always get
> a buffer from {{RemoteInputChannel}} for reading messages from the producer.
> The related changes are:
> * Add the backlog of producer in {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffer from 
> {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates backlog for 
> {{RemoteInputChannel}}, and it may trigger requests of floating buffers from 
> {{BufferPool}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16186036#comment-16186036
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r141890343
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -390,7 +390,63 @@ public BufferProvider getBufferProvider() throws 
IOException {
return inputGate.getBufferProvider();
}
 
-   public void onBuffer(Buffer buffer, int sequenceNumber) {
+   /**
+* Requests buffer from input channel directly for receiving network 
data.
+* It should always return an available buffer in credit-based mode.
+*
+* @return The available buffer.
+*/
+   public Buffer requestBuffer() {
+   synchronized (availableBuffers) {
+   return availableBuffers.poll();
+   }
+   }
+
+   /**
+* Receives the backlog from the producer's buffer response. If the 
number of available
+* buffers is less than the backlog length, it will request floating 
buffers from the buffer
+* pool, and then notify unannounced credits to the producer.
+*
+* @param backlog The number of unsent buffers in the producer's sub 
partition.
+*/
+   private void onSenderBacklog(int backlog) {
+   int numRequestedBuffers = 0;
+
+   synchronized (availableBuffers) {
+   // Important: the isReleased check should be inside the 
synchronized block.
+   if (!isReleased.get()) {
+   senderBacklog.set(backlog);
+
+   while (senderBacklog.get() > 
availableBuffers.size() && !isWaitingForFloatingBuffers.get()) {
+   Buffer buffer = null;
+   try {
+   buffer = 
inputGate.getBufferPool().requestBuffer();
+   } catch (IOException ex) {
+   ExceptionUtils.rethrow(ex);
+   }
+
+   if (buffer != null) {
+   availableBuffers.add(buffer);
+   numRequestedBuffers++;
+   continue;
--- End diff --

How about putting the rest in an `else` branch instead of `continue`ing here?
That would be more intuitive, I guess.


> Implement Netty receiver incoming pipeline for credit-based
> ---
>
> Key: FLINK-7406
> URL: https://issues.apache.org/jira/browse/FLINK-7406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control.
> Currently {{PartitionRequestClientHandler}} receives and reads
> {{BufferResponse}} from the producer. It will request a buffer from
> {{BufferPool}} to hold the message. If none is available, the message is
> staged temporarily and {{autoread}} for the channel is set to false.
> For credit-based mode, {{PartitionRequestClientHandler}} can always get
> a buffer from {{RemoteInputChannel}} for reading messages from the producer.
> The related changes are:
> * Add the backlog of producer in {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffer from 
> {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates backlog for 
> {{RemoteInputChannel}}, and it may trigger requests of floating buffers from 
> {{BufferPool}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2017-09-29 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r141902956
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -70,6 +79,21 @@
 */
private int expectedSequenceNumber = 0;
 
+   /** The initial number of exclusive buffers assigned to this channel. */
+   private int initialCredit;
+
+   /** The current available buffers including both exclusive buffers and 
requested floating buffers. */
+   private final ArrayDeque<Buffer> availableBuffers = new ArrayDeque<>();
+
+   /** The number of available buffers that have not been announced to the 
producer yet. */
+   private final AtomicInteger unannouncedCredit = new AtomicInteger(0);
+
+   /** The number of unsent buffers in the producer's sub partition. */
+   private final AtomicInteger senderBacklog = new AtomicInteger(0);
+
+   /** The tag indicates whether this channel is waiting for additional 
floating buffers from the buffer pool. */
+   private final AtomicBoolean isWaitingForFloatingBuffers = new 
AtomicBoolean(false);
--- End diff --

Now seeing this in action: do we really need an `AtomicBoolean`? Or is a
`volatile boolean` enough? All uses except for `notifyBufferDestroyed()` (where
only a safety check uses the value) are actually under a `synchronized
(availableBuffers)` block. In that case, you may also annotate the variable as
`@GuardedBy("availableBuffers")` for documentation.


---


[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2017-09-29 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r141865630
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -390,7 +390,63 @@ public BufferProvider getBufferProvider() throws 
IOException {
return inputGate.getBufferProvider();
}
 
-   public void onBuffer(Buffer buffer, int sequenceNumber) {
+   /**
+* Requests buffer from input channel directly for receiving network 
data.
+* It should always return an available buffer in credit-based mode.
+*
+* @return The available buffer.
+*/
+   public Buffer requestBuffer() {
+   synchronized (availableBuffers) {
+   return availableBuffers.poll();
--- End diff --

can you mark this method `@Nullable`?


---


[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2017-09-29 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r141908388
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -374,6 +376,64 @@ public void testReleaseExclusiveBuffers() throws 
Exception {
verify(inputGate, 
times(1)).returnExclusiveSegments(anyListOf(MemorySegment.class));
}
 
+   /**
+* Tests {@link BufferPool#requestBuffer()}, verifying the remote input 
channel tries to request
+* floating buffers once receiving the producer's backlog.
+*/
+   @Test
+   public void testRequestFloatingBuffersOnBuffer() throws Exception {
+   // Setup
+   final BufferPool bufferPool = mock(BufferPool.class);
+   
when(bufferPool.requestBuffer()).thenReturn(TestBufferFactory.createBuffer());
+
+   final SingleInputGate inputGate = mock(SingleInputGate.class);
+   when(inputGate.getBufferPool()).thenReturn(bufferPool);
+
+   final RemoteInputChannel inputChannel = 
createRemoteInputChannel(inputGate);
+
+   // Receive the producer's backlog
+   inputChannel.onBuffer(TestBufferFactory.createBuffer(), 0, 10);
+   // Need to request 10 floating buffers from buffer pool
+   verify(bufferPool, times(10)).requestBuffer();
+
+   inputChannel.onBuffer(TestBufferFactory.createBuffer(), 1, 8);
+   // No need to request extra floating buffers from pool because
+   // there are already 10 available buffers in queue now
+   verify(bufferPool, times(10)).requestBuffer();
+
+   inputChannel.onBuffer(TestBufferFactory.createBuffer(), 2, 11);
+   // Need to request another floating buffer from pool
+   verify(bufferPool, times(11)).requestBuffer();
+   }
+
+   /**
+* Tests {@link BufferPool#requestBuffer()}, verifying the remote input 
channel tries to request
+* floating buffers once receiving the producer's backlog. And it 
requests from pool only once
+* and registers as listener if there are currently no available 
buffers in the pool.
+*/
+   @Test
+   public void testWaitForFloatingBuffersOnBuffer() throws Exception {
+   // Setup
+   final BufferPool bufferPool = mock(BufferPool.class);
+   when(bufferPool.requestBuffer()).thenReturn(null);
+   
when(bufferPool.addBufferListener(any(BufferListener.class))).thenReturn(true);
+
+   final SingleInputGate inputGate = mock(SingleInputGate.class);
+   when(inputGate.getBufferPool()).thenReturn(bufferPool);
+   when(inputGate.getBufferProvider()).thenReturn(bufferPool);
+
+   final RemoteInputChannel inputChannel = 
createRemoteInputChannel(inputGate);
+
+   // Receive the producer's backlog
+   inputChannel.onBuffer(TestBufferFactory.createBuffer(), 0, 10);
+   // Request from pool only once if there are no available 
floating buffers
+   verify(bufferPool, times(1)).requestBuffer();
+
+   inputChannel.onBuffer(TestBufferFactory.createBuffer(), 1, 10);
+   // Already registers as listener to wait for notifications and 
will not request any more
+   verify(bufferPool, times(1)).requestBuffer();
+   }
+
--- End diff --

actually, some more tests would be nice:

- ensuring a fair distribution of buffers to `BufferListener`s
- verifying that there is no race condition with two things running in 
parallel: `onSenderBacklog()` or `notifyBufferAvailable()` requesting buffers 
and some other thread recycling them (floating and/or exclusive ones); see 
the sketch after this list.
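
For the race-condition point, a rough sketch of what such a test could look 
like (hedged: it reuses this file's `createRemoteInputChannel` and 
`TestBufferFactory` helpers, but the setup details and iteration counts are 
made up, and a real test would hand out fresh buffers from the mock):

    @Test
    public void testConcurrentOnSenderBacklogAndRecycle() throws Exception {
        final BufferPool bufferPool = mock(BufferPool.class);
        when(bufferPool.requestBuffer()).thenReturn(TestBufferFactory.createBuffer());

        final SingleInputGate inputGate = mock(SingleInputGate.class);
        when(inputGate.getBufferPool()).thenReturn(bufferPool);

        final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);

        final ExecutorService executor = Executors.newFixedThreadPool(2);
        final CountDownLatch startSignal = new CountDownLatch(1);

        // Thread 1: keeps announcing a backlog, triggering floating buffer requests.
        final Future<Void> announcer = executor.submit(() -> {
            startSignal.await();
            for (int i = 0; i < 10000; i++) {
                inputChannel.onBuffer(TestBufferFactory.createBuffer(), i, 8);
            }
            return null;
        });

        // Thread 2: keeps draining and recycling the channel's available buffers.
        final Future<Void> recycler = executor.submit(() -> {
            startSignal.await();
            for (int i = 0; i < 10000; i++) {
                final Buffer buffer = inputChannel.requestBuffer();
                if (buffer != null) {
                    buffer.recycle();
                }
            }
            return null;
        });

        startSignal.countDown();
        announcer.get(); // get() surfaces exceptions thrown inside either thread
        recycler.get();
        executor.shutdown();
    }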


---


[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2017-09-29 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r141890343
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -390,7 +390,63 @@ public BufferProvider getBufferProvider() throws 
IOException {
return inputGate.getBufferProvider();
}
 
-   public void onBuffer(Buffer buffer, int sequenceNumber) {
+   /**
+* Requests buffer from input channel directly for receiving network 
data.
+* It should always return an available buffer in credit-based mode.
+*
+* @return The available buffer.
+*/
+   public Buffer requestBuffer() {
+   synchronized (availableBuffers) {
+   return availableBuffers.poll();
+   }
+   }
+
+   /**
+* Receives the backlog from the producer's buffer response. If the 
number of available
+* buffers is less than the backlog length, it will request floating 
buffers from the buffer
+* pool, and then notify unannounced credits to the producer.
+*
+* @param backlog The number of unsent buffers in the producer's sub 
partition.
+*/
+   private void onSenderBacklog(int backlog) {
+   int numRequestedBuffers = 0;
+
+   synchronized (availableBuffers) {
+   // Important: the isReleased check should be inside the 
synchronized block.
+   if (!isReleased.get()) {
+   senderBacklog.set(backlog);
+
+   while (senderBacklog.get() > 
availableBuffers.size() && !isWaitingForFloatingBuffers.get()) {
+   Buffer buffer = null;
+   try {
+   buffer = 
inputGate.getBufferPool().requestBuffer();
+   } catch (IOException ex) {
+   ExceptionUtils.rethrow(ex);
+   }
+
+   if (buffer != null) {
+   availableBuffers.add(buffer);
+   numRequestedBuffers++;
+   continue;
--- End diff --

How about putting the rest in an `else` branch instead of `continue`ing 
here? That would be more intuitive, I guess.
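
Concretely, the loop body could read like this (the code behind the 
`continue` is not visible in this hunk, so the `else` branch below is only a 
guess based on the listener discussion):

    while (senderBacklog.get() > availableBuffers.size()
            && !isWaitingForFloatingBuffers.get()) {
        Buffer buffer = null;
        try {
            buffer = inputGate.getBufferPool().requestBuffer();
        } catch (IOException ex) {
            ExceptionUtils.rethrow(ex);
        }

        if (buffer != null) {
            availableBuffers.add(buffer);
            numRequestedBuffers++;
        } else {
            // Presumably: register as a listener on the buffer pool and stop
            // polling until a floating buffer becomes available.
            if (inputGate.getBufferPool().addBufferListener(this)) {
                isWaitingForFloatingBuffers.set(true);
            }
            break;
        }
    }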


---


[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2017-09-29 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r141896488
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -412,23 +468,35 @@ public void onBuffer(Buffer buffer, int 
sequenceNumber) {
}
}
}
+
+   if (success && backlog > 0) {
+   onSenderBacklog(backlog);
+   }
+
} finally {
if (!success) {
buffer.recycle();
}
}
}
 
-   public void onEmptyBuffer(int sequenceNumber) {
+   public void onEmptyBuffer(int sequenceNumber, int backlog) {
+   boolean success = false;
+
synchronized (receivedBuffers) {
if (!isReleased.get()) {
if (expectedSequenceNumber == sequenceNumber) {
expectedSequenceNumber++;
+   success = true;
} else {
onError(new 
BufferReorderingException(expectedSequenceNumber, sequenceNumber));
}
}
}
+
+   if (success && backlog > 0) {
--- End diff --

same here


---


[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2017-09-29 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r141890051
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -390,7 +390,63 @@ public BufferProvider getBufferProvider() throws 
IOException {
return inputGate.getBufferProvider();
}
 
-   public void onBuffer(Buffer buffer, int sequenceNumber) {
+   /**
+* Requests buffer from input channel directly for receiving network 
data.
+* It should always return an available buffer in credit-based mode.
+*
+* @return The available buffer.
+*/
+   public Buffer requestBuffer() {
+   synchronized (availableBuffers) {
+   return availableBuffers.poll();
+   }
+   }
+
+   /**
+* Receives the backlog from producer's buffer response. If the number 
of available
+* buffers is less than the backlog length, it will request floating 
buffers from buffer
+* pool, and then notify unannounced credits to the producer.
+*
+* @param backlog The number of unsent buffers in the producer's sub 
partition.
+*/
+   private void onSenderBacklog(int backlog) {
+   int numRequestedBuffers = 0;
+
+   synchronized (availableBuffers) {
+   // Important: the isReleased check should be inside the 
synchronized block.
+   if (!isReleased.get()) {
+   senderBacklog.set(backlog);
+
+   while (senderBacklog.get() > 
availableBuffers.size() && !isWaitingForFloatingBuffers.get()) {
+   Buffer buffer = null;
+   try {
+   buffer = 
inputGate.getBufferPool().requestBuffer();
+   } catch (IOException ex) {
--- End diff --

This only affects the two methods you mentioned, which are themselves 
called by `PartitionRequestClientHandler`/`CreditBasedClientHandler` helper 
methods allowing `Throwable` - so it's actually not much. I'd prefer not to 
wrap the exception further here, for simplicity in case of errors.


---


[jira] [Commented] (FLINK-7735) Improve date/time handling in publically-facing Expressions

2017-09-29 Thread Alexey Diomin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16186007#comment-16186007
 ] 

Alexey Diomin commented on FLINK-7735:
--

Java 7 will be dropped very soon (FLINK-7242).

Maybe it makes sense to discuss this in terms of the Java 8 time API?
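
For illustration, with the Java 8 time API the zone is explicit instead of 
implied by the JVM default (a hedged sketch, not a proposed Flink API):

    import java.time.Instant;
    import java.time.LocalDateTime;
    import java.time.ZoneOffset;

    // The wall-clock fields and the zone are given separately, so nothing
    // depends silently on the JVM default timezone:
    Instant ts = LocalDateTime.of(2017, 9, 29, 12, 0)
            .atZone(ZoneOffset.UTC)        // or an explicit user-supplied zone
            .toInstant();
    long epochMillis = ts.toEpochMilli();  // unambiguous epoch value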

> Improve date/time handling in publically-facing Expressions
> ---
>
> Key: FLINK-7735
> URL: https://issues.apache.org/jira/browse/FLINK-7735
> Project: Flink
>  Issue Type: Wish
>  Components: Table API & SQL
>Reporter: Kent Murra
>Priority: Minor
>
> I would like to discuss potential improvements to date/time/timestamp 
> handling in Expressions.  Since Flink now offers expression push down for 
> table sources, which includes time-related functions, timezone handling is 
> more visible to the end user.
> I think that the current usage of java.sql.Time, java.sql.Date, and 
> java.sql.Timestamp is fairly ambiguous.  We're taking a Date subclass in the 
> constructor of Literal, and assuming that the year, month, day, and hour 
> fields apply to UTC rather than the user's default timezone.   Per that 
> assumption, Flink is [adjusting the value of the epoch 
> timestamp|https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala#L106]
>  silently when converting to the RexLiteral.  This provides correct behavior 
> if the user assumes that the year/month/day/hour fields in the Date object 
> are the same timezone that the SQL statement assumes (which is UTC).  
> However, if they work at all with the epoch timestamp (which is a public 
> field) this can lead to incorrect results.  Moreover, its confusing if you're 
> considering the time zones your data is in, requiring some amount of research 
> to determine correct behavior.
> It would be ideal to:
> # Provide primitives that have time-zone information associated by default, 
> thus disambiguating the times. 
> # Properly document all TimeZone related assumptions in Expression literals.  
> # Document that the TIMESTAMP calcite function will assume that the timestamp 
> is in UTC in web documentation.  
> # Have a timezone-based date parsing function in the SQL language.
> Regarding the primitives, since we have to support Java 7, we can't use Java 
> 8 time API.  I'm guessing it'd be a decision between using Joda Time or 
> making thin data structures that could easily be converted to various other 
> time primitives.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7700) State merging in RocksDB backend leaves old state

2017-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185950#comment-16185950
 ] 

ASF GitHub Bot commented on FLINK-7700:
---

GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/4752

[FLINK-7700] Fix RocksDB state merging on release-1.3

This is only for running the tests, which I can't do on my own Travis since 
tests time out.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink release-1.3

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4752.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4752


commit 64858a7881d22fb665099e1b2cd8363e447ed572
Author: Aljoscha Krettek 
Date:   2016-11-04T09:17:55Z

[FLINK-5619] Add numStateEntries() method for all keyed backends

This also adds a test for this in StateBackendTestBase

commit 31cd6db6a7d2f7d78aa05f6ff5fc82abbc79d042
Author: Aljoscha Krettek 
Date:   2017-09-27T09:37:09Z

[FLINK-7700] Fix RocksDB ListState merging

Before, the merged state was not cleared.

commit 5e3d49cc05591bf396bfcf9e578f8e30ce436114
Author: Aljoscha Krettek 
Date:   2017-09-27T09:38:13Z

[FLINK-5619] Consolidate ListState Tests in StateBackendTestBase

commit e0bd8c060da01e5defe528964bf638868cbc13d5
Author: Aljoscha Krettek 
Date:   2017-09-27T10:53:08Z

[FLINK-7700] Fix RocksDB ReducingState merging

Before, the merged state was not cleared.

commit 0ffca863a4b189675d2f15ee61f9d20fe49e85f0
Author: Aljoscha Krettek 
Date:   2017-09-27T10:54:19Z

[FLINK-5619] Consolidate ReducingState Tests in StateBackendTestBase

commit b0a7c091856aa2a99715a9fddcc801f660414cfd
Author: Aljoscha Krettek 
Date:   2017-09-27T11:01:55Z

[FLINK-7700] Fix RocksDB AggregatingState merging

Before, the merged state was not cleared.

commit 835de46b5f63ffeee8e89a89d3f2e6a9af06efc4
Author: Aljoscha Krettek 
Date:   2017-09-27T11:02:57Z

[FLINK-5619] Consolidate AggregatingState Tests in StateBackendTestBase
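
The three "merged state was not cleared" fixes above share one pattern; a 
hedged sketch of the idea (hypothetical names, error handling omitted, not 
the actual RocksDB backend code):

    // After the source namespaces' values have been appended to the target
    // namespace, the sources must be deleted; otherwise the old entries
    // stay in RocksDB forever and accumulate.
    for (N source : sources) {
        final byte[] sourceKey = serializeKeyAndNamespace(currentKey, source);
        final byte[] value = db.get(columnFamily, sourceKey);
        if (value != null) {
            db.merge(columnFamily, targetKey, value); // append to the target
            db.delete(columnFamily, sourceKey);       // the missing cleanup
        }
    }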




> State merging in RocksDB backend leaves old state
> -
>
> Key: FLINK-7700
> URL: https://issues.apache.org/jira/browse/FLINK-7700
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> {{RocksDBAggregatingState.mergeNamespaces(...)}}, 
> {{RocksDBReducingState.mergeNamespaces(...)}}, and 
> {{RocksDBListState.mergeNamespaces(...)}} don't remove state from the old 
> location when merging. This means that merged state will never be cleaned up 
> and will accumulate.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #3511: [Flink-5734] code generation for normalizedkey sorter

2017-09-29 Thread ggevay
Github user ggevay commented on the issue:

https://github.com/apache/flink/pull/3511
  
> IMHO, in addition to these changes, there are still some potential 
improvements we can do about the sorter, like deserialization when comparing 
the real records.

Do you mean `NormalizedKeySorter.compareRecords`? This calls 
`comparator.compareSerialized`, which is currently implemented for most types 
by simply deserializing the records and then comparing the deserialized form. 
It would maybe bring some performance improvement if this had a real 
implementation, which would deserialize only the key fields (which are relevant 
to the comparison). The hard part here is how to handle nullable fields, which 
make the offset of individual fields unpredictable.

So I think a simpler and better approach is to just make sure that most 
types have a good implementation of `putNormalizedKey`, and then 
`NormalizedKeySorter.compareRecords` would be called only rarely, so its 
performance wouldn't really matter.
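
For reference, the fixed-length numeric types are where this works out 
nicely; a sketch of an int normalized key, close in spirit to Flink's 
`IntComparator` (the exact code may differ):

    public void putNormalizedKey(Integer iValue, MemorySegment target, int offset, int numBytes) {
        // Flipping the sign bit makes the unsigned byte-wise order of the
        // key bytes agree with the signed order of the int.
        final int value = iValue.intValue() ^ Integer.MIN_VALUE;
        if (numBytes >= 4) {
            target.putIntBigEndian(offset, value);
            for (int i = 4; i < numBytes; i++) {
                target.put(offset + i, (byte) 0);   // zero-pad the rest
            }
        } else {
            // Write only the numBytes most significant bytes.
            for (int i = 0; i < numBytes; i++) {
                target.put(offset + i, (byte) (value >>> ((3 - i) << 3)));
            }
        }
    }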

> We are working on a project to fully code generate the algorithm Flink 
runtime used, and borrowed lots of idea of this PR, thanks!

I'm happy to hear that!

Btw. I think a large potential in code-generation is to eliminate the 
overheads of the very many virtual function calls throughout the runtime, by 
making the calls monomorphic [1,2]. For example, there could be a custom 
`MapDriver` generated for each map UDF, and then this custom class would always 
call the same UDF, making the call monomorphic, and thus easily optimizable by 
the JVM [1,2]. (I think @greghogan was also thinking about this.)
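
A hedged sketch of what such a generated driver could look like (all names 
are hypothetical; the point is only that every call site sees exactly one 
receiver type):

    // Generated for one specific map operator: the UDF type is a concrete
    // class, so udf.map(...) is a monomorphic call site the JIT can inline,
    // instead of a megamorphic MapFunction.map(...).
    public final class GeneratedMapDriver_42 {

        private final MyMapFunction udf;                    // concrete UDF class
        private final MutableObjectIterator<MyInput> input; // a full generator would specialize this too
        private final Collector<MyOutput> output;           // ...and this

        GeneratedMapDriver_42(MyMapFunction udf,
                MutableObjectIterator<MyInput> input,
                Collector<MyOutput> output) {
            this.udf = udf;
            this.input = input;
            this.output = output;
        }

        public void run() throws Exception {
            MyInput record;
            while ((record = input.next()) != null) {
                output.collect(udf.map(record));
            }
        }
    }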

For example, the following virtual calls are on the per-record code path:
- drivers (such as `MapDriver`) calling `MutableObjectIterator.next`
- drivers calling the UDF
- drivers calling `output.collect`
- `ReusingDeserializationDelegate` or `NonReusingDeserializationDelegate` 
calling `serializer.deserialize`
- `SpillingResettableMutableObjectIterator` calling `serializer.serialize` 
and `serializer.deserialize`
- `PojoSerializer`, `TupleSerializer`, `RowSerializer`, etc. calling their 
field serializers
- `CountingCollector` calling `collector.collect`
- `RecordWriter` calling `channelSelector.selectChannels`
- `SerializationDelegate` calling `serializer.serialize`
- `StreamElementSerializer` calling methods on its `typeSerializer`
- `OutputEmitter` calling `comparator.hash`
- `ComparableKeySelector` calling `comparator.extractKeys`
- `assignToKeyGroup` calling `key.hashCode`

I think most of the above calls are megamorphic (especially in larger Flink 
jobs with many operators), which makes them slow [1,2]. They could be made 
monomorphic by code-generating custom versions of these classes, where the 
customization would be to fix the type of the targets of these calls. (I think 
this could probably be done independently of the Table API.)

Another potential for code-generation is customizing `OutputEmitter` for 
the number of channels: there are places where a modulo operation is performed, 
which is much faster if the divisor is a compile-time constant, since then the 
compiler uses such tricks as described in this paper:
https://gmplib.org/~tege/divcnst-pldi94.pdf
And the same optimization could be made in `KeyGroupRangeAssignment`, where 
there are divisions by `maxParallelism`.
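
A tiny illustration of the divisor point (hypothetical code, not 
`OutputEmitter` itself):

    // With the channel count baked in as a compile-time constant, the JIT
    // can strength-reduce the % into multiplies, shifts and adds:
    private static final int NUM_CHANNELS = 8;

    static int selectChannelSpecialized(int hash) {
        return (hash & Integer.MAX_VALUE) % NUM_CHANNELS;
    }

    // With a runtime divisor the JVM has to emit an actual division
    // instruction, which costs many more cycles:
    static int selectChannelGeneric(int hash, int numChannels) {
        return (hash & Integer.MAX_VALUE) % numChannels;
    }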

> we need more type information control and flexible code generating 
supports, so we choose to do it through the Table API & SQL. How do you see 
this approach?

Having more info about the types, UDFs, etc. of your program certainly can 
help. Unfortunately I don't know too much about the Table API & SQL, but I have 
a few random thoughts:
- Since you are generating the UDFs, it should be possible to make them 
have good object reuse behavior, without the users having to worry about the 
non-trivial object reuse rules.
- For basic types (int, boolean, etc.) the generated code could use 
`IntValue`, `BooleanValue`, etc., facilitating more object reuse, again without 
making the users write ugly boilerplate (as they have to do in the 
DataSet/DataStream API when they want to use these classes).
- Same thing with `StringValue`; a small sketch of the reuse pattern follows 
below.
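
A hedged sketch of what the generated reuse pattern could look like 
(hypothetical generated snippet, not actual Table API output):

    // Mutable value types and the output tuple are allocated once per
    // operator instance and reused for every record:
    private final StringValue reuseName = new StringValue();
    private final IntValue reuseCount = new IntValue();
    private final Tuple2<StringValue, IntValue> reuseTuple =
            new Tuple2<>(reuseName, reuseCount);

    public void processRecord(MyInput record, Collector<Tuple2<StringValue, IntValue>> out) {
        reuseName.setValue(record.getName());
        reuseCount.setValue(record.getCount() + 1);
        out.collect(reuseTuple);  // downstream must not hold on to the tuple
    }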

Btw. have you seen this PR for code generation for POJO serializers and 
comparators? https://github.com/apache/flink/pull/2211
It has some issues, so it is not as close to merging as this PR, but maybe 
I'll try to tie that up as well some time.

[1] 
http://insightfullogic.com/2014/May/12/fast-and-megamorphic-what-influences-method-invoca/
[2] https://shipilev.net/blog/2015/black-magic-method-dispatch/


---


[GitHub] flink pull request #4752: [FLINK-7700] Fix RocksDB state merging on release-...

2017-09-29 Thread aljoscha
GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/4752

[FLINK-7700] Fix RocksDB state merging on release-1.3

This is only for running the tests, which I can't do on my own Travis since 
tests time out.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink release-1.3

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4752.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4752


commit 64858a7881d22fb665099e1b2cd8363e447ed572
Author: Aljoscha Krettek 
Date:   2016-11-04T09:17:55Z

[FLINK-5619] Add numStateEntries() method for all keyed backends

This also adds a test for this in StateBackendTestBase

commit 31cd6db6a7d2f7d78aa05f6ff5fc82abbc79d042
Author: Aljoscha Krettek 
Date:   2017-09-27T09:37:09Z

[FLINK-7700] Fix RocksDB ListState merging

Before, the merged state was not cleared.

commit 5e3d49cc05591bf396bfcf9e578f8e30ce436114
Author: Aljoscha Krettek 
Date:   2017-09-27T09:38:13Z

[FLINK-5619] Consolidate ListState Tests in StateBackendTestBase

commit e0bd8c060da01e5defe528964bf638868cbc13d5
Author: Aljoscha Krettek 
Date:   2017-09-27T10:53:08Z

[FLINK-7700] Fix RocksDB ReducingState merging

Before, the merged state was not cleared.

commit 0ffca863a4b189675d2f15ee61f9d20fe49e85f0
Author: Aljoscha Krettek 
Date:   2017-09-27T10:54:19Z

[FLINK-5619] Consolidate ReducingState Tests in StateBackendTestBase

commit b0a7c091856aa2a99715a9fddcc801f660414cfd
Author: Aljoscha Krettek 
Date:   2017-09-27T11:01:55Z

[FLINK-7700] Fix RocksDB AggregatingState merging

Before, the merged state was not cleared.

commit 835de46b5f63ffeee8e89a89d3f2e6a9af06efc4
Author: Aljoscha Krettek 
Date:   2017-09-27T11:02:57Z

[FLINK-5619] Consolidate AggregatingState Tests in StateBackendTestBase




---


[jira] [Commented] (FLINK-7736) Fix some of the alerts raised by lgtm.com

2017-09-29 Thread Malcolm Taylor (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185921#comment-16185921
 ] 

Malcolm Taylor commented on FLINK-7736:
---

Identified 14 alerts to address:
1) dereferenced variable is always null, in TaskSlotTable
2-3) array index out of bounds, in KVStateRequestSerializer and Utils
4) inconsistent equals and hashCode, in ArchivedJson
5-6) close input, in JarListHandler and SocketTextStreamFunction
7) close output, in JarFileCreator
8) unused format argument, in YarnApplicationMasterRunner
9) useless type test, in GroupReduceNode
10-11) useless comparison, in TaskExecutor and FieldAccessor
12-14) Result of integer multiplication cast to long, in MemoryManager and 
twice in InPlaceMutableHashTable (the bug pattern is sketched below)
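
For the last group, the flagged pattern looks like this (generic 
illustration, not the actual Flink lines):

    // The multiplication is evaluated in 32-bit int arithmetic and can
    // overflow *before* the widening cast to long takes effect:
    long totalBytes = numPages * pageSize;            // wrong: int * int overflows first
    long totalBytesOk = ((long) numPages) * pageSize; // right: widen one operand first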

> Fix some of the alerts raised by lgtm.com
> -
>
> Key: FLINK-7736
> URL: https://issues.apache.org/jira/browse/FLINK-7736
> Project: Flink
>  Issue Type: Improvement
>Reporter: Malcolm Taylor
>Assignee: Malcolm Taylor
>
> lgtm.com has identified a number of issues giving scope for improvement in 
> the code: [https://lgtm.com/projects/g/apache/flink/alerts/?mode=list]
> This issue is to address some of the simpler ones. Some of these are quite 
> clear bugs such as off-by-one errors. Others are areas where the code might 
> be made clearer, such as use of a variable name which shadows another 
> variable.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4750: [FLINK-7710] [flip6] Add CheckpointStatisticsHandl...

2017-09-29 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4750#discussion_r141882415
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatistics.java
 ---
@@ -0,0 +1,763 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
+import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticsHandler;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Response of the {@link CheckpointStatisticsHandler}.
+ */
+public class CheckpointStatistics implements ResponseBody {
+
+   public static final String FIELD_NAME_COUNTS = "counts";
+
+   public static final String FIELD_NAME_SUMMARY = "summary";
+
+   public static final String FIELD_NAME_LATEST_CHECKPOINTS = "latest";
+
+   public static final String FIELD_NAME_HISTORY = "history";
+
+   @JsonProperty(FIELD_NAME_COUNTS)
+   private final Counts counts;
+
+   @JsonProperty(FIELD_NAME_SUMMARY)
+   private final Summary summary;
+
+   @JsonProperty(FIELD_NAME_LATEST_CHECKPOINTS)
+   private final LatestCheckpoints latestCheckpoints;
+
+   @JsonProperty(FIELD_NAME_HISTORY)
+   private final List history;
+
+   @JsonCreator
+   public CheckpointStatistics(
+   @JsonProperty(FIELD_NAME_COUNTS) Counts counts,
+   @JsonProperty(FIELD_NAME_SUMMARY) Summary summary,
+   @JsonProperty(FIELD_NAME_LATEST_CHECKPOINTS) 
LatestCheckpoints latestCheckpoints,
+   @JsonProperty(FIELD_NAME_HISTORY) 
List history) {
+   this.counts = Preconditions.checkNotNull(counts);
+   this.summary = Preconditions.checkNotNull(summary);
+   this.latestCheckpoints = 
Preconditions.checkNotNull(latestCheckpoints);
+   this.history = Preconditions.checkNotNull(history);
+   }
+
+   public Counts getCounts() {
+   return counts;
+   }
+
+   public Summary getSummary() {
+   return summary;
+   }
+
+   public LatestCheckpoints getLatestCheckpoints() {
+   return latestCheckpoints;
+   }
+
+   public List getHistory() {
+   return history;
+   }
+
+   @Override
+   public boolean equals(Object o) {
+   if (this == o) {
+   return true;
+   }
+   if (o == null || getClass() != o.getClass()) {
+   return false;
+   }
+   CheckpointStatistics that = (CheckpointStatistics) o;
+   return Objects.equals(counts, that.counts) &&
+   Objects.equals(summary, that.summary) &&
+   Objects.equals(latestCheckpoints, 
that.latestCheckpoints) &&
+   Objects.equals(history, that.history);
+   }
+
+   @Override
+   public int hashCode() {
+   return Objects.hash(counts, summary, latestCheckpoints, 
history);
+   }
+
+   // --
+   // Inner classes
+   // --
+
+   /**
+* Checkpoint counts.
+*/
+   public static final class Counts {
+
+   public static final String FIELD_NAME_RESTORED_CHECKPOINTS = 
"restored";
+
+   public static final String FIELD_NAME_TOTAL_CHECKPOINTS = 
"total";
+
+   public static final String 

[jira] [Commented] (FLINK-7710) Port CheckpointStatsHandler to new REST endpoint

2017-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185911#comment-16185911
 ] 

ASF GitHub Bot commented on FLINK-7710:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4750#discussion_r141882415
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatistics.java
 ---
@@ -0,0 +1,763 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
+import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticsHandler;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Response of the {@link CheckpointStatisticsHandler}.
+ */
+public class CheckpointStatistics implements ResponseBody {
+
+   public static final String FIELD_NAME_COUNTS = "counts";
+
+   public static final String FIELD_NAME_SUMMARY = "summary";
+
+   public static final String FIELD_NAME_LATEST_CHECKPOINTS = "latest";
+
+   public static final String FIELD_NAME_HISTORY = "history";
+
+   @JsonProperty(FIELD_NAME_COUNTS)
+   private final Counts counts;
+
+   @JsonProperty(FIELD_NAME_SUMMARY)
+   private final Summary summary;
+
+   @JsonProperty(FIELD_NAME_LATEST_CHECKPOINTS)
+   private final LatestCheckpoints latestCheckpoints;
+
+   @JsonProperty(FIELD_NAME_HISTORY)
+   private final List history;
+
+   @JsonCreator
+   public CheckpointStatistics(
+   @JsonProperty(FIELD_NAME_COUNTS) Counts counts,
+   @JsonProperty(FIELD_NAME_SUMMARY) Summary summary,
+   @JsonProperty(FIELD_NAME_LATEST_CHECKPOINTS) 
LatestCheckpoints latestCheckpoints,
+   @JsonProperty(FIELD_NAME_HISTORY) 
List history) {
+   this.counts = Preconditions.checkNotNull(counts);
+   this.summary = Preconditions.checkNotNull(summary);
+   this.latestCheckpoints = 
Preconditions.checkNotNull(latestCheckpoints);
+   this.history = Preconditions.checkNotNull(history);
+   }
+
+   public Counts getCounts() {
+   return counts;
+   }
+
+   public Summary getSummary() {
+   return summary;
+   }
+
+   public LatestCheckpoints getLatestCheckpoints() {
+   return latestCheckpoints;
+   }
+
+   public List getHistory() {
+   return history;
+   }
+
+   @Override
+   public boolean equals(Object o) {
+   if (this == o) {
+   return true;
+   }
+   if (o == null || getClass() != o.getClass()) {
+   return false;
+   }
+   CheckpointStatistics that = (CheckpointStatistics) o;
+   return Objects.equals(counts, that.counts) &&
+   Objects.equals(summary, that.summary) &&
+   Objects.equals(latestCheckpoints, 
that.latestCheckpoints) &&
+   Objects.equals(history, that.history);
+   }
+
+   @Override
+   public int hashCode() {
+   return Objects.hash(counts, summary, latestCheckpoints, 
history);
+   }
+
+   // --
+   // Inner classes
+   // --
+
+   /**
+* Checkpoint counts.
+*/
+   public static final class Counts {
+
+  

[jira] [Commented] (FLINK-7739) Improve Kafka*ITCase tests stability

2017-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185839#comment-16185839
 ] 

ASF GitHub Bot commented on FLINK-7739:
---

GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/4751

[FLINK-7739][kafka-tests] Throttle down data producing thread

Minor improvement in tests to avoid a busy loop


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink custom-partitioning

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4751.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4751


commit 77cefb87214a4b7462c5b65fafe86352fe3a7047
Author: Piotr Nowojski 
Date:   2017-09-28T14:59:09Z

[FLINK-7739][kafka-tests] Throttle down data producing thread




> Improve Kafka*ITCase tests stability
> 
>
> Key: FLINK-7739
> URL: https://issues.apache.org/jira/browse/FLINK-7739
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.3.2
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4751: [FLINK-7739][kafka-tests] Throttle down data produ...

2017-09-29 Thread pnowojski
GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/4751

[FLINK-7739][kafka-tests] Throttle down data producing thread

Minor improvement in tests to avoid a busy loop


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink custom-partitioning

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4751.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4751


commit 77cefb87214a4b7462c5b65fafe86352fe3a7047
Author: Piotr Nowojski 
Date:   2017-09-28T14:59:09Z

[FLINK-7739][kafka-tests] Throttle down data producing thread




---


[GitHub] flink pull request #4750: [FLINK-7710] [flip6] Add CheckpointStatisticsHandl...

2017-09-29 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/4750

[FLINK-7710] [flip6] Add CheckpointStatisticsHandler for the new REST 
endpoint

## What is the purpose of the change

This commit also makes the CheckpointStatsHistory object serializable by 
removing the
CheckpointStatsHistoryIterable and replacing it with a static ArrayList.

## Verifying this change

*(Please pick either of the following options)*

This change added tests and can be verified as follows:

- `CheckpointStatisticsTest` which tests the marshalling of 
`CheckpointStatistics` instances

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink portCheckpointStatsHandler

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4750.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4750


commit d6be27aac5d7ff022dc03874a8bb52b1c09d49de
Author: Till Rohrmann 
Date:   2017-09-25T13:29:59Z

[FLINK-7668] Add ExecutionGraphCache for ExecutionGraph based REST handlers

The ExecutionGraphCache replaces the ExecutionGraphHolder. Unlike the 
latter, the former
does not expect the AccessExecutionGraph to be the true ExecutionGraph. 
Instead it assumes
it to be the ArchivedExecutionGraph. Therefore, it invalidates the cache 
entries after
a given time to live period. This will trigger requesting the 
AccessExecutionGraph again
and, thus, updating the ExecutionGraph information for the ExecutionGraph 
based REST
handlers.

In order to avoid memory leaks, the WebRuntimeMonitor now starts a periodic 
cleanup task
which triggers ExecutionGraphCache.cleanup. This method releases all cache 
entries which
have exceeded their time to live. Currently it is set to 20 * 
refreshInterval of the
web gui.

This closes #4728.
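
The TTL idea, as a hedged sketch (hypothetical field and method names, not 
the PR's exact code):

    // Each cache entry remembers when it expires; a stale entry triggers a
    // fresh AccessExecutionGraph request instead of serving the old snapshot.
    AccessExecutionGraph getExecutionGraph(JobID jobId) {
        ExecutionGraphEntry entry = cachedExecutionGraphs.get(jobId);
        final long now = System.currentTimeMillis();
        if (entry == null || entry.getExpiryTimestamp() < now) {
            entry = new ExecutionGraphEntry(
                requestExecutionGraphFromJobMaster(jobId), // hypothetical helper
                now + timeToLiveMillis);
            cachedExecutionGraphs.put(jobId, entry);
        }
        return entry.getExecutionGraph();
    }

    // The periodic cleanup task evicts everything whose TTL has passed:
    void cleanup() {
        final long now = System.currentTimeMillis();
        cachedExecutionGraphs.values().removeIf(e -> e.getExpiryTimestamp() < now);
    }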

commit 29f5d7e2484d52889230c559031cb7c427173927
Author: Till Rohrmann 
Date:   2017-09-26T16:39:15Z

[FLINK-7695] [flip6] Add JobConfigHandler for new RestServerEndpoint

commit e77fb7a5d78be540116051b9914cd70e61cf5df7
Author: Till Rohrmann 
Date:   2017-09-28T16:35:50Z

[FLINK-7708] [flip6] Add CheckpointConfigHandler for new REST endpoint

This commit implements the CheckpointConfigHandler which now returns a
CheckpointConfigInfo object if checkpointing is enabled. In case that
checkpointing is disabled for a job, it will return a 404 response.

commit cd32570761c129da0bfff1fe0a4b4af6ae193c9c
Author: Till Rohrmann 
Date:   2017-09-29T13:09:06Z

[FLINK-7710] [flip6] Add CheckpointStatisticsHandler for the new REST 
endpoint

This commit also makes the CheckpointStatsHistory object serializable by 
removing the
CheckpointStatsHistoryIterable and replacing it with a static ArrayList.




---


[jira] [Commented] (FLINK-7710) Port CheckpointStatsHandler to new REST endpoint

2017-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185821#comment-16185821
 ] 

ASF GitHub Bot commented on FLINK-7710:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/4750

[FLINK-7710] [flip6] Add CheckpointStatisticsHandler for the new REST 
endpoint

## What is the purpose of the change

This commit also makes the CheckpointStatsHistory object serializable by 
removing the
CheckpointStatsHistoryIterable and replacing it with a static ArrayList.

## Verifying this change

*(Please pick either of the following options)*

This change added tests and can be verified as follows:

- `CheckpointStatisticsTest` which tests the marshalling of 
`CheckpointStatistics` instances

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink portCheckpointStatsHandler

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4750.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4750


commit d6be27aac5d7ff022dc03874a8bb52b1c09d49de
Author: Till Rohrmann 
Date:   2017-09-25T13:29:59Z

[FLINK-7668] Add ExecutionGraphCache for ExecutionGraph based REST handlers

The ExecutionGraphCache replaces the ExecutionGraphHolder. Unlike the 
latter, the former
does not expect the AccessExecutionGraph to be the true ExecutionGraph. 
Instead it assumes
it to be the ArchivedExecutionGraph. Therefore, it invalidates the cache 
entries after
a given time to live period. This will trigger requesting the 
AccessExecutionGraph again
and, thus, updating the ExecutionGraph information for the ExecutionGraph 
based REST
handlers.

In order to avoid memory leaks, the WebRuntimeMonitor now starts a periodic 
cleanup task
which triggers ExecutionGraphCache.cleanup. This method releases all cache 
entries which
have exceeded their time to live. Currently it is set to 20 * 
refreshInterval of the
web gui.

This closes #4728.

commit 29f5d7e2484d52889230c559031cb7c427173927
Author: Till Rohrmann 
Date:   2017-09-26T16:39:15Z

[FLINK-7695] [flip6] Add JobConfigHandler for new RestServerEndpoint

commit e77fb7a5d78be540116051b9914cd70e61cf5df7
Author: Till Rohrmann 
Date:   2017-09-28T16:35:50Z

[FLINK-7708] [flip6] Add CheckpointConfigHandler for new REST endpoint

This commit implements the CheckpointConfigHandler which now returns a
CheckpointConfigInfo object if checkpointing is enabled. In case that
checkpointing is disabled for a job, it will return a 404 response.

commit cd32570761c129da0bfff1fe0a4b4af6ae193c9c
Author: Till Rohrmann 
Date:   2017-09-29T13:09:06Z

[FLINK-7710] [flip6] Add CheckpointStatisticsHandler for the new REST 
endpoint

This commit also makes the CheckpointStatsHistory object serializable by 
removing the
CheckpointStatsHistoryIterable and replacing it with a static ArrayList.




> Port CheckpointStatsHandler to new REST endpoint
> 
>
> Key: FLINK-7710
> URL: https://issues.apache.org/jira/browse/FLINK-7710
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{CheckpointStatsHandler}} to new REST endpoint.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint

2017-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185816#comment-16185816
 ] 

ASF GitHub Bot commented on FLINK-7072:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4730


> Create RESTful cluster endpoint
> ---
>
> Key: FLINK-7072
> URL: https://issues.apache.org/jira/browse/FLINK-7072
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> In order to communicate with the cluster from the RESTful client, we have to 
> implement a RESTful cluster endpoint. The endpoint shall support the 
> following operations:
> * List jobs (GET): Get list of all running jobs on the cluster
> * Submit job (POST): Submit a job to the cluster (only supported in session 
> mode)
> * Get job status (GET): Get the status of an executed job (and maybe the 
> JobExecutionResult)
> * Lookup job leader (GET): Gets the JM leader for the given job
> This endpoint will run in session mode alongside the dispatcher/session 
> runner and forward calls to this component which maintains a view on all 
> currently executed jobs.
> In the per-job mode, the endpoint will return only the single running job and 
> the address of the JobManager alongside which it is running. Furthermore, it 
> won't accept job submissions.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4730: [hotfix] [REST] Various rest-related hotfixes

2017-09-29 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4730


---


[GitHub] flink issue #4730: [hotfix] [REST] Various rest-related hotfixes

2017-09-29 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4730
  
Changes look good to me. Thanks a lot for your contribution @zentol. 
Merging this PR.


---


[GitHub] flink pull request #4749: [FLINK-7739][tests] Properly shutdown resources in...

2017-09-29 Thread pnowojski
GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/4749

[FLINK-7739][tests] Properly shutdown resources in tests

This is a fixup of tests, without touching the production code.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink kafka-test

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4749.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4749


commit 4e55087d091993a9552b5f732abab8a5c0a5a014
Author: Piotr Nowojski 
Date:   2017-09-29T12:45:28Z

[FLINK-7739][kafka-tests] Shutdown NetworkFailureProxy

commit a5bbc6fdf2a3e039739a0392dad3d61791f0c3fd
Author: Piotr Nowojski 
Date:   2017-09-29T12:50:37Z

[FLINK-7739][tests] Properly close flink mini cluster




---


[jira] [Commented] (FLINK-7739) Improve Kafka*ITCase tests stability

2017-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185796#comment-16185796
 ] 

ASF GitHub Bot commented on FLINK-7739:
---

GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/4749

[FLINK-7739][tests] Properly shutdown resources in tests

This is a fixup of tests, without touching the production code.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink kafka-test

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4749.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4749


commit 4e55087d091993a9552b5f732abab8a5c0a5a014
Author: Piotr Nowojski 
Date:   2017-09-29T12:45:28Z

[FLINK-7739][kafka-tests] Shutdown NetworkFailureProxy

commit a5bbc6fdf2a3e039739a0392dad3d61791f0c3fd
Author: Piotr Nowojski 
Date:   2017-09-29T12:50:37Z

[FLINK-7739][tests] Properly close flink mini cluster




> Improve Kafka*ITCase tests stability
> 
>
> Key: FLINK-7739
> URL: https://issues.apache.org/jira/browse/FLINK-7739
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.3.2
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7739) Improve Kafka*ITCase tests stability

2017-09-29 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-7739:
-

 Summary: Improve Kafka*ITCase tests stability
 Key: FLINK-7739
 URL: https://issues.apache.org/jira/browse/FLINK-7739
 Project: Flink
  Issue Type: Improvement
  Components: Kafka Connector
Affects Versions: 1.3.2
Reporter: Piotr Nowojski
Assignee: Piotr Nowojski






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7668) Add AccessExecutionGraph refresh interval to ExecutionGraphHolder

2017-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185685#comment-16185685
 ] 

ASF GitHub Bot commented on FLINK-7668:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4728
  
+1 from my side too.


> Add AccessExecutionGraph refresh interval to ExecutionGraphHolder
> -
>
> Key: FLINK-7668
> URL: https://issues.apache.org/jira/browse/FLINK-7668
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> Once we support offline {{AccessExecutionGraph}} implementation (see 
> FLINK-7667) we should add a refresh interval to the {{ExecutionGraphHolder}} 
> after which the {{AccessExecutionGraph}} is retrieved again from the 
> {{JobMaster}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4728: [FLINK-7668] Add ExecutionGraphCache for ExecutionGraph b...

2017-09-29 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4728
  
+1 from my side too.


---


[GitHub] flink pull request #4748: [hotfix][tests] Use G1GC for tests

2017-09-29 Thread pnowojski
GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/4748

[hotfix][tests] Use G1GC for tests

We are using G1GC for running the TaskManager, so I think we should also do 
so for tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink g1gc

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4748.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4748


commit e5aff977d8ee1efac4fb4c12c57431568618a7e1
Author: Piotr Nowojski 
Date:   2017-09-27T16:38:42Z

[hotfix][tests] Use G1GC for tests




---


[jira] [Commented] (FLINK-7708) Port CheckpointConfigHandler to new REST endpoint

2017-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185663#comment-16185663
 ] 

ASF GitHub Bot commented on FLINK-7708:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4744#discussion_r141836633
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigInfo.java
 ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+/**
+ * Response class of the {@link CheckpointConfigHandler}.
+ */
+public class CheckpointConfigInfo implements ResponseBody {
+
+   public static final String FIELD_NAME_PROCESSING_MODE = "mode";
+
+   public static final String FIELD_NAME_CHECKPOINT_INTERVAL = "interval";
+
+   public static final String FIELD_NAME_CHECKPOINT_TIMEOUT = "timeout";
+
+   public static final String FIELD_NAME_CHECKPOINT_MIN_PAUSE = 
"min_pause";
+
+   public static final String FIELD_NAME_CHECKPOINT_MAX_CONCURRENT = 
"max_concurrent";
+
+   public static final String FIELD_NAME_EXTERNALIZED_CHECKPOINT_CONFIG = 
"externalization";
+
--- End diff --

remove empty line (should fail checkstyle)


> Port CheckpointConfigHandler to new REST endpoint
> -
>
> Key: FLINK-7708
> URL: https://issues.apache.org/jira/browse/FLINK-7708
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{CheckpointConfigHandler}} to new REST endpoint.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4744: [FLINK-7708] [flip6] Add CheckpointConfigHandler f...

2017-09-29 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4744#discussion_r141836633
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigInfo.java
 ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+/**
+ * Response class of the {@link CheckpointConfigHandler}.
+ */
+public class CheckpointConfigInfo implements ResponseBody {
+
+   public static final String FIELD_NAME_PROCESSING_MODE = "mode";
+
+   public static final String FIELD_NAME_CHECKPOINT_INTERVAL = "interval";
+
+   public static final String FIELD_NAME_CHECKPOINT_TIMEOUT = "timeout";
+
+   public static final String FIELD_NAME_CHECKPOINT_MIN_PAUSE = 
"min_pause";
+
+   public static final String FIELD_NAME_CHECKPOINT_MAX_CONCURRENT = 
"max_concurrent";
+
+   public static final String FIELD_NAME_EXTERNALIZED_CHECKPOINT_CONFIG = 
"externalization";
+
--- End diff --

remove empty line (should fail checkstyle)


---


[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API

2017-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185592#comment-16185592
 ] 

ASF GitHub Bot commented on FLINK-6233:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4625
  
Thanks for the update @xccui. I'll have a look in the next days.


> Support rowtime inner equi-join between two streams in the SQL API
> --
>
> Key: FLINK-6233
> URL: https://issues.apache.org/jira/browse/FLINK-6233
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: Xingcan Cui
>
> The goal of this issue is to add support for inner equi-joins on row-time 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.rowtime , o.productId, o.orderId, s.rowtime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only supports inner joins
> * The ON clause should include an equi-join condition
> * The time condition {{o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL 
> '1' HOUR}} can only use rowtime, which is a system attribute. The time 
> condition only supports a bounded time range like {{o.rowtime BETWEEN 
> s.rowtime - INTERVAL '1' HOUR AND s.rowtime + INTERVAL '1' HOUR}}, not an 
> unbounded one like {{o.rowtime  s.rowtime}}, and it should include both 
> streams' rowtime attributes; {{o.rowtime between rowtime () and rowtime () + 
> 1}} should also not be supported.
> A row-time stream join will not be able to handle late data, because this 
> would mean that inserting a row into the sorted order would shift all other 
> computations. This would be too expensive to maintain. Therefore, we will 
> throw an error if a user tries to use a row-time stream join with late data 
> handling.
> This issue includes:
> * Design of the DataStream operator to deal with the stream join
> * Translation from Calcite's RelNode representation (LogicalJoin). 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API

2017-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185591#comment-16185591
 ] 

ASF GitHub Bot commented on FLINK-6233:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r141831721
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
 ---
@@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin(
 // Initialize the data caches.
 val leftListTypeInfo: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](leftType)
 val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
-  new MapStateDescriptor[Long, JList[Row]](timeIndicator + 
"InnerJoinLeftCache",
-BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
leftListTypeInfo)
+  new MapStateDescriptor[Long, JList[Row]](
+timeIndicator + "InnerJoinLeftCache",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+leftListTypeInfo)
 leftCache = getRuntimeContext.getMapState(leftStateDescriptor)
 
 val rightListTypeInfo: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](rightType)
 val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
-  new MapStateDescriptor[Long, JList[Row]](timeIndicator + 
"InnerJoinRightCache",
-BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
rightListTypeInfo)
+  new MapStateDescriptor[Long, JList[Row]](
+timeIndicator + "InnerJoinRightCache",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+rightListTypeInfo)
 rightCache = getRuntimeContext.getMapState(rightStateDescriptor)
 
 // Initialize the timer states.
 val leftTimerStateDesc: ValueStateDescriptor[Long] =
-  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinLeftTimerState",
-classOf[Long])
+  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinLeftTimerState", classOf[Long])
 leftTimerState = getRuntimeContext.getState(leftTimerStateDesc)
 
 val rightTimerStateDesc: ValueStateDescriptor[Long] =
-  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinRightTimerState",
-classOf[Long])
+  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinRightTimerState", classOf[Long])
 rightTimerState = getRuntimeContext.getState(rightTimerStateDesc)
   }
 
   /**
-* Process records from the left stream.
-*
-* @param cRowValue the input record
-* @param ctx   the context to register timer or get current time
-* @param out   the collector for outputting results
-*
+* Process rows from the left stream.
 */
   override def processElement1(
-cRowValue: CRow,
-ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-out: Collector[CRow]): Unit = {
-val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true)
-getCurrentOperatorTime(ctx)
+  cRowValue: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow]): Unit = {
+updateOperatorTime(ctx)
+val rowTime: Long = getTimeForLeftStream(ctx, cRowValue)
+val oppositeLowerBound: Long = rowTime - rightRelativeSize
+val oppositeUpperBound: Long = rowTime + leftRelativeSize
 processElement(
   cRowValue,
-  timeForRecord,
+  rowTime,
   ctx,
   out,
   leftOperatorTime,
+  oppositeLowerBound,
+  oppositeUpperBound,
   rightOperatorTime,
   rightTimerState,
   leftCache,
   rightCache,
-  true
+  leftRow = true
 )
   }
 
   /**
-* Process records from the right stream.
-*
-* @param cRowValue the input record
-* @param ctx   the context to get current time
-* @param out   the collector for outputting results
-*
+* Process rows from the right stream.
 */
   override def processElement2(
-cRowValue: CRow,
-ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-out: Collector[CRow]): Unit = {
-val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false)
-getCurrentOperatorTime(ctx)
+  cRowValue: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow]): Unit = {
+updateOperatorTime(ctx)
+val rowTime: Long = getTimeForRightStream(ctx, cRowValue)
+val oppositeLowerBound: Long = rowTime - leftRelativeSize
+val oppositeUpperBound: Long =  rowTime + rightRelativeSize
 

[GitHub] flink issue #4625: [FLINK-6233] [table] Support time-bounded stream inner jo...

2017-09-29 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4625
  
Thanks for the update @xccui. I'll have a look in the next days.


---


[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...

2017-09-29 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r141831721
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
 ---
@@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin(
 // Initialize the data caches.
 val leftListTypeInfo: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](leftType)
 val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
-  new MapStateDescriptor[Long, JList[Row]](timeIndicator + 
"InnerJoinLeftCache",
-BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
leftListTypeInfo)
+  new MapStateDescriptor[Long, JList[Row]](
+timeIndicator + "InnerJoinLeftCache",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+leftListTypeInfo)
 leftCache = getRuntimeContext.getMapState(leftStateDescriptor)
 
 val rightListTypeInfo: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](rightType)
 val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
-  new MapStateDescriptor[Long, JList[Row]](timeIndicator + 
"InnerJoinRightCache",
-BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
rightListTypeInfo)
+  new MapStateDescriptor[Long, JList[Row]](
+timeIndicator + "InnerJoinRightCache",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+rightListTypeInfo)
 rightCache = getRuntimeContext.getMapState(rightStateDescriptor)
 
 // Initialize the timer states.
 val leftTimerStateDesc: ValueStateDescriptor[Long] =
-  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinLeftTimerState",
-classOf[Long])
+  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinLeftTimerState", classOf[Long])
 leftTimerState = getRuntimeContext.getState(leftTimerStateDesc)
 
 val rightTimerStateDesc: ValueStateDescriptor[Long] =
-  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinRightTimerState",
-classOf[Long])
+  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinRightTimerState", classOf[Long])
 rightTimerState = getRuntimeContext.getState(rightTimerStateDesc)
   }
 
   /**
-* Process records from the left stream.
-*
-* @param cRowValue the input record
-* @param ctx   the context to register timer or get current time
-* @param out   the collector for outputting results
-*
+* Process rows from the left stream.
 */
   override def processElement1(
-cRowValue: CRow,
-ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-out: Collector[CRow]): Unit = {
-val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true)
-getCurrentOperatorTime(ctx)
+  cRowValue: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow]): Unit = {
+updateOperatorTime(ctx)
+val rowTime: Long = getTimeForLeftStream(ctx, cRowValue)
+val oppositeLowerBound: Long = rowTime - rightRelativeSize
+val oppositeUpperBound: Long = rowTime + leftRelativeSize
 processElement(
   cRowValue,
-  timeForRecord,
+  rowTime,
   ctx,
   out,
   leftOperatorTime,
+  oppositeLowerBound,
+  oppositeUpperBound,
   rightOperatorTime,
   rightTimerState,
   leftCache,
   rightCache,
-  true
+  leftRow = true
 )
   }
 
   /**
-* Process records from the right stream.
-*
-* @param cRowValue the input record
-* @param ctx   the context to get current time
-* @param out   the collector for outputting results
-*
+* Process rows from the right stream.
 */
   override def processElement2(
-cRowValue: CRow,
-ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-out: Collector[CRow]): Unit = {
-val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false)
-getCurrentOperatorTime(ctx)
+  cRowValue: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow]): Unit = {
+updateOperatorTime(ctx)
+val rowTime: Long = getTimeForRightStream(ctx, cRowValue)
+val oppositeLowerBound: Long = rowTime - leftRelativeSize
+val oppositeUpperBound: Long =  rowTime + rightRelativeSize
 processElement(
   cRowValue,
-  timeForRecord,
+  rowTime,
   ctx,
   out,
   rightOperatorTime,
+  oppositeLowerBound,
+  oppositeUpperBound,
   leftOperatorTime,

[GitHub] flink pull request #4747: [FLINK-7728] [DataStream] Flush StatusWatermarkVal...

2017-09-29 Thread tzulitai
GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/4747

[FLINK-7728] [DataStream] Flush StatusWatermarkValve once all inputs become 
idle 

## What is the purpose of the change

This PR is based on #4738. Only the last three commits are relevant.

Prior to this PR, once all inputs of the `StatusWatermarkValve` become 
idle, we only emit the `StreamStatus.IDLE` marker and check nothing else. This 
makes the watermark advancement behaviour inconsistent when all inputs become 
idle, depending on the order in which they become idle.

This PR fixes this by "flushing" the max watermark across all channels once 
all inputs become idle. At a high-level, what this means for downstream 
operators is that all inputs have become idle and will temporarily cease to 
advance their watermarks, so they can safely advance their event time to 
whatever the largest watermark is.
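
For illustration, here is a minimal sketch of that flushing rule (simplified 
names and structure, not the actual `StatusWatermarkValve` code; it assumes at 
least one channel):

```java
import java.util.Arrays;

// Simplified valve: tracks the last watermark and idleness per channel and
// flushes the max watermark once every channel has reported idle.
class ValveSketch {
    private final long[] channelWatermarks;
    private final boolean[] channelIdle;
    private long lastEmitted = Long.MIN_VALUE;

    ValveSketch(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
        channelIdle = new boolean[numChannels];
    }

    void onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
    }

    /** Returns the watermark to flush downstream, or null if nothing to emit. */
    Long onChannelIdle(int channel) {
        channelIdle[channel] = true;
        for (boolean idle : channelIdle) {
            if (!idle) {
                return null; // at least one channel is still active
            }
        }
        // All inputs are idle: downstream can safely advance its event time
        // to the largest watermark seen across all channels.
        long max = Arrays.stream(channelWatermarks).max().getAsLong();
        if (max > lastEmitted) {
            lastEmitted = max;
            return max;
        }
        return null;
    }
}
```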

This PR also includes changes to `StatusWatermarkValveTest` to cover the 
above-mentioned case, as well as a rework of the unit tests to make them more 
fine-grained, with a small enough scope per test case that we have a better 
overview of which cases are covered.

## Brief change log

- d11d98d preliminary cleanup of 
`StatusWatermarkValveTest#BufferedValveOutputHandler`. The previous implementation 
was overly complex and could not provide the test behaviors that we required.
- 0e386da main change for this PR. Includes a new test in 
`StatusWatermarkValveTest` to cover the missing case. That test does not pass 
without this change.
- b977c18 refactor unit tests to be more fine-grained in scope.


## Verifying this change

`StatusWatermarkValveTest`, `OneInputStreamTaskTest`, 
`TwoInputStreamTaskTest` should be able to verify this change.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): **YES**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? n/a



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-7728

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4747.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4747


commit a9ce90aaee99374513941270b804de2f5564c47e
Author: Tzu-Li (Gordon) Tai 
Date:   2017-09-27T18:05:21Z

[FLINK-7721] [DataStream] Only emit new min watermark iff it was aggregated 
from watermark-aligned inputs

Prior to this commit, in the calculation of the new min watermark in
StatusWatermarkValve#findAndOutputNewMinWatermarkAcrossAlignedChannels(),
there is no verification that the calculated new min watermark
really is aggregated from some aligned channel.

In the corner case where all input channels are currently not aligned
but actually some are active, we would then incorrectly determine that
the final aggregation is Long.MAX_VALUE and emit that.

This commit fixes this by only emitting the aggregated watermark iff it was
really calculated from some aligned input channel (as well as the
already existing constraint that it needs to be larger than the last
emitted watermark). This change should also safely cover the case that a
Long.MAX_VALUE was genuinely aggregated from the input channels.
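
Sketched, the guard amounts to the following (simplified, not the actual
valve code):

```java
import java.util.OptionalLong;

// The aggregated min watermark is only meaningful if it was computed from
// at least one watermark-aligned channel.
class MinWatermarkSketch {
    static OptionalLong newMinWatermark(boolean[] aligned, long[] watermarks) {
        boolean seenAligned = false;
        long min = Long.MAX_VALUE;
        for (int i = 0; i < aligned.length; i++) {
            if (aligned[i]) {
                seenAligned = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        // Without the seenAligned check, zero aligned channels would yield
        // Long.MAX_VALUE here and be emitted incorrectly; a genuine
        // Long.MAX_VALUE from an aligned channel still passes through.
        return seenAligned ? OptionalLong.of(min) : OptionalLong.empty();
    }
}
```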

commit d11d98dda4723760e7d9fb3b9680a1e9daca3705
Author: Tzu-Li (Gordon) Tai 
Date:   2017-09-28T12:11:22Z

[FLINK-7728] [DataStream] Simplify BufferedValveOutputHandler used in 
StatusWatermarkValveTest

The previous implementation was overly complicated. Having separate
buffers for the StreamStatus and Watermarks is not required for our
tests. Also, that design doesn't allow checking the order in which StreamStatus /
Watermark elements are emitted from a single input to the valve.

This commit reworks it by buffering both StreamStatus and Watermarks in
a shared queue.
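
For illustration, the shared-queue idea in miniature (simplified, not the
actual test handler):

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Buffering StreamStatus markers and Watermarks in one queue preserves their
// relative emission order, which two separate buffers cannot capture.
class BufferedOutputSketch {
    private final Queue<Object> emittedInOrder = new ArrayDeque<>();

    void emitWatermark(long watermark) {
        emittedInOrder.add(watermark);
    }

    void emitStreamStatus(String status) { // e.g. "ACTIVE" or "IDLE"
        emittedInOrder.add(status);
    }

    Object pollNext() {
        return emittedInOrder.poll();
    }
}
```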

commit 0e386dab2fc19df78276ce203ed8f38792145759
Author: Tzu-Li (Gordon) Tai 
Date:   2017-09-28T14:49:22Z

[FLINK-7728] [DataStream] Flush max watermark across all inputs once all 
become idle

Prior to this commit, once all inputs of the StatusWatermarkValve
become idle, we only emit the StreamStatus.IDLE marker, and check
nothing else. This makes the watermark advancement 

[jira] [Commented] (FLINK-7728) StatusWatermarkValve has different min watermark advancement behavior depending on the ordering inputs become idle

2017-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185574#comment-16185574
 ] 

ASF GitHub Bot commented on FLINK-7728:
---

GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/4747

[FLINK-7728] [DataStream] Flush StatusWatermarkValve once all inputs become 
idle 

## What is the purpose of the change

This PR is based on #4738. Only the last three commits are relevant.

Prior to this PR, once all inputs of the `StatusWatermarkValve` become 
idle, we only emit the `StreamStatus.IDLE` marker and check nothing else. This 
makes the watermark advancement behaviour inconsistent when all inputs become 
idle, depending on the order in which they become idle.

This PR fixes this by "flushing" the max watermark across all channels once 
all inputs become idle. At a high-level, what this means for downstream 
operators is that all inputs have become idle and will temporarily cease to 
advance their watermarks, so they can safely advance their event time to 
whatever the largest watermark is.

This PR also includes changes to `StatusWatermarkValveTest` to cover the 
above-mentioned case, as well as a rework of the unit tests to make them more 
fine-grained, with a small enough scope per test case that we have a better 
overview of which cases are covered.

## Brief change log

- d11d98d preliminary cleanup of 
`StatusWatermarkValveTest#BufferedValveOutputHandler`. The previous implementation 
was overly complex and could not provide the test behaviors that we required.
- 0e386da main change for this PR. Includes a new test in 
`StatusWatermarkValveTest` to cover the missing case. That test does not pass 
without this change.
- b977c18 refactor unit tests to be more fine-grained in scope.


## Verifying this change

`StatusWatermarkValveTest`, `OneInputStreamTaskTest`, 
`TwoInputStreamTaskTest` should be able to verify this change.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): **YES**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? n/a



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-7728

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4747.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4747


commit a9ce90aaee99374513941270b804de2f5564c47e
Author: Tzu-Li (Gordon) Tai 
Date:   2017-09-27T18:05:21Z

[FLINK-7721] [DataStream] Only emit new min watermark iff it was aggregated 
from watermark-aligned inputs

Prior to this commit, in the calculation of the new min watermark in
StatusWatermarkValve#findAndOutputNewMinWatermarkAcrossAlignedChannels(),
there is no verification that the calculated new min watermark
really is aggregated from some aligned channel.

In the corner case where all input channels are currently not aligned
but actually some are active, we would then incorrectly determine that
the final aggregation is Long.MAX_VALUE and emit that.

This commit fixes this by only emitting the aggregated watermark iff it was
really calculated from some aligned input channel (as well as the
already existing constraint that it needs to be larger than the last
emitted watermark). This change should also safely cover the case that a
Long.MAX_VALUE was genuinely aggregated from the input channels.

commit d11d98dda4723760e7d9fb3b9680a1e9daca3705
Author: Tzu-Li (Gordon) Tai 
Date:   2017-09-28T12:11:22Z

[FLINK-7728] [DataStream] Simplify BufferedValveOutputHandler used in 
StatusWatermarkValveTest

The previous implementation was overly complicated. Having separate
buffers for the StreamStatus and Watermarks is not required for our
tests. Also, that design doesn't allow checking the order in which StreamStatus /
Watermark elements are emitted from a single input to the valve.

This commit reworks it by buffering both StreamStatus and Watermarks in
a shared queue.

commit 0e386dab2fc19df78276ce203ed8f38792145759
Author: Tzu-Li (Gordon) Tai 
Date:   2017-09-28T14:49:22Z

[FLINK-7728] [DataStream] Flush max watermark 

[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...

2017-09-29 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r141822004
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -228,7 +234,14 @@ public void startNewWorker(ResourceProfile 
resourceProfile) {
 
@Override
public void stopWorker(ResourceID resourceID) {
--- End diff --

Do you mean something like MesosWorkerStore.Worker in MesosResourceManager, 
which has both the taskId (used as resourceID) and other context info for the 
Mesos worker?


---


[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185544#comment-16185544
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r141822004
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -228,7 +234,14 @@ public void startNewWorker(ResourceProfile 
resourceProfile) {
 
@Override
public void stopWorker(ResourceID resourceID) {
--- End diff --

Do you mean something like MesosWorkerStore.Worker in MesosResourceManager, 
which has both the taskId (used as resourceID) and other context info for the 
Mesos worker?


> Implement container release to support dynamic scaling
> --
>
> Key: FLINK-7076
> URL: https://issues.apache.org/jira/browse/FLINK-7076
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Reporter: Till Rohrmann
>Assignee: Shuyi Chen
>  Labels: flip-6
>
> In order to support dynamic scaling, the {{YarnResourceManager}} has to be 
> able to dynamically free containers. We have to implement the 
> {{YarnResourceManager#stopWorker}} method.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185532#comment-16185532
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r141820967
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -228,7 +234,14 @@ public void startNewWorker(ResourceProfile 
resourceProfile) {
 
@Override
public void stopWorker(ResourceID resourceID) {
-   // TODO: Implement to stop the worker
+   ContainerId containerId = 
containerIdMap.remove(resourceID.getResourceIdString());
+   if (containerId != null) {
+   log.info("Stopping container {}.", 
containerId.toString());
+   
resourceManagerClient.releaseAssignedContainer(containerId);
--- End diff --

The ContainerId for YARN can only be generated using 
ContainerId.newInstance, which requires both the application attempt id and the 
container id. The resourceID only contains the container id information, so 
it's not sufficient to rebuild the YARN ContainerId object that the 
releaseAssignedContainer method takes.


> Implement container release to support dynamic scaling
> --
>
> Key: FLINK-7076
> URL: https://issues.apache.org/jira/browse/FLINK-7076
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Reporter: Till Rohrmann
>Assignee: Shuyi Chen
>  Labels: flip-6
>
> In order to support dynamic scaling, the {{YarnResourceManager}} has to be 
> able to dynamically free containers. We have to implement the 
> {{YarnResourceManager#stopWorker}} method.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...

2017-09-29 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r141820967
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -228,7 +234,14 @@ public void startNewWorker(ResourceProfile 
resourceProfile) {
 
@Override
public void stopWorker(ResourceID resourceID) {
-   // TODO: Implement to stop the worker
+   ContainerId containerId = 
containerIdMap.remove(resourceID.getResourceIdString());
+   if (containerId != null) {
+   log.info("Stopping container {}.", 
containerId.toString());
+   
resourceManagerClient.releaseAssignedContainer(containerId);
--- End diff --

The ContainerId for YARN can only be generated using 
ContainerId.newInstance, which requires both the application attempt id and the 
container id. The resourceID only contains the container id information, so 
it's not sufficient to rebuild the YARN ContainerId object that the 
releaseAssignedContainer method takes.
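
To make that concrete, a minimal sketch of keeping the original `ContainerId` 
around, keyed by the resource id string (illustrative names, not the actual 
`YarnResourceManager`):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;

// Since a resource id string cannot be turned back into a YARN ContainerId
// (the application attempt id is lost), remember the original ContainerId.
class ContainerTracker {
    private final Map<String, ContainerId> containerIdMap = new ConcurrentHashMap<>();
    private final AMRMClientAsync<?> resourceManagerClient;

    ContainerTracker(AMRMClientAsync<?> resourceManagerClient) {
        this.resourceManagerClient = resourceManagerClient;
    }

    void onContainerAllocated(String resourceIdString, ContainerId containerId) {
        containerIdMap.put(resourceIdString, containerId);
    }

    void stopWorker(String resourceIdString) {
        ContainerId containerId = containerIdMap.remove(resourceIdString);
        if (containerId != null) {
            resourceManagerClient.releaseAssignedContainer(containerId);
        }
    }
}
```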


---


[jira] [Commented] (FLINK-7668) Add AccessExecutionGraph refresh interval to ExecutionGraphHolder

2017-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185519#comment-16185519
 ] 

ASF GitHub Bot commented on FLINK-7668:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4728
  
Thanks for addressing / replying to my comments.
LGTM, +1 from me


> Add AccessExecutionGraph refresh interval to ExecutionGraphHolder
> -
>
> Key: FLINK-7668
> URL: https://issues.apache.org/jira/browse/FLINK-7668
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> Once we support an offline {{AccessExecutionGraph}} implementation (see 
> FLINK-7667), we should add a refresh interval to the {{ExecutionGraphHolder}} 
> after which the {{AccessExecutionGraph}} is retrieved again from the 
> {{JobMaster}}.
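
A minimal sketch of such a TTL-based entry (illustrative, not the actual 
implementation):

{code}
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;

// One cache entry: the cached graph plus the point in time after which the
// holder should re-request the graph from the JobMaster.
class CachedGraphEntry {
    private final AccessExecutionGraph graph;
    private final long expiryTimestampMs;

    CachedGraphEntry(AccessExecutionGraph graph, long ttlMs) {
        this.graph = graph;
        this.expiryTimestampMs = System.currentTimeMillis() + ttlMs;
    }

    boolean isExpired() {
        return System.currentTimeMillis() >= expiryTimestampMs;
    }

    AccessExecutionGraph getGraph() {
        return graph;
    }
}
{code}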



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4728: [FLINK-7668] Add ExecutionGraphCache for ExecutionGraph b...

2017-09-29 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4728
  
Thanks for addressing / replying to my comments.
LGTM, +1 from me


---


[GitHub] flink pull request #4728: [FLINK-7668] Add ExecutionGraphCache for Execution...

2017-09-29 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4728#discussion_r141817520
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
 ---
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.messages.JobNotFoundException;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link ExecutionGraphCache}.
+ */
+public class ExecutionGraphCacheTest extends TestLogger {
+
+   /**
+* Tests that we can cache AccessExecutionGraphs over multiple accesses.
+*/
+   @Test
+   public void testExecutionGraphCaching() throws Exception {
+   final Time timeout = Time.milliseconds(100L);
+   final Time timeToLive = Time.hours(1L);
+   final JobID jobId = new JobID();
+   final AccessExecutionGraph accessExecutionGraph = 
mock(AccessExecutionGraph.class);
+
+   final JobManagerGateway jobManagerGateway = 
mock(JobManagerGateway.class);
+   when(jobManagerGateway.requestJob(eq(jobId), 
any(Time.class))).thenReturn(CompletableFuture.completedFuture(accessExecutionGraph));
+
+   try (ExecutionGraphCache executionGraphCache = new 
ExecutionGraphCache(timeout, timeToLive)) {
+   CompletableFuture 
accessExecutionGraphFuture = executionGraphCache.getExecutionGraph(jobId, 
jobManagerGateway);
+
+   assertEquals(accessExecutionGraph, 
accessExecutionGraphFuture.get());
+
+   CompletableFuture 
accessExecutionGraphFuture2 = executionGraphCache.getExecutionGraph(jobId, 
jobManagerGateway);
+
+   assertEquals(accessExecutionGraph, 
accessExecutionGraphFuture2.get());
+
+   // verify that we only issued a single request to the 
gateway
+   verify(jobManagerGateway, 
times(1)).requestJob(eq(jobId), any(Time.class));
+   }
+   }
+
+   /**
+* Tests that an AccessExecutionGraph is invalidated after its TTL 
expired.
+*/
+   @Test
+   public void testExecutionGraphEntryInvalidation() throws Exception {
+   final Time timeout = Time.milliseconds(100L);
+   final Time timeToLive = 

[jira] [Commented] (FLINK-7668) Add AccessExecutionGraph refresh interval to ExecutionGraphHolder

2017-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185518#comment-16185518
 ] 

ASF GitHub Bot commented on FLINK-7668:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4728#discussion_r141817520
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
 ---
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.messages.JobNotFoundException;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link ExecutionGraphCache}.
+ */
+public class ExecutionGraphCacheTest extends TestLogger {
+
+   /**
+* Tests that we can cache AccessExecutionGraphs over multiple accesses.
+*/
+   @Test
+   public void testExecutionGraphCaching() throws Exception {
+   final Time timeout = Time.milliseconds(100L);
+   final Time timeToLive = Time.hours(1L);
+   final JobID jobId = new JobID();
+   final AccessExecutionGraph accessExecutionGraph = 
mock(AccessExecutionGraph.class);
+
+   final JobManagerGateway jobManagerGateway = 
mock(JobManagerGateway.class);
+   when(jobManagerGateway.requestJob(eq(jobId), 
any(Time.class))).thenReturn(CompletableFuture.completedFuture(accessExecutionGraph));
+
+   try (ExecutionGraphCache executionGraphCache = new 
ExecutionGraphCache(timeout, timeToLive)) {
+   CompletableFuture 
accessExecutionGraphFuture = executionGraphCache.getExecutionGraph(jobId, 
jobManagerGateway);
+
+   assertEquals(accessExecutionGraph, 
accessExecutionGraphFuture.get());
+
+   CompletableFuture 
accessExecutionGraphFuture2 = executionGraphCache.getExecutionGraph(jobId, 
jobManagerGateway);
+
+   assertEquals(accessExecutionGraph, 
accessExecutionGraphFuture2.get());
+
+   // verify that we only issued a single request to the 
gateway
+   verify(jobManagerGateway, 
times(1)).requestJob(eq(jobId), any(Time.class));
+   }
+   }
+
+   /**
+* Tests that an AccessExecutionGraph is invalidated after its 

[jira] [Assigned] (FLINK-7710) Port CheckpointStatsHandler to new REST endpoint

2017-09-29 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7710?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-7710:


Assignee: Till Rohrmann

> Port CheckpointStatsHandler to new REST endpoint
> 
>
> Key: FLINK-7710
> URL: https://issues.apache.org/jira/browse/FLINK-7710
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{CheckpointStatsHandler}} to new REST endpoint.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7668) Add AccessExecutionGraph refresh interval to ExecutionGraphHolder

2017-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185507#comment-16185507
 ] 

ASF GitHub Bot commented on FLINK-7668:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4728
  
I've rebased the code onto the latest master. Are all PR comments resolved 
@zentol and @tzulitai?


> Add AccessExecutionGraph refresh interval to ExecutionGraphHolder
> -
>
> Key: FLINK-7668
> URL: https://issues.apache.org/jira/browse/FLINK-7668
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> Once we support an offline {{AccessExecutionGraph}} implementation (see 
> FLINK-7667), we should add a refresh interval to the {{ExecutionGraphHolder}} 
> after which the {{AccessExecutionGraph}} is retrieved again from the 
> {{JobMaster}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4728: [FLINK-7668] Add ExecutionGraphCache for ExecutionGraph b...

2017-09-29 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4728
  
I've rebased the code onto the latest master. Are all PR comments resolved 
@zentol and @tzulitai?


---


[jira] [Commented] (FLINK-7667) Add serializable AccessExecutionGraph implementation

2017-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185506#comment-16185506
 ] 

ASF GitHub Bot commented on FLINK-7667:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4727


> Add serializable AccessExecutionGraph implementation
> 
>
> Key: FLINK-7667
> URL: https://issues.apache.org/jira/browse/FLINK-7667
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> In order to decouple the REST endpoint from the {{JobMaster}}, we should have 
> an "offline" implementation of the {{AccessExecutionGraph}} which is 
> serializable and can be sent to remote peers.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4727: [FLINK-7667] [flip6] Use ArchivedExecutionGraph as...

2017-09-29 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4727


---


[jira] [Closed] (FLINK-7667) Add serializable AccessExecutionGraph implementation

2017-09-29 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-7667.

   Resolution: Fixed
Fix Version/s: 1.4.0

Fixed via 2dd557fad4a0a205a3e163fa918507d34c933c6a

> Add serializable AccessExecutionGraph implementation
> 
>
> Key: FLINK-7667
> URL: https://issues.apache.org/jira/browse/FLINK-7667
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> In order to decouple the REST endpoint from the {{JobMaster}}, we should have 
> an "offline" implementation of the {{AccessExecutionGraph}} which is 
> serializable and can be sent to remote peers.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7667) Add serializable AccessExecutionGraph implementation

2017-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185503#comment-16185503
 ] 

ASF GitHub Bot commented on FLINK-7667:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4727
  
Merging this PR.


> Add serializable AccessExecutionGraph implementation
> 
>
> Key: FLINK-7667
> URL: https://issues.apache.org/jira/browse/FLINK-7667
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> In order to decouple the REST endpoint from the {{JobMaster}}, we should have 
> an "offline" implementation of the {{AccessExecutionGraph}} which is 
> serializable and can be sent to remote peers.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4727: [FLINK-7667] [flip6] Use ArchivedExecutionGraph as serial...

2017-09-29 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4727
  
Merging this PR.


---


[jira] [Commented] (FLINK-7648) Port TaskManagersHandler to new REST endpoint

2017-09-29 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185502#comment-16185502
 ] 

Till Rohrmann commented on FLINK-7648:
--

In order to wait on the completion, you can do

{code}
@Test
public void testStandaloneSessionCluster() throws ExecutionException, 
InterruptedException {
Configuration configuration = new Configuration();
configuration.setString(JobManagerOptions.ADDRESS, "localhost");

StandaloneSessionClusterEntrypoint standaloneSessionClusterEntrypoint = 
new StandaloneSessionClusterEntrypoint(configuration);

standaloneSessionClusterEntrypoint.startCluster();

standaloneSessionClusterEntrypoint.getTerminationFuture().get();
}
{code}

> Port TaskManagersHandler to new REST endpoint
> -
>
> Key: FLINK-7648
> URL: https://issues.apache.org/jira/browse/FLINK-7648
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Bowen Li
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{TaskManagersHandler}} to the new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7541) Redistribute operator state using OperatorID

2017-09-29 Thread Piotr Nowojski (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-7541.
-

There are still parts of the code in Flink that rely on operator order

> Redistribute operator state using OperatorID
> 
>
> Key: FLINK-7541
> URL: https://issues.apache.org/jira/browse/FLINK-7541
> Project: Flink
>  Issue Type: Improvement
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.4.0
>
>
> Currently StateAssignmentOperation relies heavily on the order of new and old 
> operators in the task. It should be changed so that it relies more on the 
> OperatorID.
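
A sketch of the id-based matching (simplified types, not the actual 
{{StateAssignmentOperation}}):

{code}
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Matching restored state to new operators by a stable id instead of by
// position in the operator chain; chain reordering no longer breaks it.
class StateAssignmentSketch {
    static Map<String, byte[]> assign(
            Map<String, byte[]> restoredStateByOperatorId,
            List<String> newOperatorIds) {
        Map<String, byte[]> assigned = new HashMap<>();
        for (String operatorId : newOperatorIds) {
            byte[] state = restoredStateByOperatorId.get(operatorId);
            if (state != null) {
                assigned.put(operatorId, state);
            }
        }
        return assigned;
    }
}
{code}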



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (FLINK-7541) Redistribute operator state using OperatorID

2017-09-29 Thread Piotr Nowojski (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski resolved FLINK-7541.
---
   Resolution: Fixed
Fix Version/s: 1.4.0

> Redistribute operator state using OperatorID
> 
>
> Key: FLINK-7541
> URL: https://issues.apache.org/jira/browse/FLINK-7541
> Project: Flink
>  Issue Type: Improvement
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.4.0
>
>
> Currently StateAssignmentOperation relies heavily on the order of new and old 
> operators in the task. It should be changed so that it relies more on the 
> OperatorID.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7694) Port JobMetricsHandler to new REST handler

2017-09-29 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185498#comment-16185498
 ] 

Till Rohrmann commented on FLINK-7694:
--

Yes, this will make sure that the web GUI will also understand the response 
from the new {{JobMetricsOverviewHandler}}.

> Port JobMetricsHandler to new REST handler
> --
>
> Key: FLINK-7694
> URL: https://issues.apache.org/jira/browse/FLINK-7694
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7683) Add method to iterate over all of the existing keys in a statebackend

2017-09-29 Thread Piotr Nowojski (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-7683.
-

> Add method to iterate over all of the existing keys in a statebackend
> -
>
> Key: FLINK-7683
> URL: https://issues.apache.org/jira/browse/FLINK-7683
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.4.0
>
>
> This is required to make it possible to preserve backward compatibility while 
> changing the state definition of a keyed state operator (to do so, the operator 
> must iterate over all of the existing keys and rewrite them into a new state 
> variable).
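
A sketch of the intended migration pattern (the {{getKeys}}-style method is 
an assumption based on this description, and the convert function stands in 
for whatever rewrite the operator needs):

{code}
import java.util.function.Function;
import java.util.stream.Stream;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.VoidNamespace;

class StateMigrationSketch {
    /** Rewrites every key's value from an old state variable into a new one. */
    static <K, OLD, NEW> void migrate(
            KeyedStateBackend<K> backend,
            String oldStateName,
            ValueState<OLD> oldState,
            ValueState<NEW> newState,
            Function<OLD, NEW> convert) throws Exception {
        try (Stream<K> keys = backend.getKeys(oldStateName, VoidNamespace.INSTANCE)) {
            for (K key : (Iterable<K>) keys::iterator) {
                backend.setCurrentKey(key);
                newState.update(convert.apply(oldState.value()));
            }
        }
    }
}
{code}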



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7695) Port JobConfigHandler to new REST endpoint

2017-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185497#comment-16185497
 ] 

ASF GitHub Bot commented on FLINK-7695:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4737
  
Thanks for the review @zentol and @yew1eb. I have to run this PR once 
more on Travis to see if everything passes. Once this is done and the PRs it 
depends on are merged, I'll merge this one as well.


> Port JobConfigHandler to new REST endpoint
> --
>
> Key: FLINK-7695
> URL: https://issues.apache.org/jira/browse/FLINK-7695
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
>
> Port the {{JobConfigHandler}} to the new {{RestServerEndpoint}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (FLINK-7683) Add method to iterate over all of the existing keys in a statebackend

2017-09-29 Thread Piotr Nowojski (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski resolved FLINK-7683.
---
   Resolution: Fixed
Fix Version/s: 1.4.0

> Add method to iterate over all of the existing keys in a statebackend
> -
>
> Key: FLINK-7683
> URL: https://issues.apache.org/jira/browse/FLINK-7683
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.4.0
>
>
> This is required to make it possible to preserve backward compatibility while 
> changing the state definition of a keyed state operator (to do so, the operator 
> must iterate over all of the existing keys and rewrite them into a new state 
> variable).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4737: [FLINK-7695] [flip6] Add JobConfigHandler for new RestSer...

2017-09-29 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4737
  
Thanks for the review @zentol and @yew1eb. I have to run this PR once 
more on Travis to see if everything passes. Once this is done and the PRs it 
depends on are merged, I'll merge this one as well.


---


[jira] [Commented] (FLINK-7704) Port JobPlanHandler to new REST endpoint

2017-09-29 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185493#comment-16185493
 ] 

Till Rohrmann commented on FLINK-7704:
--

Hi [~haizhou], great to hear that you want to help with Flip-6 :-) All help is 
highly welcome! Go ahead and do the port of the {{JobPlanHandler}}.

> Port JobPlanHandler to new REST endpoint
> 
>
> Key: FLINK-7704
> URL: https://issues.apache.org/jira/browse/FLINK-7704
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Hai Zhou UTC+8
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{JobPlanHandler}} to new REST endpoint.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7695) Port JobConfigHandler to new REST endpoint

2017-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185473#comment-16185473
 ] 

ASF GitHub Bot commented on FLINK-7695:
---

Github user yew1eb commented on the issue:

https://github.com/apache/flink/pull/4737
  
Hey @tillrohrmann, will you merge this PR?
I want to fix sibling issues (FLINK-7704, FLINK-7705, FLINK-7706).
:beers:

Best,
Hai Zhou


> Port JobConfigHandler to new REST endpoint
> --
>
> Key: FLINK-7695
> URL: https://issues.apache.org/jira/browse/FLINK-7695
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
>
> Port the {{JobConfigHandler}} to the new {{RestServerEndpoint}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4737: [FLINK-7695] [flip6] Add JobConfigHandler for new RestSer...

2017-09-29 Thread yew1eb
Github user yew1eb commented on the issue:

https://github.com/apache/flink/pull/4737
  
Hey @tillrohrmann, will you merge this PR?
I want to fix sibling issues (FLINK-7704, FLINK-7705, FLINK-7706).
:beers:

Best,
Hai Zhou


---

