[GitHub] flink pull request #5059: [FLINK-8125] [core] Introduce limiting of file sys...
Github user StephanEwen closed the pull request at: https://github.com/apache/flink/pull/5059 ---
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5059#discussion_r152856201
--- Diff: flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java ---
@@ -0,0 +1,1097 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A file system that limits the number of concurrently open input streams,
+ * output streams, and total streams for a target file system.
+ *
+ * This file system can wrap another existing file system in cases where
+ * the target file system cannot handle certain connection spikes and connections
+ * would fail in that case. This happens, for example, for very small HDFS clusters
+ * with few RPC handlers, when a large Flink job tries to build up many connections during
+ * a checkpoint.
+ *
+ * The filesystem may track the progress of streams and close streams that have been
+ * inactive for too long, to avoid locked streams of taking up the complete pool.
+ * Rather than having a dedicated reaper thread, the calls that try to open a new stream
+ * periodically check the currently open streams once the limit of open streams is reached.
+ */
+@Internal
+public class LimitedConnectionsFileSystem extends FileSystem {
+
+	private static final Logger LOG = LoggerFactory.getLogger(LimitedConnectionsFileSystem.class);
+
+	/** The original file system to which connections are limited. */
+	private final FileSystem originalFs;
+
+	/** The lock that synchronizes connection bookkeeping. */
+	private final ReentrantLock lock;
+
+	/** Condition for threads that are blocking on the availability of new connections. */
+	private final Condition available;
+
+	/** The maximum number of concurrently open output streams. */
+	private final int maxNumOpenOutputStreams;
+
+	/** The maximum number of concurrently open input streams. */
+	private final int maxNumOpenInputStreams;
+
+	/** The maximum number of concurrently open streams (input + output). */
+	private final int maxNumOpenStreamsTotal;
+
+	/** The nanoseconds that a opening a stream may wait for availability. */
+	private final long streamOpenTimeoutNanos;
+
+	/** The nanoseconds that a stream may spend not writing any bytes before it is closed as inactive. */
+	private final long streamInactivityTimeoutNanos;
+
+	/** The set of currently open output streams. */
+	@GuardedBy("lock")
+	private final HashSet openOutputStreams;
+
+	/** The set of currently open input streams. */
+	@GuardedBy("lock")
+	private final HashSet openInputStreams;
+
+	/** The number of output streams reserved to be opened. */
+	@GuardedBy("lock")
+	private int numReservedOutputStreams;
+
+	/** The number of input streams reserved to be opened. */
+
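For readers following the review, the fields quoted above (a ReentrantLock, a Condition named available, a maximum number of open streams, and an open timeout in nanoseconds) suggest a blocking-with-timeout pattern. The following is only a minimal sketch of that pattern, not the code from the pull request: the class ConnectionLimiterSketch and its methods acquire, release, and getNumOpen are hypothetical names made up for illustration, and it tracks a single counter rather than separate input, output, and total limits or inactivity timeouts.

```java
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Hypothetical illustration only; this class is NOT part of the pull request.
 * It limits the number of concurrently held "slots" with a lock and a condition.
 */
public class ConnectionLimiterSketch {

    /** Lock that guards the bookkeeping below. */
    private final ReentrantLock lock = new ReentrantLock(true);

    /** Condition on which callers wait until a slot is released. */
    private final Condition available = lock.newCondition();

    private final int maxOpen;
    private final long openTimeoutNanos;

    /** Number of slots currently held; guarded by lock. */
    private int numOpen;

    public ConnectionLimiterSketch(int maxOpen, long openTimeoutNanos) {
        if (maxOpen <= 0 || openTimeoutNanos < 0) {
            throw new IllegalArgumentException("maxOpen must be > 0 and timeout >= 0");
        }
        this.maxOpen = maxOpen;
        this.openTimeoutNanos = openTimeoutNanos;
    }

    /** Blocks until a slot is free, or fails with an IOException once the timeout elapses. */
    public void acquire() throws IOException, InterruptedException {
        long remaining = openTimeoutNanos;
        lock.lockInterruptibly();
        try {
            while (numOpen >= maxOpen) {
                if (remaining <= 0) {
                    throw new IOException("Timed out waiting for a free stream slot");
                }
                // awaitNanos returns an estimate of the time left to wait
                remaining = available.awaitNanos(remaining);
            }
            numOpen++;
        } finally {
            lock.unlock();
        }
    }

    /** Returns a slot and wakes up one waiting caller, if any. */
    public void release() {
        lock.lock();
        try {
            if (numOpen == 0) {
                throw new IllegalStateException("release() without matching acquire()");
            }
            numOpen--;
            available.signal();
        } finally {
            lock.unlock();
        }
    }

    public int getNumOpen() {
        lock.lock();
        try {
            return numOpen;
        } finally {
            lock.unlock();
        }
    }
}
```

A caller would pair acquire() with release() in a try/finally block around opening and closing a stream. Waiting in a loop around awaitNanos guards against spurious wakeups while still honoring the overall timeout.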
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5059#discussion_r152854733 --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java ---
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5059#discussion_r152854163 --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java ---
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5059#discussion_r152853859 --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java ---
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5059#discussion_r152823141 --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java ---
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5059#discussion_r152822070 --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java ---
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5059#discussion_r152826230 --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java ---
[GitHub] flink pull request #5059: [FLINK-8125] [core] Introduce limiting of file sys...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5059#discussion_r152823934 --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java --- @@ -0,0 +1,1097 @@
[GitHub] flink pull request #5059: [FLINK-8125] [core] Introduce limiting of file sys...
GitHub user StephanEwen opened a pull request:

    https://github.com/apache/flink/pull/5059

    [FLINK-8125] [core] Introduce limiting of file system connections

    ## What is the purpose of the change

    This change introduces a way to limit the number of streams that Flink FileSystems concurrently open.
    For example, for very small HDFS clusters with few RPC handlers, a large Flink job trying to build up many connections during a checkpoint causes failures due to rejected connections.

    For example, adding a setting like `fs.hdfs.limit.total: 128` limits the total number of connections per TaskManager to the file system with scheme "hdfs" to 128.

    ## Brief change log

      - Adds the `LimitedConnectionsFileSystem`, which can wrap existing file systems and track and limit the number of input and output streams
      - Adds code to `FileSystem` to wrap the file system factories in `ConnectionLimitingFactory`, which wraps the file system in a `LimitedConnectionsFileSystem` if connection limiting is configured
      - Adds documentation for Flink's file system support in general, including the configuration settings added here

    ## Verifying this change

      - This change is covered by various unit tests:
        - `LimitedConnectionsFileSystemTest`
        - `LimitedConnectionsFileSystemDelegationTest`
        - `LimitedConnectionsConfigurationTest`

    ## Does this pull request potentially affect one of the following parts:

      - Dependencies (does it add or upgrade a dependency): (yes / **no**)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
      - The serializers: (yes / **no** / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
      - The S3 file system connector: (yes / **no** / don't know)

    ## Documentation

      - Does this pull request introduce a new feature? (**yes** / no)
      - If yes, how is the feature documented? (not applicable / **docs** / JavaDocs / not documented)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StephanEwen/incubator-flink fs_connections

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5059.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5059

commit 81b1df9fd82b4e5d7feeed257afac72525b89aa9
Author: Stephan Ewen
Date:   2017-11-08T19:14:34Z

    [hotfix] [core] Fix lots of checkstyle errors in core.fs

commit c17d00654fcfeb3ed5729810bb2a9b3ba127ddb9
Author: Stephan Ewen
Date:   2017-11-08T22:57:04Z

    [FLINK-8125] [core] Introduce limiting of outgoing file system connections

---
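The limiting behavior described in the pull request above (a cap on concurrently open streams, with callers blocking on a condition until a slot frees up or an open timeout expires) can be illustrated with a minimal sketch. This is not the code from the pull request: the class `StreamLimiterSketch` and its methods `acquire`, `release`, and `openStreams` are hypothetical names chosen for illustration. It assumes a single total limit and leaves out the separate input/output limits and the inactivity check mentioned in the Javadoc. Only the use of a `ReentrantLock` with an `available` condition mirrors the fields visible in the quoted diff.

```java
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch of a stream limit; not the class from the pull request. */
final class StreamLimiterSketch {

    /** Fair lock that guards the bookkeeping below. */
    private final ReentrantLock lock = new ReentrantLock(true);

    /** Signalled whenever a slot is released. */
    private final Condition available = lock.newCondition();

    private final int maxOpenStreams;

    /** Zero means "wait indefinitely" (an assumption of this sketch). */
    private final long openTimeoutNanos;

    /** Guarded by lock. */
    private int numOpenStreams;

    StreamLimiterSketch(int maxOpenStreams, long openTimeoutMillis) {
        if (maxOpenStreams <= 0 || openTimeoutMillis < 0) {
            throw new IllegalArgumentException("limit must be positive, timeout non-negative");
        }
        this.maxOpenStreams = maxOpenStreams;
        this.openTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(openTimeoutMillis);
    }

    /** Blocks until a slot is free; fails with an IOException once the timeout expires. */
    void acquire() throws IOException, InterruptedException {
        lock.lock();
        try {
            long remaining = openTimeoutNanos;
            while (numOpenStreams >= maxOpenStreams) {
                if (openTimeoutNanos == 0) {
                    available.await();
                } else {
                    if (remaining <= 0) {
                        throw new IOException("Timed out waiting for a free stream slot");
                    }
                    // awaitNanos returns an estimate of the time still left to wait
                    remaining = available.awaitNanos(remaining);
                }
            }
            numOpenStreams++;
        } finally {
            lock.unlock();
        }
    }

    /** Returns a slot and wakes up one waiting opener. */
    void release() {
        lock.lock();
        try {
            if (numOpenStreams == 0) {
                throw new IllegalStateException("release() without matching acquire()");
            }
            numOpenStreams--;
            available.signal();
        } finally {
            lock.unlock();
        }
    }

    int openStreams() {
        lock.lock();
        try {
            return numOpenStreams;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws Exception {
        StreamLimiterSketch limiter = new StreamLimiterSketch(2, 50);
        limiter.acquire();
        limiter.acquire();
        try {
            limiter.acquire(); // limit reached, nobody releases: times out
        } catch (IOException e) {
            System.out.println("third open rejected: " + e.getMessage());
        }
        limiter.release();
        limiter.acquire(); // succeeds again after a release
        System.out.println("open streams: " + limiter.openStreams());
    }
}
```

In a wrapper file system, a call such as `acquire()` would presumably run before delegating `open` or `create` to the wrapped file system, and `release()` would run when the returned stream is closed. How the actual change wires this up is defined by the pull request, not by this sketch.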