[GitHub] flink pull request #4485: [FLINK-7378][core]Implement the FixedBufferPool fo...

2017-08-11 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r132655177
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/FixedBufferPool.java
 ---
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.util.event.EventListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A buffer pool used to manage a number of {@link Buffer} instances from 
the
+ * {@link NetworkBufferPool}.
+ *
+ * <p> Compared with {@link LocalBufferPool}, this is a fixed size 
(non-rebalancing) buffer pool
--- End diff --

As per our checkstyle (not enforced here yet), you should remove the space 
after the `<p>`. There are some more places in this PR but I'll stop marking 
them.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4485: [FLINK-7378][core]Implement the FixedBufferPool fo...

2017-08-11 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r132655478
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolListener.java
 ---
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+/**
+ * Interface for the interaction with {@link BufferPool}. The buffer pool 
listener is used
+ * to be notified of availability of buffer more than once.
--- End diff --

`of the availability of buffers. Listeners can opt for a one-time only 
notification or to be notified repeatedly.`
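
A minimal sketch of what "one-time only or repeated" notification could look like. The names `AvailabilityListener`, `onBufferAvailable` and `ListenerQueueSketch` are hypothetical and are not taken from this PR; the boolean return value is an assumed way for a listener to say whether it wants to stay registered.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical listener: return true to keep receiving notifications. */
interface AvailabilityListener {
    boolean onBufferAvailable(int bufferId);
}

/** Hypothetical holder for registered listeners, notified in FIFO order. */
final class ListenerQueueSketch {
    private final Queue<AvailabilityListener> listeners = new ArrayDeque<>();

    void register(AvailabilityListener listener) {
        listeners.add(listener);
    }

    /**
     * Notifies the listener at the head of the queue and re-queues it
     * only if it asked for further notifications.
     *
     * @return true if a listener was notified, false if none was registered
     */
    boolean notifyNext(int bufferId) {
        AvailabilityListener listener = listeners.poll();
        if (listener == null) {
            return false;
        }
        if (listener.onBufferAvailable(bufferId)) {
            listeners.add(listener);
        }
        return true;
    }

    int registeredCount() {
        return listeners.size();
    }
}
```

A listener that returns `false` is dropped after its first notification, while one that returns `true` goes back to the end of the queue.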




[GitHub] flink pull request #4485: [FLINK-7378][core]Implement the FixedBufferPool fo...

2017-08-11 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r13267
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
 ---
@@ -39,6 +39,17 @@
BufferPool createBufferPool(int numRequiredBuffers, int maxUsedBuffers) 
throws IOException;
 
/**
+* Tries to create a buffer pool, which is guaranteed to provide the 
fixed number of required
+* buffers.
+*
+* <p> The buffer pool is of fixed size with 
numRequiredBuffers buffers.
--- End diff --

please remove the space after `<p>` (checkstyle)




[GitHub] flink pull request #4485: [FLINK-7378][core]Implement the FixedBufferPool fo...

2017-08-11 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r132670989
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/FixedBufferPool.java
 ---
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.util.event.EventListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A buffer pool used to manage a number of {@link Buffer} instances from 
the
+ * {@link NetworkBufferPool}.
+ *
+ * <p> Compared with {@link LocalBufferPool}, this is a fixed size 
(non-rebalancing) buffer pool
+ * type which will not be involved in distributing the left available 
buffers in network buffer pool.
+ *
+ * <p> This buffer pool can be used to manage the floating buffers for 
input gate.
+ */
+class FixedBufferPool implements BufferPool {
+   private static final Logger LOG = 
LoggerFactory.getLogger(FixedBufferPool.class);
+
+   /** Global network buffer pool to get buffers from. */
+   private final NetworkBufferPool networkBufferPool;
+
+   /** The required number of segments for this pool. */
+   private final int numberOfRequiredMemorySegments;
+
+   /**
+* The currently available memory segments. These are segments, which 
have been requested from
+* the network buffer pool and are currently not handed out as Buffer 
instances.
+*/
+   private final Queue<MemorySegment> availableMemorySegments = new 
ArrayDeque<>();
+
+   /**
+* Buffer availability listeners, which need to be notified when a 
Buffer becomes available.
+* Listeners can only be registered at a time/state where no Buffer 
instance was available.
+*/
+   private final Queue registeredListeners = new 
ArrayDeque<>();
+
+   /**
+* Number of all memory segments, which have been requested from the 
network buffer pool and are
+* somehow referenced through this pool (e.g. wrapped in Buffer 
instances or as available segments).
+*/
+   private int numberOfRequestedMemorySegments;
+
+   private boolean isDestroyed;
+
+   private BufferPoolOwner owner;
+
+   /**
+* Fixed buffer pool based on the given networkBufferPool with 
a number of
+* network buffers being available.
+*
+* @param networkBufferPool
+*  global network buffer pool to get buffers from
+* @param numberOfRequiredMemorySegments
+*  fixed number of network buffers
+*/
+   FixedBufferPool(NetworkBufferPool networkBufferPool, int 
numberOfRequiredMemorySegments) {
+   checkArgument(numberOfRequiredMemorySegments >= 0,
+   "Required number of memory segments (%s) should not be 
less than 0.",
+   numberOfRequiredMemorySegments);
+
+   LOG.debug("Using a fixed buffer pool with {} buffers", 
numberOfRequiredMemorySegments);
+
+   this.networkBufferPool = networkBufferPool;
+   this.numberOfRequiredMemorySegments = 
numberOfRequiredMemorySegments;
+   }
+
+   // 

+   // Properties
+   // 

+
+   @Override
+   public boolean isDestroyed() {
+   synchronized (availableMemorySegments) {
+   return isDestroyed;
+   }
+   }
+
+   @Override
+   public int getMemorySegmentSize() {
   

[GitHub] flink pull request #4485: [FLINK-7378][core]Implement the FixedBufferPool fo...

2017-08-11 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r132655594
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
 ---
@@ -79,4 +79,12 @@
 * Returns the number of used buffers of this buffer pool.
 */
int bestEffortGetNumOfUsedBuffers();
+
+   /**
+* Adds a buffer availability listener to this buffer pool.
+*
+* <p> The operation fails with return value false, when 
there is a buffer available or
--- End diff --

please remove the space after `<p>` (checkstyle)




[GitHub] flink pull request #4485: [FLINK-7378][core]Implement the FixedBufferPool fo...

2017-08-11 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r132673916
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
 ---
@@ -221,13 +228,44 @@ public BufferPool createBufferPool(int 
numRequiredBuffers, int maxUsedBuffers) t
}
 
@Override
+   public BufferPool createFixedBufferPool(int numRequiredBuffers) throws 
IOException {
+   synchronized (factoryLock) {
+   if (isDestroyed) {
+   throw new IllegalStateException("Network buffer 
pool has already been destroyed.");
+   }
+
+   // Ensure that the number of required buffers can be 
satisfied.
+   if (numTotalRequiredBuffers + numRequiredBuffers > 
totalNumberOfMemorySegments) {
+   throw new 
IOException(String.format("Insufficient number of network buffers: " +
--- End diff --

why not keep the whole message as in `#createBufferPool`?
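
One way to keep the message identical in both factory methods is to build it in a single helper. The sketch below is an assumption-laden illustration: `CapacityCheckSketch`, `reserve` and `insufficientBuffersMessage` are hypothetical names, and the message wording is a placeholder, not the text used in `#createBufferPool`.

```java
import java.io.IOException;

/** Hypothetical stand-in for the capacity bookkeeping of a global buffer pool. */
final class CapacityCheckSketch {
    private final int totalSegments;
    private int totalRequired;

    CapacityCheckSketch(int totalSegments) {
        this.totalSegments = totalSegments;
    }

    /** Reserves buffers for a new pool, or fails if the total would be exceeded. */
    synchronized void reserve(int numRequiredBuffers) throws IOException {
        if (numRequiredBuffers < 0) {
            throw new IllegalArgumentException("numRequiredBuffers must not be negative");
        }
        // Widen to long so the sum cannot overflow.
        if ((long) totalRequired + numRequiredBuffers > totalSegments) {
            throw new IOException(insufficientBuffersMessage(numRequiredBuffers));
        }
        totalRequired += numRequiredBuffers;
    }

    /** Single source of the error text, shared by every caller (placeholder wording). */
    private String insufficientBuffersMessage(int requested) {
        return String.format(
            "Insufficient number of network buffers: required %d, but only %d available.",
            requested, totalSegments - totalRequired);
    }

    synchronized int reserved() {
        return totalRequired;
    }
}
```

With a total of 4 segments, reserving 3 succeeds and a further request for 2 fails with an `IOException`, leaving the reserved count at 3.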




[GitHub] flink pull request #4485: [FLINK-7378][core]Implement the FixedBufferPool fo...

2017-08-07 Thread zhijiangW
GitHub user zhijiangW opened a pull request:

https://github.com/apache/flink/pull/4485

[FLINK-7378][core]Implement the FixedBufferPool for floating buffers of 
SingleInputGate

## What is the purpose of the change

Currently the number of network buffers in `LocalBufferPool` for 
`SingleInputGate` is limited by `a * <number of channels> + b`, where a is the 
number of exclusive buffers for each channel and b is the number of floating 
buffers shared by all channels.

Considering the credit-based flow control feature, we want to implement a 
new fixed size buffer pool type used to manage the floating buffers for 
`SingleInputGate`.

Compared with `LocalBufferPool`, this is a non-rebalancing buffer pool 
which does not participate in redistributing the remaining available buffers in 
`NetworkBufferPool`.
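
As a rough illustration of the limit described above, the arithmetic can be sketched as follows. This assumes the per-channel factor is the number of input channels; `InputGateBufferLimit` and `maxBuffers` are hypothetical names used only for this example and are not part of Flink.

```java
/** Hypothetical helper, not part of Flink: illustrates the arithmetic only. */
final class InputGateBufferLimit {

    /**
     * @param exclusivePerChannel number of exclusive buffers for each channel (a)
     * @param numChannels         assumed number of input channels of the gate
     * @param floating            number of floating buffers shared by all channels (b)
     * @return a * numChannels + b
     */
    static int maxBuffers(int exclusivePerChannel, int numChannels, int floating) {
        if (exclusivePerChannel < 0 || numChannels < 0 || floating < 0) {
            throw new IllegalArgumentException("All arguments must be non-negative");
        }
        // The *Exact methods throw ArithmeticException instead of silently overflowing.
        return Math.addExact(Math.multiplyExact(exclusivePerChannel, numChannels), floating);
    }

    public static void main(String[] args) {
        // 2 exclusive buffers on each of 4 channels plus 8 floating buffers.
        System.out.println(maxBuffers(2, 4, 8)); // prints 16
    }
}
```

Under this reading, only the `b` floating buffers would come from the new fixed size pool, while the exclusive buffers stay tied to their channels.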

## Brief change log

  - *Implemented a new fixed size buffer pool type*
  - *Added a `BufferPoolListener` interface for notifying buffer 
availability more than once; this can later replace the current `EventListener` 
in the `BufferProvider` interface*
  - *Distinguished between fixed buffer pools and redistributed buffer pools in 
`NetworkBufferPool`*

## Verifying this change

This change added tests and can be verified as follows:

  - *Added `FixedBufferPoolTest` to verify the behavior of this new buffer 
pool*
  - *Modified the existing `NetworkBufferPoolTest` and 
`BufferPoolFactoryTest` to verify the creation of fixed buffer pools and the 
redistribution of the available buffers in `NetworkBufferPool`*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhijiangW/flink FLINK-7378

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4485.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4485


commit 5c7a27f6fdd215150174c7827cc87b5ea08e01bc
Author: Zhijiang 
Date:   2017-08-07T09:31:17Z

[FLINK-7378][core]Implement the FixedBufferPool for floating buffers of 
SingleInputGate



