This is an automated email from the ASF dual-hosted git repository. echobravo pushed a commit to branch release/1.12.0 in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/release/1.12.0 by this push: new 37a18a0 GEODE-7832: Remove Connection Semaphores (#4754) 37a18a0 is described below commit 37a18a0b921c65645700efa202d388ecd0a04ccb Author: Juan José Ramos <jujora...@users.noreply.github.com> AuthorDate: Wed Mar 4 09:19:20 2020 +0000 GEODE-7832: Remove Connection Semaphores (#4754) Removed the semaphores and related methods from DirectChannel and Connection classes. They were used to constrain messaging when some undocumented system properties were set. (cherry picked from commit 2a3e09f2da4793d08d1124b5d5e656285295937d) --- .../internal/ClusterOperationExecutors.java | 8 - .../distributed/internal/direct/DirectChannel.java | 200 +++++---------------- .../apache/geode/internal/cache/properties.html | 34 ---- .../org/apache/geode/internal/tcp/Connection.java | 81 +-------- 4 files changed, 49 insertions(+), 274 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterOperationExecutors.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterOperationExecutors.java index ddd2aff..d16e7fd 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterOperationExecutors.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterOperationExecutors.java @@ -38,7 +38,6 @@ import org.apache.geode.internal.logging.log4j.LogMarker; import org.apache.geode.internal.monitoring.ThreadsMonitoring; import org.apache.geode.internal.monitoring.ThreadsMonitoringImpl; import org.apache.geode.internal.monitoring.ThreadsMonitoringImplDummy; -import org.apache.geode.internal.tcp.Connection; import org.apache.geode.internal.tcp.ConnectionTable; import org.apache.geode.logging.internal.log4j.api.LogService; @@ -375,7 +374,6 @@ public class ClusterOperationExecutors implements OperationExecutors { FunctionExecutionPooledExecutor.setIsFunctionExecutionThread(Boolean.TRUE); try { ConnectionTable.threadWantsSharedResources(); - Connection.makeReaderThread(); runUntilShutdown(command); } finally { ConnectionTable.releaseThreadsSockets(); @@ -388,7 +386,6 @@ public class ClusterOperationExecutors implements OperationExecutors { stats.incNumProcessingThreads(1); try { ConnectionTable.threadWantsSharedResources(); - Connection.makeReaderThread(); runUntilShutdown(command); } finally { ConnectionTable.releaseThreadsSockets(); @@ -400,7 +397,6 @@ public class ClusterOperationExecutors implements OperationExecutors { stats.incHighPriorityThreads(1); try { ConnectionTable.threadWantsSharedResources(); - Connection.makeReaderThread(); runUntilShutdown(command); } finally { ConnectionTable.releaseThreadsSockets(); @@ -412,7 +408,6 @@ public class ClusterOperationExecutors implements OperationExecutors { stats.incWaitingThreads(1); try { ConnectionTable.threadWantsSharedResources(); - Connection.makeReaderThread(); runUntilShutdown(command); } finally { ConnectionTable.releaseThreadsSockets(); @@ -424,7 +419,6 @@ public class ClusterOperationExecutors implements OperationExecutors { stats.incPartitionedRegionThreads(1); try { ConnectionTable.threadWantsSharedResources(); - Connection.makeReaderThread(); runUntilShutdown(command); } finally { ConnectionTable.releaseThreadsSockets(); @@ -436,7 +430,6 @@ public class ClusterOperationExecutors implements OperationExecutors { stats.incNumSerialThreads(1); try { ConnectionTable.threadWantsSharedResources(); - Connection.makeReaderThread(); runUntilShutdown(command); } finally { ConnectionTable.releaseThreadsSockets(); @@ -816,7 +809,6 @@ public class ClusterOperationExecutors implements OperationExecutors { private void doSerialPooledThread(Runnable command) { ConnectionTable.threadWantsSharedResources(); - Connection.makeReaderThread(); try { command.run(); } finally { diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java index 10b0108..4799270 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; -import java.util.concurrent.Semaphore; import org.apache.logging.log4j.Logger; @@ -36,7 +35,6 @@ import org.apache.geode.SystemFailure; import org.apache.geode.alerting.internal.spi.AlertingAction; import org.apache.geode.cache.TimeoutException; import org.apache.geode.distributed.DistributedMember; -import org.apache.geode.distributed.DistributedSystemDisconnectedException; import org.apache.geode.distributed.internal.ClusterDistributionManager; import org.apache.geode.distributed.internal.DMStats; import org.apache.geode.distributed.internal.DirectReplyProcessor; @@ -58,7 +56,6 @@ import org.apache.geode.internal.tcp.ConnectionException; import org.apache.geode.internal.tcp.MsgStreamer; import org.apache.geode.internal.tcp.TCPConduit; import org.apache.geode.internal.util.Breadcrumbs; -import org.apache.geode.internal.util.concurrent.ReentrantSemaphore; import org.apache.geode.logging.internal.log4j.api.LogService; /** @@ -98,8 +95,6 @@ public class DirectChannel { if (disconnected) { disconnected = false; disconnectCompleted = false; - this.groupOrderedSenderSem = new ReentrantSemaphore(MAX_GROUP_SENDERS); - this.groupUnorderedSenderSem = new ReentrantSemaphore(MAX_GROUP_SENDERS); } } @@ -145,8 +140,6 @@ public class DirectChannel { this.conduit = new TCPConduit(mgr, port, address, isBindAddress, this, props); disconnected = false; disconnectCompleted = false; - this.groupOrderedSenderSem = new ReentrantSemaphore(MAX_GROUP_SENDERS); - this.groupUnorderedSenderSem = new ReentrantSemaphore(MAX_GROUP_SENDERS); logger.info("GemFire P2P Listener started on {}", conduit.getSocketId()); @@ -158,65 +151,6 @@ public class DirectChannel { } } - - /** - * Return how many concurrent operations should be allowed by default. since 6.6, this has been - * raised to Integer.MAX value from the number of available processors. Setting this to a lower - * value raises the possibility of a deadlock when serializing a message with PDX objects, because - * the PDX serialization can trigger further distribution. - */ - public static final int DEFAULT_CONCURRENCY_LEVEL = - Integer.getInteger("p2p.defaultConcurrencyLevel", Integer.MAX_VALUE / 2).intValue(); - - /** - * The maximum number of concurrent senders sending a message to a group of recipients. - */ - private static final int MAX_GROUP_SENDERS = - Integer.getInteger("p2p.maxGroupSenders", DEFAULT_CONCURRENCY_LEVEL).intValue(); - private Semaphore groupUnorderedSenderSem; - private Semaphore groupOrderedSenderSem; - - private Semaphore getGroupSem(boolean ordered) { - if (ordered) { - return this.groupOrderedSenderSem; - } else { - return this.groupUnorderedSenderSem; - } - } - - private void acquireGroupSendPermission(boolean ordered) { - if (this.disconnected) { - throw new org.apache.geode.distributed.DistributedSystemDisconnectedException( - "Direct channel has been stopped"); - } - // @todo darrel: add some stats - final Semaphore s = getGroupSem(ordered); - for (;;) { - this.conduit.getCancelCriterion().checkCancelInProgress(null); - boolean interrupted = Thread.interrupted(); - try { - s.acquire(); - break; - } catch (InterruptedException ex) { - interrupted = true; - } finally { - if (interrupted) { - Thread.currentThread().interrupt(); - } - } - } // for - if (this.disconnected) { - s.release(); - throw new DistributedSystemDisconnectedException( - "communications disconnected"); - } - } - - private void releaseGroupSendPermission(boolean ordered) { - final Semaphore s = getGroupSem(ordered); - s.release(); - } - /** * Returns true if calling thread owns its own communication resources. */ @@ -352,95 +286,67 @@ public class DirectChannel { return bytesWritten; } - boolean sendingToGroup = cons.size() > 1; - Connection permissionCon = null; - if (sendingToGroup) { - acquireGroupSendPermission(orderedMsg); - } else { - // sending over just one connection - permissionCon = (Connection) cons.get(0); - if (permissionCon != null) { - try { - permissionCon.acquireSendPermission(); - } catch (ConnectionException conEx) { - // Set retryInfo and then retry. - // We want to keep calling TCPConduit.getConnection until it doesn't - // return a connection. - retryInfo = new ConnectExceptions(); - retryInfo.addFailure(permissionCon.getRemoteAddress(), conEx); - continue; - } - } + if (retry && logger.isDebugEnabled()) { + logger.debug("Retrying send ({}{}) to {} peers ({}) via tcp/ip", + msg, cons.size(), cons); } + DMStats stats = getDMStats(); + List<?> sentCons; // used for cons we sent to this time + final BaseMsgStreamer ms = + MsgStreamer.create(cons, msg, directReply, stats, getConduit().getBufferPool()); try { - if (retry && logger.isDebugEnabled()) { - logger.debug("Retrying send ({}{}) to {} peers ({}) via tcp/ip", - msg, cons.size(), cons); + startTime = 0; + if (ackTimeout > 0) { + startTime = System.currentTimeMillis(); } - DMStats stats = getDMStats(); - List<?> sentCons; // used for cons we sent to this time + ms.reserveConnections(startTime, ackTimeout, ackSDTimeout); + + int result = ms.writeMessage(); + if (bytesWritten == 0) { + // bytesWritten only needs to be set once. + // if we have to do a retry we don't want to count + // each one's bytes. + bytesWritten = result; + } + ce = ms.getConnectExceptions(); + sentCons = ms.getSentConnections(); - final BaseMsgStreamer ms = - MsgStreamer.create(cons, msg, directReply, stats, getConduit().getBufferPool()); + totalSentCons.addAll(sentCons); + } catch (NotSerializableException e) { + throw e; + } catch (IOException ex) { + throw new InternalGemFireException( + "Unknown error serializing message", + ex); + } finally { try { - startTime = 0; - if (ackTimeout > 0) { - startTime = System.currentTimeMillis(); - } - ms.reserveConnections(startTime, ackTimeout, ackSDTimeout); - - int result = ms.writeMessage(); - if (bytesWritten == 0) { - // bytesWritten only needs to be set once. - // if we have to do a retry we don't want to count - // each one's bytes. - bytesWritten = result; - } - ce = ms.getConnectExceptions(); - sentCons = ms.getSentConnections(); - - totalSentCons.addAll(sentCons); - } catch (NotSerializableException e) { - throw e; - } catch (IOException ex) { - throw new InternalGemFireException( - "Unknown error serializing message", - ex); - } finally { - try { - ms.close(); - } catch (IOException e) { - throw new InternalGemFireException("Unknown error serializing message", e); - } + ms.close(); + } catch (IOException e) { + throw new InternalGemFireException("Unknown error serializing message", e); } + } - if (ce != null) { - retryInfo = ce; - ce = null; - } + if (ce != null) { + retryInfo = ce; + ce = null; + } - if (directReply && !sentCons.isEmpty()) { - long readAckStart = 0; + if (directReply && !sentCons.isEmpty()) { + long readAckStart = 0; + if (stats != null) { + readAckStart = stats.startReplyWait(); + } + try { + ce = readAcks(sentCons, startTime, ackTimeout, ackSDTimeout, ce, + directMsg.getDirectReplyProcessor()); + } finally { if (stats != null) { - readAckStart = stats.startReplyWait(); - } - try { - ce = readAcks(sentCons, startTime, ackTimeout, ackSDTimeout, ce, - directMsg.getDirectReplyProcessor()); - } finally { - if (stats != null) { - stats.endReplyWait(readAckStart, startTime); - } + stats.endReplyWait(readAckStart, startTime); } } - } finally { - if (sendingToGroup) { - releaseGroupSendPermission(orderedMsg); - } else if (permissionCon != null) { - permissionCon.releaseSendPermission(); - } } + if (ce != null) { if (retryInfo != null) { retryInfo.getMembers().addAll(ce.getMembers()); @@ -734,16 +640,6 @@ public class DirectChannel { public synchronized void disconnect(Exception cause) { this.disconnected = true; this.disconnectCompleted = false; - try { - groupOrderedSenderSem.release(); - } catch (Error e) { - // GEODE-1076 - already released - } - try { - groupUnorderedSenderSem.release(); - } catch (Error e) { - // GEODE-1076 - already released - } this.conduit.stop(cause); this.disconnectCompleted = true; } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/properties.html b/geode-core/src/main/java/org/apache/geode/internal/cache/properties.html index 5e3dbcf..1f5b88d 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/properties.html +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/properties.html @@ -2657,23 +2657,6 @@ TBA </dd> <!-- ------------------------------------------------------- --> -<dt><strong>p2p.defaultConcurrencyLevel</strong></dt> -<dd> -<em>Public:</em> false -<p> -<em>Integer</em> (default is the number of processors on current machine, -but no less than 2) -<p> -See <code>org.apache.geode.distributed.internal.direct.DirectChannel#DEFAULT_CONCURRENCY_LEVEL</code>. -<p> -<pre> - Return how many concurrent operations should be allowed by default. -</pre> -<p> -TBA -</dd> - -<!-- ------------------------------------------------------- --> <dt><strong>p2p.defaultLogLevel</strong></dt> <dd> <em>Public:</em> false @@ -2787,23 +2770,6 @@ TBA </dd> <!-- ------------------------------------------------------- --> -<dt><strong>p2p.maxConnectionSenders</strong></dt> -<dd> -<em>Public:</em> false -<p> -<em>Integer</em> (default is p2p.defaultConcurrencyLevel) -<p> -See <code>org.apache.geode.internal.tcp.Connection#MAX_SENDERS</code>. -<p> -<pre> - The maximum number of concurrent senders sending a message to a single - recipient. -</pre> -<p> -TBA -</dd> - -<!-- ------------------------------------------------------- --> <dt><strong>p2p.maxGroupSenders</strong></dt> <dd> <em>Public:</em> false diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java index e91b1fa..d266688 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java @@ -38,7 +38,6 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -70,7 +69,6 @@ import org.apache.geode.distributed.internal.ReplyException; import org.apache.geode.distributed.internal.ReplyMessage; import org.apache.geode.distributed.internal.ReplyProcessor21; import org.apache.geode.distributed.internal.ReplySender; -import org.apache.geode.distributed.internal.direct.DirectChannel; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.distributed.internal.membership.api.MemberShunnedException; import org.apache.geode.distributed.internal.membership.api.Membership; @@ -85,7 +83,6 @@ import org.apache.geode.internal.net.NioPlainEngine; import org.apache.geode.internal.net.SocketCreator; import org.apache.geode.internal.serialization.Version; import org.apache.geode.internal.tcp.MsgReader.Header; -import org.apache.geode.internal.util.concurrent.ReentrantSemaphore; import org.apache.geode.logging.internal.executors.LoggingThread; import org.apache.geode.logging.internal.log4j.api.LogService; @@ -159,8 +156,6 @@ public class Connection implements Runnable { */ private SystemTimerTask idleTask; - private static final ThreadLocal<Boolean> isReaderThread = withInitial(() -> FALSE); - /** * If true then readers for thread owned sockets will send all messages on thread owned senders. * Even normally unordered msgs get send on TO socks. @@ -255,18 +250,6 @@ public class Connection implements Runnable { /** used for async writes */ private Thread pusherThread; - /** - * The maximum number of concurrent senders sending a message to a single recipient. - */ - private static final int MAX_SENDERS = Integer - .getInteger("p2p.maxConnectionSenders", DirectChannel.DEFAULT_CONCURRENCY_LEVEL); - /** - * This semaphore is used to throttle how many threads will try to do sends on this connection - * concurrently. A thread must acquire this semaphore before it is allowed to start serializing - * its message. - */ - private final Semaphore senderSem = new ReentrantSemaphore(MAX_SENDERS); - /** Set to true once the handshake has been read */ private volatile boolean handshakeRead; private volatile boolean handshakeCancelled; @@ -515,20 +498,6 @@ public class Connection implements Runnable { return sharedResource; } - public static void makeReaderThread() { - // mark this thread as a reader thread - makeReaderThread(true); - } - - private static void makeReaderThread(boolean v) { - isReaderThread.set(v); - } - - // return true if this thread is a reader thread - private static boolean isReaderThread() { - return isReaderThread.get(); - } - @VisibleForTesting int getP2PConnectTimeout(DistributionConfig config) { if (AlertingAction.isThreadAlerting()) { @@ -1336,7 +1305,6 @@ public class Connection implements Runnable { } } connected = false; - closeSenderSem(); final DMStats stats = owner.getConduit().getStats(); if (finishedConnecting) { @@ -1467,7 +1435,7 @@ public class Connection implements Runnable { readerThread = Thread.currentThread(); readerThread.setName(p2pReaderName()); ConnectionTable.threadWantsSharedResources(); - makeReaderThread(isReceiver); + try { readMessages(); } finally { @@ -3274,53 +3242,6 @@ public class Connection implements Runnable { return messagesSent; } - public void acquireSendPermission() throws ConnectionException { - if (!connected) { - throw new ConnectionException("connection is closed"); - } - if (isReaderThread()) { - // reader threads send replies and we always want to permit those without waiting - return; - } - boolean interrupted = false; - try { - for (;;) { - owner.getConduit().getCancelCriterion().checkCancelInProgress(null); - try { - senderSem.acquire(); - break; - } catch (InterruptedException ex) { - interrupted = true; - } - } - } finally { - if (interrupted) { - Thread.currentThread().interrupt(); - } - } - if (!connected) { - senderSem.release(); - owner.getConduit().getCancelCriterion().checkCancelInProgress(null); - throw new ConnectionException("connection is closed"); - } - } - - public void releaseSendPermission() { - if (isReaderThread()) { - return; - } - senderSem.release(); - } - - private void closeSenderSem() { - // All we need to do is increase the number of permits by one - // just in case 1 or more connections are currently waiting to acquire. - // One of them will get the permit, then find out the connection is closed - // and release the permit until all the connections currently waiting to acquire - // will complete by throwing a ConnectionException. - releaseSendPermission(); - } - private class BatchBufferFlusher extends Thread { private volatile boolean flushNeeded;