[GitHub] flink pull request #5059: [FLINK-8125] [core] Introduce limiting of file sys...

2017-11-24 Thread StephanEwen
Github user StephanEwen closed the pull request at:

https://github.com/apache/flink/pull/5059


---


[GitHub] flink pull request #5059: [FLINK-8125] [core] Introduce limiting of file sys...

2017-11-23 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5059#discussion_r152856201
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
 ---
@@ -0,0 +1,1097 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A file system that limits the number of concurrently open input streams,
+ * output streams, and total streams for a target file system.
+ *
+ * This file system can wrap another existing file system in cases where
+ * the target file system cannot handle certain connection spikes and connections
+ * would fail in that case. This happens, for example, for very small HDFS clusters
+ * with few RPC handlers, when a large Flink job tries to build up many connections during
+ * a checkpoint.
+ *
+ * The file system may track the progress of streams and close streams that have been
+ * inactive for too long, to prevent locked streams from taking up the complete pool.
+ * Rather than having a dedicated reaper thread, the calls that try to open a new stream
+ * periodically check the currently open streams once the limit of open streams is reached.
+ */
+@Internal
+public class LimitedConnectionsFileSystem extends FileSystem {
+
+   private static final Logger LOG = LoggerFactory.getLogger(LimitedConnectionsFileSystem.class);
+
+   /** The original file system to which connections are limited. */
+   private final FileSystem originalFs;
+
+   /** The lock that synchronizes connection bookkeeping. */
+   private final ReentrantLock lock;
+
+   /** Condition for threads that are blocking on the availability of new connections. */
+   private final Condition available;
+
+   /** The maximum number of concurrently open output streams. */
+   private final int maxNumOpenOutputStreams;
+
+   /** The maximum number of concurrently open input streams. */
+   private final int maxNumOpenInputStreams;
+
+   /** The maximum number of concurrently open streams (input + output). */
+   private final int maxNumOpenStreamsTotal;
+
+   /** The nanoseconds that opening a stream may wait for availability. */
+   private final long streamOpenTimeoutNanos;
+
+   /** The nanoseconds that a stream may spend not writing any bytes before it is closed as inactive. */
+   private final long streamInactivityTimeoutNanos;
+
+   /** The set of currently open output streams. */
+   @GuardedBy("lock")
+   private final HashSet openOutputStreams;
+
+   /** The set of currently open input streams. */
+   @GuardedBy("lock")
+   private final HashSet openInputStreams;
+
+   /** The number of output streams reserved to be opened. */
+   @GuardedBy("lock")
+   private int numReservedOutputStreams;
+
+   /** The number of input streams reserved to be opened. */
+   
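
For readers following the quoted Javadoc, the blocking behaviour it describes (a lock for the bookkeeping, a condition that waiting openers block on, and an open timeout) can be sketched roughly as below. This is only an illustration under assumptions, not the code from the pull request: the class name StreamLimiterSketch and the methods acquire(), release() and numOpen() are hypothetical, it tracks a single total limit rather than separate input/output limits, and it leaves out the inactivity check entirely. Unlike a real configuration option might, a timeout of 0 here simply means "fail immediately when the limit is reached".

```java
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch of limiting concurrently open streams; not the class from the PR. */
final class StreamLimiterSketch {

    /** The lock that synchronizes the bookkeeping. */
    private final ReentrantLock lock = new ReentrantLock(true);

    /** Condition for threads that wait for a free slot. */
    private final Condition available = lock.newCondition();

    private final int maxOpen;
    private final long openTimeoutNanos;

    /** Number of currently held slots; guarded by lock. */
    private int numOpen;

    StreamLimiterSketch(int maxOpen, long openTimeoutMillis) {
        if (maxOpen <= 0 || openTimeoutMillis < 0) {
            throw new IllegalArgumentException("maxOpen must be > 0 and timeout >= 0");
        }
        this.maxOpen = maxOpen;
        this.openTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(openTimeoutMillis);
    }

    /** Reserves a slot, waiting up to the open timeout; throws IOException otherwise. */
    void acquire() throws IOException {
        lock.lock();
        try {
            long remaining = openTimeoutNanos;
            while (numOpen >= maxOpen) {
                if (remaining <= 0) {
                    throw new IOException("Timed out waiting for a free stream slot");
                }
                try {
                    // awaitNanos returns an estimate of the time still left to wait
                    remaining = available.awaitNanos(remaining);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IOException("Interrupted while waiting for a stream slot", e);
                }
            }
            numOpen++;
        } finally {
            lock.unlock();
        }
    }

    /** Returns a slot and wakes up one waiting opener, if any. */
    void release() {
        lock.lock();
        try {
            if (numOpen <= 0) {
                throw new IllegalStateException("release() without matching acquire()");
            }
            numOpen--;
            available.signal();
        } finally {
            lock.unlock();
        }
    }

    int numOpen() {
        lock.lock();
        try {
            return numOpen;
        } finally {
            lock.unlock();
        }
    }
}
```

A caller would wrap each stream open in acquire() and call release() from the stream's close(), so that a closed stream frees its slot for the next waiting thread.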

[GitHub] flink pull request #5059: [FLINK-8125] [core] Introduce limiting of file sys...

2017-11-23 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5059#discussion_r152854733
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
 ---

[GitHub] flink pull request #5059: [FLINK-8125] [core] Introduce limiting of file sys...

2017-11-23 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5059#discussion_r152854163
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
 ---

[GitHub] flink pull request #5059: [FLINK-8125] [core] Introduce limiting of file sys...

2017-11-23 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5059#discussion_r152853859
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
 ---

[GitHub] flink pull request #5059: [FLINK-8125] [core] Introduce limiting of file sys...

2017-11-23 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5059#discussion_r152823141
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
 ---

[GitHub] flink pull request #5059: [FLINK-8125] [core] Introduce limiting of file sys...

2017-11-23 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5059#discussion_r152822070
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
 ---

[GitHub] flink pull request #5059: [FLINK-8125] [core] Introduce limiting of file sys...

2017-11-23 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5059#discussion_r152826230
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
 ---
@@ -0,0 +1,1097 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A file system that limits the number of concurrently open input streams,
+ * output streams, and total streams for a target file system.
+ *
+ * This file system can wrap another existing file system in cases where
+ * the target file system cannot handle certain connection spikes and 
connections
+ * would fail in that case. This happens, for example, for very small HDFS 
clusters
+ * with few RPC handlers, when a large Flink job tries to build up many 
connections during
+ * a checkpoint.
+ *
+ * The filesystem may track the progress of streams and close streams 
that have been
+ * inactive for too long, to avoid locked streams of taking up the 
complete pool.
+ * Rather than having a dedicated reaper thread, the calls that try to 
open a new stream
+ * periodically check the currently open streams once the limit of open 
streams is reached.
+ */
+@Internal
+public class LimitedConnectionsFileSystem extends FileSystem {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(LimitedConnectionsFileSystem.class);
+
+   /** The original file system to which connections are limited. */
+   private final FileSystem originalFs;
+
+   /** The lock that synchronizes connection bookkeeping. */
+   private final ReentrantLock lock;
+
+   /** Condition for threads that are blocking on the availability of new 
connections. */
+   private final Condition available;
+
+   /** The maximum number of concurrently open output streams. */
+   private final int maxNumOpenOutputStreams;
+
+   /** The maximum number of concurrently open input streams. */
+   private final int maxNumOpenInputStreams;
+
+   /** The maximum number of concurrently open streams (input + output). */
+   private final int maxNumOpenStreamsTotal;
+
+   /** The nanoseconds that opening a stream may wait for availability. */
+   private final long streamOpenTimeoutNanos;
+
+   /** The nanoseconds that a stream may spend not writing any bytes before it is closed as inactive. */
+   private final long streamInactivityTimeoutNanos;
+
+   /** The set of currently open output streams. */
+   @GuardedBy("lock")
+   private final HashSet openOutputStreams;
+
+   /** The set of currently open input streams. */
+   @GuardedBy("lock")
+   private final HashSet openInputStreams;
+
+   /** The number of output streams reserved to be opened. */
+   @GuardedBy("lock")
+   private int numReservedOutputStreams;
+
+   /** The number of input streams reserved to be opened. */
+  

[GitHub] flink pull request #5059: [FLINK-8125] [core] Introduce limiting of file sys...

2017-11-23 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5059#discussion_r152823934
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
 ---
@@ -0,0 +1,1097 @@

[GitHub] flink pull request #5059: [FLINK-8125] [core] Introduce limiting of file sys...

2017-11-23 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/5059

[FLINK-8125] [core] Introduce limiting of file system connections

## What is the purpose of the change

This change introduces a way to limit the number of streams that Flink 
FileSystems concurrently open.
For example, for very small HDFS clusters with few RPC handlers, a large 
Flink job trying to build up many connections during a checkpoint causes 
failures due to rejected connections.

Adding a setting like `fs.hdfs.limit.total: 128` limits the total number of
connections from each TaskManager to the file system with scheme "hdfs" to 128.
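
As a rough illustration of how such a per-scheme setting could be looked up, here
is a minimal sketch. It is not Flink API: `ConnectionLimitKeySketch`, both of its
methods, the plain `Map` standing in for the configuration, and the use of `-1`
for "no limit configured" are assumptions made for this example. Only the key
`fs.hdfs.limit.total` comes from the description above; generalizing it to
`fs.<scheme>.limit.total` is an assumption as well.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper for illustration only; not part of Flink. */
final class ConnectionLimitKeySketch {

    /** Builds the key for a scheme, assuming the pattern fs.<scheme>.limit.total. */
    static String totalLimitKey(String scheme) {
        return "fs." + scheme + ".limit.total";
    }

    /** Returns the configured total limit for the scheme, or -1 if none is set (assumed convention). */
    static int totalLimit(Map<String, String> settings, String scheme) {
        String value = settings.get(totalLimitKey(scheme));
        return value == null ? -1 : Integer.parseInt(value.trim());
    }

    public static void main(String[] args) {
        Map<String, String> settings = new HashMap<>();
        settings.put("fs.hdfs.limit.total", "128");

        // A scheme with a setting is limited; a scheme without one is left alone.
        System.out.println("hdfs limit: " + totalLimit(settings, "hdfs"));
        System.out.println("s3 limit: " + totalLimit(settings, "s3"));
    }
}
```

The point of the sketch is only that the limit is keyed by the file system's URI
scheme, so one scheme can be limited without affecting the others.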

## Brief change log

  - Adds the `LimitedConnectionsFileSystem`, which can wrap existing file 
systems and track and limit the number of input and output streams
  - Adds code to `FileSystem` to wrap the file system factories in 
`ConnectionLimitingFactory` which wraps the file system in a 
`LimitedConnectionsFileSystem` if connection limiting is configured.
  - Adds documentation for Flink's file system support in general, 
including the configuration settings added here
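
The limiting idea in the list above can be sketched with a lock and a condition,
the same primitives the quoted class declares as fields. This is a simplified,
hypothetical stand-in and not the code from this pull request:
`StreamLimiterSketch` and its methods are invented for illustration, it tracks
only a single total count, and it omits the separate input/output limits and the
closing of inactive streams.

```java
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical, simplified limiter; not the PR's LimitedConnectionsFileSystem. */
final class StreamLimiterSketch {

    private final ReentrantLock lock = new ReentrantLock(true);
    private final Condition available = lock.newCondition();
    private final int maxOpenStreams;
    private final long openTimeoutNanos;

    /** Number of currently held slots; only accessed while holding the lock. */
    private int numOpenStreams;

    StreamLimiterSketch(int maxOpenStreams, long openTimeoutMillis) {
        if (maxOpenStreams <= 0 || openTimeoutMillis < 0) {
            throw new IllegalArgumentException("limit must be positive, timeout non-negative");
        }
        this.maxOpenStreams = maxOpenStreams;
        this.openTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(openTimeoutMillis);
    }

    /** Blocks until a slot is free; fails with an IOException once the timeout has elapsed. */
    void acquire() throws IOException {
        long remainingNanos = openTimeoutNanos;
        lock.lock();
        try {
            while (numOpenStreams >= maxOpenStreams) {
                if (remainingNanos <= 0L) {
                    throw new IOException("Timed out waiting for a free stream slot");
                }
                // awaitNanos returns an estimate of the time still left to wait.
                remainingNanos = available.awaitNanos(remainingNanos);
            }
            numOpenStreams++;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while waiting for a free stream slot", e);
        } finally {
            lock.unlock();
        }
    }

    /** Returns a slot and wakes up one waiting thread, if any. */
    void release() {
        lock.lock();
        try {
            if (numOpenStreams > 0) {
                numOpenStreams--;
                available.signal();
            }
        } finally {
            lock.unlock();
        }
    }

    /** Returns the number of slots currently held. */
    int numOpen() {
        lock.lock();
        try {
            return numOpenStreams;
        } finally {
            lock.unlock();
        }
    }
}
```

In a wrapper along these lines, a file system would call `acquire()` before
delegating `open` or `create` to the wrapped file system and `release()` when the
returned stream is closed, so callers block instead of overwhelming the target.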


## Verifying this change

  - This change is covered by various unit tests:
    - `LimitedConnectionsFileSystemTest`
    - `LimitedConnectionsFileSystemDelegationTest`
    - `LimitedConnectionsConfigurationTest`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (**yes** / no)
  - If yes, how is the feature documented? (not applicable / **docs** / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink fs_connections

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5059.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5059


commit 81b1df9fd82b4e5d7feeed257afac72525b89aa9
Author: Stephan Ewen 
Date:   2017-11-08T19:14:34Z

[hotfix] [core] Fix lots of checkstyle errors in core.fs

commit c17d00654fcfeb3ed5729810bb2a9b3ba127ddb9
Author: Stephan Ewen 
Date:   2017-11-08T22:57:04Z

[FLINK-8125] [core] Introduce limiting of outgoing file system connections




---