This is an automated email from the ASF dual-hosted git repository.

mhanson pushed a commit to branch bugfix/StatusReporter
in repository https://gitbox.apache.org/repos/asf/geode.git
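For context, the sketch below illustrates how the ComponentStatus/StatusReporter API introduced by this commit is intended to be used: a component implements ComponentStatus, registers itself with StatusReporter, and its print() method is invoked on each reporting cycle (roughly every 5 seconds). The ExampleComponentStatus class is a hypothetical illustration and is not part of the commit; the commit's own implementations (ServerLauncher, GatewaySenderAdvisor) log through log4j rather than System.out.

import org.apache.geode.ComponentStatus;
import org.apache.geode.StatusReporter;

// Hypothetical component used only to illustrate the new status-reporting API.
public class ExampleComponentStatus implements ComponentStatus {

  @Override
  public String name() {
    return "example-component";
  }

  @Override
  public String getStatusString() {
    return "running";
  }

  @Override
  public void print() {
    // The commit's implementations log via log4j; System.out keeps this sketch self-contained.
    System.out.println("Component: " + name() + " Status: " + getStatusString());
  }

  public static void main(String[] args) throws InterruptedException {
    ExampleComponentStatus component = new ExampleComponentStatus();
    StatusReporter.start();                       // starts the background reporter thread
    StatusReporter.registerComponent(component);  // print() runs on each ~5 second poll
    Thread.sleep(6000);                           // allow one reporting cycle
    StatusReporter.stop();                        // releases the stop semaphore and joins the thread
  }
}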
commit 47a9bcf3be7d7f0d70ebeb97b485a7b0ac868bed
Author: Mark Hanson <hans...@vmware.com>
AuthorDate: Thu Jan 28 15:31:14 2021 -0800

    Adding GatewaySender logging.
---
 .../java/org/apache/geode/ComponentStatus.java     |   9 +
 .../main/java/org/apache/geode/StatusReporter.java |  99 ++++++++++
 .../apache/geode/distributed/ServerLauncher.java   |  31 ++-
 .../geode/distributed/internal/ReplyMessage.java   |   4 +
 .../distributed/internal/ReplyProcessor21.java     |  21 +-
 .../internal/cache/DistributedCacheOperation.java  |  82 ++++++--
 .../geode/internal/cache/GemFireCacheImpl.java     |  90 +++------
 .../internal/cache/wan/AbstractGatewaySender.java  |  43 ++--
 .../wan/AbstractGatewaySenderEventProcessor.java   |  62 ++----
 .../internal/cache/wan/GatewaySenderAdvisor.java   |  89 +++++----
 .../wan/GatewaySenderEventCallbackDispatcher.java  |   8 +-
 ...aySenderQueueEntrySynchronizationOperation.java |  34 ++--
 ...currentParallelGatewaySenderEventProcessor.java |  40 ++--
 .../ParallelGatewaySenderEventProcessor.java       |  20 +-
 .../wan/parallel/ParallelGatewaySenderQueue.java   | 216 +++++++--------------
 ...tilParallelGatewaySenderFlushedCoordinator.java |  36 ++--
 ...oncurrentSerialGatewaySenderEventProcessor.java |  52 ++---
 .../serial/SerialGatewaySenderEventProcessor.java  |  95 ++++-----
 .../cache/wan/serial/SerialGatewaySenderQueue.java |  86 +++-----
 .../wan/serial/SerialSecondaryGatewayListener.java |   6 +-
 .../java/org/apache/geode/StatusReporterTest.java  | 105 ++++++++++
 .../cache/wan/serial/SerialGatewaySenderImpl.java  |  27 ++-
 22 files changed, 687 insertions(+), 568 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/ComponentStatus.java b/geode-core/src/main/java/org/apache/geode/ComponentStatus.java
new file mode 100644
index 0000000..6018d0a
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/ComponentStatus.java
@@ -0,0 +1,9 @@
+package org.apache.geode;
+
+public interface ComponentStatus {
+  String name();
+
+  String getStatusString();
+
+  void print();
+}
diff --git a/geode-core/src/main/java/org/apache/geode/StatusReporter.java b/geode-core/src/main/java/org/apache/geode/StatusReporter.java
new file mode 100644
index 0000000..4a437fd
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/StatusReporter.java
@@ -0,0 +1,99 @@
+package org.apache.geode;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.logging.internal.log4j.api.LogService;
+
+public class StatusReporter {
+  protected static Logger logger = LogService.getLogger();
+
+  public static boolean isComponentRegistered(ComponentStatus componentStatus) {
+    synchronized (componentStatusSet) {
+      return componentStatusSet.contains(componentStatus);
+    }
+  }
+
+  enum State {
+    SR_RUNNING, SR_NOT_STARTED
+  }
+
+  private static final Set<ComponentStatus> componentStatusSet = new HashSet<>();
+  private static State status = State.SR_NOT_STARTED;
+  private static final Semaphore stopSem = new Semaphore(1);
+
+  static final Thread statusReporterThread = new Thread(() -> {
+    status = State.SR_RUNNING;
+    logger.info("StatusReporter Started thread.");
+    try {
+      stopSem.acquire();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+    while (true) {
+      try {
+        if (stopSem.tryAcquire(5000, TimeUnit.MILLISECONDS)) {
+          break;
+        }
+        StatusReporter.getStatusReport();
+      } catch (Exception e) {
+        return;
+      }
+    }
+    logger.info("StatusReporter Stopping thread.");
+    status = State.SR_NOT_STARTED;
+
}); + + public static State getStatus() { + State tempState; + synchronized (statusReporterThread) { + if(status != State.SR_RUNNING) { + StatusReporter.start(); + } + tempState = status; + } + return tempState; + } + + public static void start() { + synchronized (statusReporterThread) { + statusReporterThread.start(); + } + } + + public static void stop() throws InterruptedException { + synchronized (statusReporterThread) { + stopSem.release(); + } + statusReporterThread.join(); + } + + public static boolean registerComponent(ComponentStatus componentStatus) { + synchronized (componentStatusSet) { + return componentStatusSet.add(componentStatus); + } + } + + public static boolean removeComponent(ComponentStatus componentStatus) { + synchronized (componentStatusSet) { + return componentStatusSet.remove(componentStatus); + } + } + + public static boolean getStatusReport() { + synchronized (statusReporterThread) { + if (componentStatusSet.isEmpty()) { + return false; + } + for (ComponentStatus componentStatus : componentStatusSet) { + componentStatus.print(); + } + } + return true; + } + +} diff --git a/geode-core/src/main/java/org/apache/geode/distributed/ServerLauncher.java b/geode-core/src/main/java/org/apache/geode/distributed/ServerLauncher.java index f10b053..b9320c1 100755 --- a/geode-core/src/main/java/org/apache/geode/distributed/ServerLauncher.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/ServerLauncher.java @@ -37,10 +37,12 @@ import java.net.UnknownHostException; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.ServiceLoader; +import java.util.Set; import java.util.TreeMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeoutException; @@ -60,6 +62,8 @@ import joptsimple.OptionSet; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.logging.log4j.Logger; +import org.apache.geode.ComponentStatus; +import org.apache.geode.StatusReporter; import org.apache.geode.SystemFailure; import org.apache.geode.annotations.Immutable; import org.apache.geode.annotations.VisibleForTesting; @@ -114,7 +118,7 @@ import org.apache.geode.util.internal.GeodeGlossary; * @since GemFire 7.0 */ @SuppressWarnings("unused") -public class ServerLauncher extends AbstractLauncher<String> { +public class ServerLauncher extends AbstractLauncher<String> implements ComponentStatus { private static final Logger log = LogService.getLogger(); @@ -195,7 +199,7 @@ public class ServerLauncher extends AbstractLauncher<String> { private final ControlNotificationHandler controlHandler; private final AtomicBoolean starting = new AtomicBoolean(false); - + private final Set<ComponentStatus> subComponentStatus = new HashSet<>(); private final boolean assignBuckets; private final boolean deletePidFileOnStop; private final boolean disableDefaultServer; @@ -832,7 +836,8 @@ public class ServerLauncher extends AbstractLauncher<String> { } awaitStartupTasks(cache, startTime); - + StatusReporter.start(); + StatusReporter.registerComponent(this); debug("Running Server on (%1$s) in (%2$s) as (%3$s)...", getId(), getWorkingDirectory(), getMember()); running.set(true); @@ -1369,6 +1374,26 @@ public class ServerLauncher extends AbstractLauncher<String> { ProcessType.SERVER, isForcing()); } + @Override + public String name() { + return getId(); + } + + @Override + public String getStatusString() { + return isRunning() ? 
"running" : "not running"; + } + + + + @Override + public void print() { + logger.info(" Server Name: " + name() + " Component Status: " + getStatusString()); + for (ComponentStatus componentStatus : subComponentStatus) { + componentStatus.print(); + } + } + private class ServerControllerParameters implements ProcessControllerParameters { @Override public File getPidFile() { diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ReplyMessage.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ReplyMessage.java index 2f59608..6d387a1 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ReplyMessage.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ReplyMessage.java @@ -334,6 +334,10 @@ public class ReplyMessage extends HighPriorityDistributionMessage { sb.append(ex); } } + sb.append(" ignored ? "); + sb.append(this.ignored); + sb.append(" closed ? "); + sb.append(this.closed); return sb; } diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ReplyProcessor21.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ReplyProcessor21.java index 83d7884..20421b3 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ReplyProcessor21.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ReplyProcessor21.java @@ -409,29 +409,38 @@ public class ReplyProcessor21 implements MembershipListener { } protected void process(DistributionMessage msg, boolean warn) { - if (logger.isDebugEnabled()) { - logger.debug("{} got process({}) from {}", this, msg, msg.getSender()); - } + logger.info("MLH {} got process({}) from {}", this, msg, msg.getSender()); + logger.info("MLH Process 1 msg {}", msg); if (msg instanceof ReplyMessage) { ReplyException ex = ((ReplyMessage) msg).getException(); + logger.info("MLH Process 2 msg {} with exception of {}", msg, ex); + if (ex != null) { if (ex.getCause() instanceof DSFIDNotFoundException) { processException(msg, (DSFIDNotFoundException) ex.getCause()); } else { processException(msg, ex); } + } else { + ReplyMessage replyMessage = (ReplyMessage) msg; + logger.info( + "MLH Process 3 msg {} ignored ? {} was closed ? {} ",replyMessage, + replyMessage.ignored, replyMessage.closed); } } + logger.info("MLH Process 4"); + final InternalDistributedMember sender = msg.getSender(); if (!removeMember(sender, false) && warn) { + logger.info("MLH Process 5 "); // if the member hasn't left the system, something is wrong final DistributionManager dm = getDistributionManager(); // fix for bug 33253 Set ids = getDistributionManagerIds(); if (ids == null || ids.contains(sender)) { List viewMembers = dm.getViewMembers(); if (system.getConfig().getMcastPort() == 0 // could be using multicast & will get responses - // from everyone + // from everyone && (viewMembers == null || viewMembers.contains(sender))) { logger.warn( "Received reply from member {} but was not expecting one. More than one reply may have been received. 
The reply that was not expected is: {}", @@ -439,6 +448,7 @@ public class ReplyProcessor21 implements MembershipListener { } } } + logger.info("MLH Process 6 "); checkIfDone(); } @@ -761,7 +771,7 @@ public class ReplyProcessor21 implements MembershipListener { * @param p_msecs the number of milliseconds to wait for replies, zero will be interpreted as * Long.MAX_VALUE * - * @throws ReplyException an exception passed back in reply + * @throws ReplyException an exception passed back in reply * * @throws InternalGemFireException if ack-threshold was exceeded and system property * "ack-threshold-exception" is set to true @@ -964,6 +974,7 @@ public class ReplyProcessor21 implements MembershipListener { @Override public String toString() { + return "<" + shortName() + " " + this.getProcessorId() + " waiting for " + numMembers() + " replies" + (exception == null ? "" : (" exception: " + exception)) + " from " + membersToString() + ">"; diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java index 3e55045..69658c7 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java @@ -331,6 +331,7 @@ public abstract class DistributedCacheOperation { DistributedRegion region = getRegion(); DistributionManager mgr = region.getDistributionManager(); boolean reliableOp = isOperationReliable() && region.requiresReliabilityCheck(); + boolean isDebugEnabled = logger.isDebugEnabled(); if (SLOW_DISTRIBUTION_MS > 0) { // test hook try { @@ -365,7 +366,7 @@ public abstract class DistributedCacheOperation { routingComputed = true; filterRouting = getRecipientFilterRouting(recipients); if (filterRouting != null) { - if (logger.isDebugEnabled()) { + if (isDebugEnabled) { logger.debug("Computed this filter routing: {}", filterRouting); } } @@ -472,7 +473,7 @@ public abstract class DistributedCacheOperation { } } - if (logger.isDebugEnabled()) { + if (isDebugEnabled) { logger.debug("recipients for {}: {} with adjunct messages to: {}", this, recipients, adjunctRecipients); } @@ -485,10 +486,20 @@ public abstract class DistributedCacheOperation { // common // case waitForMembers = recipients; + if (isDebugEnabled) { + logger.debug("MLH _distribute 1 this : " + this + " recipients " + recipients); + } + } else if (!cachelessNodes.isEmpty()) { waitForMembers = new HashSet(recipients); waitForMembers.addAll(cachelessNodes); + if (isDebugEnabled) { + logger.debug("MLH _distribute 2 this : " + this + " recipients " + recipients); + } } else { + if (isDebugEnabled) { + logger.debug("MLH _distribute 3 this : " + this + " recipients " + recipients); + } // note that we use a Vector instead of a Set for the responders // collection // because partitioned regions sometimes send both a regular cache @@ -504,7 +515,7 @@ public abstract class DistributedCacheOperation { LOSS_SIMULATION_GENERATOR = new Random(this.hashCode()); } if ((LOSS_SIMULATION_GENERATOR.nextInt(100) * 1.0 / 100.0) < LOSS_SIMULATION_RATIO) { - if (logger.isDebugEnabled()) { + if (isDebugEnabled) { logger.debug("loss simulation is inhibiting message transmission to {}", recipients); } @@ -513,6 +524,9 @@ public abstract class DistributedCacheOperation { } } if (reliableOp) { + if (isDebugEnabled) { + logger.debug("MLH _distribute 4 reliableOp : " + reliableOp); + } this.departedMembers = new 
HashSet(); this.processor = new ReliableCacheReplyProcessor(region.getSystem(), waitForMembers, this.departedMembers); @@ -520,7 +534,9 @@ public abstract class DistributedCacheOperation { this.processor = new CacheOperationReplyProcessor(region.getSystem(), waitForMembers); } } - + if (isDebugEnabled) { + logger.debug("MLH _distribute 5 "); + } Set failures = null; CacheOperationMessage msg = createMessage(); initMessage(msg, this.processor); @@ -528,7 +544,9 @@ public abstract class DistributedCacheOperation { if (DistributedCacheOperation.internalBeforePutOutgoing != null) { DistributedCacheOperation.internalBeforePutOutgoing.run(); } - + if (isDebugEnabled) { + logger.debug("MLH _distribute 6 "); + } if (processor != null && msg.isSevereAlertCompatible()) { this.processor.enableSevereAlertProcessing(); // if this message is distributing for a partitioned region message, @@ -545,7 +563,9 @@ public abstract class DistributedCacheOperation { } } } - + if (isDebugEnabled) { + logger.debug("MLH _distribute 7 "); + } msg.setMulticast(useMulticast); msg.directAck = directAck; if (region.isUsedForPartitionedRegionBucket()) { @@ -559,7 +579,9 @@ public abstract class DistributedCacheOperation { } else if (!routingComputed) { msg.needsRouting = true; } - + if (isDebugEnabled) { + logger.debug("MLH _distribute 8 "); + } initProcessor(processor, msg); if (region.cache.isClosed() && !canBeSentDuringShutdown()) { @@ -567,7 +589,9 @@ public abstract class DistributedCacheOperation { "The cache has been closed", null); } - + if (isDebugEnabled) { + logger.debug("MLH _distribute 9 "); + } msg.setRecipients(recipients); failures = mgr.putOutgoing(msg); @@ -578,7 +602,7 @@ public abstract class DistributedCacheOperation { msg.setRecipients(needsOldValueInCacheOp); Set newFailures = mgr.putOutgoing(msg); if (newFailures != null) { - if (logger.isDebugEnabled()) { + if (isDebugEnabled) { logger.debug("Failed sending ({}) to {}", msg, newFailures); } if (failures != null && failures.size() > 0) { @@ -588,7 +612,9 @@ public abstract class DistributedCacheOperation { } } } - + if (isDebugEnabled) { + logger.debug("MLH _distribute 10 "); + } if (cachelessNodes.size() > 0) { cachelessNodes.removeAll(cachelessNodesWithNoCacheServer); if (cachelessNodes.size() > 0) { @@ -622,8 +648,10 @@ public abstract class DistributedCacheOperation { cachelessNodes.addAll(cachelessNodesWithNoCacheServer); } } - - if (failures != null && !failures.isEmpty() && logger.isDebugEnabled()) { + if (isDebugEnabled) { + logger.debug("MLH _distribute 11 "); + } + if (failures != null && !failures.isEmpty() && isDebugEnabled) { logger.debug("Failed sending ({}) to {} while processing event:{}", msg, failures, event); } @@ -666,7 +694,9 @@ public abstract class DistributedCacheOperation { } } } - + if (isDebugEnabled) { + logger.debug("MLH _distribute 12 "); + } // compute local client routing before waiting for an ack only for a bucket if (region.isUsedForPartitionedRegionBucket()) { FilterInfo filterInfo = getLocalFilterRouting(filterRouting); @@ -674,7 +704,9 @@ public abstract class DistributedCacheOperation { } waitForAckIfNeeded(msg, persistentIds); - + if (isDebugEnabled) { + logger.debug("MLH _distribute 13 "); + } if (/* msg != null && */reliableOp) { Set successfulRecips = new HashSet(recipients); successfulRecips.addAll(cachelessNodes); @@ -688,13 +720,15 @@ public abstract class DistributedCacheOperation { region.handleReliableDistribution(successfulRecips); } } - + if (isDebugEnabled) { + logger.debug("MLH _distribute 14 "); 
+ } if (region.isUsedForPartitionedRegionBucket() && filterRouting != null) { removeDestroyTokensFromCqResultKeys(filterRouting); } } catch (CancelException e) { - if (logger.isDebugEnabled()) { + if (isDebugEnabled) { logger.debug("distribution of message aborted by shutdown: {}", this); } throw e; @@ -703,6 +737,9 @@ public abstract class DistributedCacheOperation { e); throw e; } finally { + if (isDebugEnabled) { + logger.debug("MLH _distribute 15 "); + } ReplyProcessor21.setShortSevereAlertProcessing(false); } } @@ -1605,19 +1642,30 @@ public abstract class DistributedCacheOperation { @Override protected void process(final DistributionMessage dmsg, boolean warn) { + logger.info("MLH CacheOperationReplyProcessor.process 1 dmsg " + dmsg + " msg = " + msg); if (dmsg instanceof ReplyMessage) { + logger.info("MLH CacheOperationReplyProcessor.process 2"); + ReplyMessage replyMessage = (ReplyMessage) dmsg; if (msg != null) { + logger.info("MLH CacheOperationReplyProcessor.process 3 replyMessage " + replyMessage); + boolean discard = !msg.processReply(replyMessage, this); + logger.info("MLH CacheOperationReplyProcessor.process 4 discard = " + discard); if (discard) { + logger.info("MLH CacheOperationReplyProcessor.process 5 discarding replyMessage = " + replyMessage); return; } } + logger.info("MLH CacheOperationReplyProcessor.process 5 "); + if (replyMessage.getClosed()) { + logger.info("MLH CacheOperationReplyProcessor.process 6 "); + closedMembers.add(replyMessage.getSender()); } } - + logger.info("MLH CacheOperationReplyProcessor.process 7 invoking super " + super.getClass().getName()); super.process(dmsg, warn); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java index 1ec0ed2..ca27cdf 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java @@ -2166,7 +2166,7 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has if (!keepDS && systemFailureCause == null && (isReconnecting() || system.getReconnectedSystem() != null)) { - logger.debug( + logger.info( "Cache is shutting down distributed system connection. isReconnecting={} reconnectedSystem={} keepAlive={} keepDS={}", isReconnecting(), system.getReconnectedSystem(), keepAlive, keepDS); @@ -2206,28 +2206,34 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has this.keepAlive = keepAlive; isClosing = true; - logger.info("{}: Now closing.", this); + logger.info("MLH {}: Now closing.", this); // we don't clear the prID map if there is a system failure. 
Other // threads may be hung trying to communicate with the map locked if (systemFailureCause == null) { + logger.info("Clearing the PartitionedRegion.clearPRIdMap"); PartitionedRegion.clearPRIdMap(); } TXStateProxy tx = null; try { if (transactionManager != null) { + logger.info("Pausing transactions in transactionManager"); tx = transactionManager.pauseTransaction(); } // do this before closing regions + logger.info("MLH Stopping the resourceManager"); + resourceManager.close(); try { + logger.info("MLH Stopping the resourceAdvisor advisor"); resourceAdvisor.close(); } catch (CancelException ignore) { } try { + logger.info("MLH Stopping the jmxAdvisor advisor"); jmxAdvisor.close(); } catch (CancelException ignore) { } @@ -2237,21 +2243,17 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has sender.stop(); GatewaySenderAdvisor advisor = ((AbstractGatewaySender) sender).getSenderAdvisor(); if (advisor != null) { - if (isDebugEnabled) { - logger.debug("Stopping the GatewaySender advisor"); - } + logger.info("Stopping the GatewaySender advisor"); advisor.close(); } } catch (CancelException ignore) { } } - + logger.info("MLH destroyGatewaySenderLockService"); destroyGatewaySenderLockService(); if (eventThreadPool != null) { - if (isDebugEnabled) { - logger.debug("{}: stopping event thread pool...", this); - } + logger.info("{}: stopping event thread pool...", this); eventThreadPool.shutdown(); } @@ -2265,9 +2267,7 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has stopServices(); // no need to track PR instances - if (isDebugEnabled) { - logger.debug("{}: clearing partitioned regions...", this); - } + logger.info("{}: clearing partitioned regions...", this); synchronized (partitionedRegions) { int prSize = -partitionedRegions.size(); partitionedRegions.clear(); @@ -2288,9 +2288,7 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has InternalRegion prRoot = null; for (InternalRegion lr : rootRegions.values()) { - if (isDebugEnabled) { - logger.debug("{}: processing region {}", this, lr.getFullPath()); - } + logger.info("{}: processing region {}", this, lr.getFullPath()); if (PartitionedRegionHelper.PR_ROOT_REGION_NAME.equals(lr.getName())) { prRoot = lr; } else { @@ -2298,9 +2296,7 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has // this region will be closed internally by parent region continue; } - if (isDebugEnabled) { - logger.debug("{}: closing region {}...", this, lr.getFullPath()); - } + logger.info("{}: closing region {}...", this, lr.getFullPath()); try { lr.handleCacheClose(op); } catch (RuntimeException e) { @@ -2314,9 +2310,7 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has } try { - if (isDebugEnabled) { - logger.debug("{}: finishing partitioned region close...", this); - } + logger.info("{}: finishing partitioned region close...", this); PartitionedRegion.afterRegionsClosedByCacheClose(this); if (prRoot != null) { // do the PR meta root region last @@ -2334,9 +2328,7 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has // Close the CqService Handle. 
try { - if (isDebugEnabled) { - logger.debug("{}: closing CQ service...", this); - } + logger.info("{}: closing CQ service...", this); cqService.close(); } catch (RuntimeException ignore) { logger.info("Failed to get the CqService, to close during cache close (1)."); @@ -2344,20 +2336,16 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has PoolManager.close(keepAlive); - if (isDebugEnabled) { - logger.debug("{}: notifying admins of close...", this); - } + logger.info("{}: notifying admins of close...", this); try { SystemMemberCacheEventProcessor.send(this, Operation.CACHE_CLOSE); } catch (CancelException ignore) { if (logger.isDebugEnabled()) { - logger.debug("Ignored cancellation while notifying admins"); + logger.info("Ignored cancellation while notifying admins"); } } - if (isDebugEnabled) { - logger.debug("{}: stopping destroyed entries processor...", this); - } + logger.info("{}: stopping destroyed entries processor...", this); tombstoneService.stop(); // NOTICE: the CloseCache message is the *last* message you can send! @@ -2370,9 +2358,7 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has if (distributionManager != null) { // Send CacheClosedMessage (and NOTHING ELSE) here - if (isDebugEnabled) { - logger.debug("{}: sending CloseCache to peers...", this); - } + logger.info("{}: sending CloseCache to peers...", this); Set<InternalDistributedMember> otherMembers = distributionManager.getOtherDistributionManagerIds(); ReplyProcessor21 processor = replyProcessor21Factory.create(system, otherMembers); @@ -2658,22 +2644,16 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has private void stopServers() { boolean isDebugEnabled = logger.isDebugEnabled(); - if (isDebugEnabled) { - logger.debug("{}: stopping cache servers...", this); - } + logger.info("{}: stopping cache servers...", this); boolean stoppedCacheServer = false; for (InternalCacheServer cacheServer : allCacheServers) { - if (isDebugEnabled) { - logger.debug("stopping bridge {}", cacheServer); - } + logger.info("stopping bridge {}", cacheServer); try { cacheServer.stop(); } catch (CancelException e) { - if (isDebugEnabled) { - logger.debug("Ignored cache closure while closing bridge {}", cacheServer, e); - } + logger.info("Ignored cache closure while closing bridge {}", cacheServer, e); } allCacheServers.remove(cacheServer); stoppedCacheServer = true; @@ -2681,16 +2661,12 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has InternalCacheServer receiverServer = gatewayReceiverServer.getAndSet(null); if (receiverServer != null) { - if (isDebugEnabled) { - logger.debug("stopping gateway receiver server {}", receiverServer); - } + logger.info("stopping gateway receiver server {}", receiverServer); try { receiverServer.stop(); } catch (CancelException e) { - if (isDebugEnabled) { - logger.debug("Ignored cache closure while closing gateway receiver server {}", + logger.info("Ignored cache closure while closing gateway receiver server {}", receiverServer, e); - } } stoppedCacheServer = true; } @@ -2702,26 +2678,18 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has } // stop HA services if they had been started - if (isDebugEnabled) { - logger.debug("{}: stopping HA services...", this); - } + logger.info("{}: stopping HA services...", this); try { HARegionQueue.stopHAServices(); } catch (CancelException e) { - if (isDebugEnabled) { - logger.debug("Ignored cache closure while 
closing HA services", e); - } + logger.info("Ignored cache closure while closing HA services", e); } - if (isDebugEnabled) { - logger.debug("{}: stopping client health monitor...", this); - } + logger.info("{}: stopping client health monitor...", this); try { ClientHealthMonitor.shutdownInstance(); } catch (CancelException e) { - if (isDebugEnabled) { - logger.debug("Ignored cache closure while closing client health monitor", e); - } + logger.info("Ignored cache closure while closing client health monitor", e); } // Reset the unique id counter for durable clients. If a durable client stops/starts its cache, diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java index 4ea2c6d..959d2f6 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java @@ -613,9 +613,7 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di // close the GatewaySenderAdvisor GatewaySenderAdvisor advisor = this.getSenderAdvisor(); if (advisor != null) { - if (logger.isDebugEnabled()) { - logger.debug("Stopping the GatewaySender advisor"); - } + logger.debug("Stopping the GatewaySender advisor"); advisor.close(); } @@ -1036,14 +1034,22 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di seca.getRecipientDSIds().addAll(allRemoteDSIds); } } else { + GatewaySenderEventCallbackArgument geCallbackArg = new GatewaySenderEventCallbackArgument(callbackArg, this.getMyDSId(), allRemoteDSIds); clonedEvent.setCallbackArgument(geCallbackArg); + + if (isDebugEnabled) { + logger.debug("MLH distribute: 1 creating object GatewaySenderEventCallbackArgument:" + clonedEvent); + } } // If this gateway is not running, return if (!isRunning()) { if (this.isPrimary()) { + if (isDebugEnabled) { + logger.debug("MLH distribute: 2 recordDroppedEvent:" + clonedEvent); + } recordDroppedEvent(clonedEvent); } if (isDebugEnabled) { @@ -1079,6 +1085,9 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di logger.debug("Returning back without putting into the gateway sender queue:" + event); } if (this.isPrimary()) { + if (isDebugEnabled) { + logger.debug("MLH distribute: 3 recordDroppedEvent:" + clonedEvent); + } recordDroppedEvent(clonedEvent); } return; @@ -1099,7 +1108,9 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di // Get substitution value to enqueue if necessary Object substituteValue = getSubstituteValue(clonedEvent, operation); - + if (isDebugEnabled) { + logger.debug("MLH distribute: 4 enqueueEvent:" + clonedEvent); + } ev.enqueueEvent(operation, clonedEvent, substituteValue, isLastEventInTransaction); } catch (CancelException e) { logger.debug("caught cancel exception", e); @@ -1128,11 +1139,11 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di private void recordDroppedEvent(EntryEventImpl event) { if (this.eventProcessor != null) { this.eventProcessor.registerEventDroppedInPrimaryQueue(event); + logger.debug("registerEventDroppedInPrimaryQueue event: {}", event); + } else { tmpDroppedEvents.add(event); - if (logger.isDebugEnabled()) { - logger.debug("added to tmpDroppedEvents event: {}", event); - } + logger.debug("added to tmpDroppedEvents event: {}", event); } } @@ -1169,10 +1180,8 @@ public abstract class 
AbstractGatewaySender implements InternalGatewaySender, Di synchronized (this.queuedEventsSync) { while ((nextEvent = tmpQueuedEvents.poll()) != null) { try { - if (logger.isDebugEnabled()) { - logger.debug("Event :{} is enqueued to GatewaySenderQueue from TempQueue", - nextEvent); - } + logger.debug("Event :{} is enqueued to GatewaySenderQueue from TempQueue", + nextEvent); stats.decTempQueueSize(); this.eventProcessor.enqueueEvent(nextEvent.getOperation(), nextEvent.getEvent(), nextEvent.getSubstituteValue()); @@ -1204,11 +1213,9 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di while (itr.hasNext()) { TmpQueueEvent event = itr.next(); if (tailKey.equals(event.getEvent().getTailKey())) { - if (logger.isDebugEnabled()) { - logger.debug( - "shadowKey {} is found in tmpQueueEvents at AbstractGatewaySender level. Removing from there..", - tailKey); - } + logger.debug( + "shadowKey {} is found in tmpQueueEvents at AbstractGatewaySender level. Removing from there..", + tailKey); event.release(); itr.remove(); return true; @@ -1304,9 +1311,7 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di // Store the index locally this.eventIdIndex = index; - if (logger.isDebugEnabled()) { - logger.debug("{}: {} event id index: {}", this, messagePrefix, this.eventIdIndex); - } + logger.debug("{}: {} event id index: {}", this, messagePrefix, this.eventIdIndex); } } finally { // Unlock the lock if necessary diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java index 294121a..a65e680 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java @@ -339,9 +339,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread this.isPaused = false; // Notify thread to resume - if (logger.isDebugEnabled()) { - logger.debug("{}: Resumed dispatching", this); - } + logger.debug("{}: Resumed dispatching", this); synchronized (this.pausedLock) { this.pausedLock.notifyAll(); } @@ -726,9 +724,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread } } } // unsuccessful batch - if (logger.isDebugEnabled()) { - logger.debug("Finished processing events (batch #{})", (getBatchId() - 1)); - } + logger.debug("Finished processing events (batch #{})", (getBatchId() - 1)); } finally { afterExecute(); } @@ -739,9 +735,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread this.resetLastPeekedEvents = true; // most possible case is ParallelWan when user PR is locally destroyed // shadow PR is also locally destroyed - if (logger.isDebugEnabled()) { - logger.debug("Observed RegionDestroyedException on Queue's region."); - } + logger.debug("Observed RegionDestroyedException on Queue's region."); } catch (CancelException e) { logger.debug("Caught cancel exception", e); setIsStopped(true); @@ -791,9 +785,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread List<GatewaySenderEventImpl> conflatedEvents = null; // Conflate the batch if necessary if (this.sender.isBatchConflationEnabled() && events.size() > 1) { - if (logger.isDebugEnabled()) { - logEvents("original", events); - } + logEvents("original", events); Map<ConflationKey, 
GatewaySenderEventImpl> conflatedEventsMap = new LinkedHashMap<ConflationKey, GatewaySenderEventImpl>(); conflatedEvents = new ArrayList<GatewaySenderEventImpl>(); @@ -832,9 +824,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread // Increment the events conflated from batches statistic this.sender.getStatistics() .incEventsConflatedFromBatches(events.size() - conflatedEvents.size()); - if (logger.isDebugEnabled()) { - logEvents("conflated", conflatedEvents); - } + logEvents("conflated", conflatedEvents); } else { conflatedEvents = events; } @@ -1043,9 +1033,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread } } List<GatewaySenderEventImpl> events = eventsArr[0]; - if (logger.isDebugEnabled()) { - logger.debug("Removing events from the queue {}", events.size()); - } + logger.debug("Removing events from the queue {}", events.size()); eventQueueRemove(events.size()); logThresholdExceededAlerts(events); @@ -1097,9 +1085,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread if (!this.isPaused) { return; } - if (logger.isDebugEnabled()) { - logger.debug("GatewaySenderEventProcessor is paused. Waiting for Resumption"); - } + logger.debug("GatewaySenderEventProcessor is paused. Waiting for Resumption"); this.isDispatcherWaiting = true; this.pausedLock.notifyAll(); while (this.isPaused) { @@ -1191,9 +1177,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread } resumeDispatching(); - if (logger.isDebugEnabled()) { - logger.debug("{}: Notifying the dispatcher to terminate", this); - } + logger.debug("{}: Notifying the dispatcher to terminate", this); // If this is the primary, stay alive for a predefined time // OR until the queue becomes empty @@ -1202,9 +1186,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread try { while (!(this.queue.size() == 0)) { Thread.sleep(5000); - if (logger.isDebugEnabled()) { - logger.debug("{}: Waiting for the queue to get empty.", this); - } + logger.debug("{}: Waiting for the queue to get empty.", this); } } catch (InterruptedException e) { // interrupted @@ -1219,16 +1201,16 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread } } } else { + logger.debug("{}: notifyPrimaryLock.", this); this.sender.getSenderAdvisor().notifyPrimaryLock(); } + logger.debug("{}: setIsStopped to true.", this); setIsStopped(true); dispatcher.stop(); if (this.isAlive()) { - if (logger.isDebugEnabled()) { - logger.debug("{}: Joining with the dispatcher thread upto limit of 5 seconds", this); - } + logger.debug("{}: Joining with the dispatcher thread upto limit of 5 seconds", this); try { this.join(5000); // wait for our thread to stop if (this.isAlive()) { @@ -1250,15 +1232,11 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread closeProcessor(); - if (logger.isDebugEnabled()) { - logger.debug("Stopped dispatching: {}", this); - } + logger.debug("Stopped dispatching: {}", this); } public void closeProcessor() { - if (logger.isDebugEnabled()) { - logger.debug("Closing dispatcher"); - } + logger.debug("Closing dispatcher"); try { if (this.sender.isPrimary() && this.queue.size() > 0) { logger.warn("Destroying GatewayEventDispatcher with actively queued data."); @@ -1273,16 +1251,12 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread // just checking in case we should log a warning } finally { this.queue.close(); - if (logger.isDebugEnabled()) { 
- logger.debug("Closed dispatcher"); - } + logger.debug("Closed dispatcher"); } } protected void destroyProcessor() { - if (logger.isDebugEnabled()) { - logger.debug("Destroying dispatcher"); - } + logger.debug("Destroying dispatcher"); try { try { if (this.queue.peek() != null) { @@ -1299,9 +1273,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread // just checking in case we should log a warning } finally { this.queue.getRegion().localDestroyRegion(); - if (logger.isDebugEnabled()) { - logger.debug("Destroyed dispatcher"); - } + logger.debug("Destroyed dispatcher"); } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java index 6af0866..088998f 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java @@ -27,7 +27,9 @@ import java.util.TreeSet; import org.apache.logging.log4j.Logger; import org.apache.geode.CancelException; +import org.apache.geode.ComponentStatus; import org.apache.geode.DataSerializer; +import org.apache.geode.StatusReporter; import org.apache.geode.annotations.Immutable; import org.apache.geode.cache.asyncqueue.AsyncEventListener; import org.apache.geode.cache.util.Gateway; @@ -52,7 +54,7 @@ import org.apache.geode.internal.serialization.StaticSerialization; import org.apache.geode.logging.internal.executors.LoggingThread; import org.apache.geode.logging.internal.log4j.api.LogService; -public class GatewaySenderAdvisor extends DistributionAdvisor { +public class GatewaySenderAdvisor extends DistributionAdvisor implements ComponentStatus { private static final Logger logger = LogService.getLogger(); private DistributedLockService lockService; @@ -75,6 +77,8 @@ public class GatewaySenderAdvisor extends DistributionAdvisor { public static GatewaySenderAdvisor createGatewaySenderAdvisor(DistributionAdvisee sender) { GatewaySenderAdvisor advisor = new GatewaySenderAdvisor(sender); + StatusReporter.getStatus(); + StatusReporter.registerComponent(advisor); advisor.initialize(); return advisor; } @@ -252,6 +256,8 @@ public class GatewaySenderAdvisor extends DistributionAdvisor { @Override public void profileUpdated(Profile profile) { if (profile instanceof GatewaySenderProfile) { + logger.info("MLH profileUpdated " + profile.getDistributedMember().getName()); + GatewaySenderProfile sp = (GatewaySenderProfile) profile; if (!sp.isParallel) { // SerialGatewaySender if (!sp.isRunning) { @@ -262,11 +268,9 @@ public class GatewaySenderAdvisor extends DistributionAdvisor { if (!this.sender.isPrimary()) { if (!adviseEldestGatewaySender()) {// AND this is not the eldest // sender - if (logger.isDebugEnabled()) { - logger.debug( - "Sender {} is not the eldest in the system. Giving preference to eldest sender to become primary...", - this.sender); - } + logger.debug( + "Sender {} is not the eldest in the system. 
Giving preference to eldest sender to become primary...", + this.sender); return; } launchLockObtainingVolunteerThread(); @@ -287,6 +291,8 @@ public class GatewaySenderAdvisor extends DistributionAdvisor { @Override protected void profileRemoved(Profile profile) { if (profile instanceof GatewaySenderProfile) { + logger.info("MLH profileRemoved " + profile.getDistributedMember().getName()); + GatewaySenderProfile sp = (GatewaySenderProfile) profile; if (!sp.isParallel) {// SerialGatewaySender // if there is a primary sender, then don't volunteer for primary @@ -295,11 +301,9 @@ public class GatewaySenderAdvisor extends DistributionAdvisor { } if (!this.sender.isPrimary()) {// IF this sender is not primary if (!adviseEldestGatewaySender()) {// AND this is not the eldest sender - if (logger.isDebugEnabled()) { - logger.debug( - "Sender {} is not the eldest in the system. Giving preference to eldest sender to become primary...", - this.sender); - } + logger.debug( + "Sender {} is not the eldest in the system. Giving preference to eldest sender to become primary...", + this.sender); return; } launchLockObtainingVolunteerThread(); @@ -320,29 +324,21 @@ public class GatewaySenderAdvisor extends DistributionAdvisor { this.lockService = DLockService.create(dlsName, ds, true, true, true); } Assert.assertTrue(this.lockService != null); - if (logger.isDebugEnabled()) { - logger.debug("{}: Obtained DistributedLockService: {}", this, this.lockService); - } + logger.debug("{}: Obtained DistributedLockService: {}", this, this.lockService); } public boolean volunteerForPrimary() { - if (logger.isDebugEnabled()) { - logger.debug("Sender : {} is volunteering for Primary ", this.sender.getId()); - } + logger.debug("Sender : {} is volunteering for Primary ", this.sender.getId()); if (advisePrimaryGatewaySender() == null) { if (!adviseEldestGatewaySender()) { - if (logger.isDebugEnabled()) { - logger.debug( - "Sender {} is not the eldest in the system. Giving preference to eldest sender to become primary...", - this.sender); - } - return false; - } - if (logger.isDebugEnabled()) { - logger.debug("Sender : {} no Primary available. So going to acquire distributed lock", + logger.debug( + "Sender {} is not the eldest in the system. Giving preference to eldest sender to become primary...", this.sender); + return false; } + logger.debug("Sender : {} no Primary available. So going to acquire distributed lock", + this.sender); this.lockService.lock(this.lockToken, 10000, -1); return this.lockService.isHeldByCurrentThread(this.lockToken); } @@ -404,13 +400,13 @@ public class GatewaySenderAdvisor extends DistributionAdvisor { } public void makePrimary() { - logger.info("{} : Starting as primary", this.sender); + logger.info("{} : Starting as primary", this.sender.id); AbstractGatewaySenderEventProcessor eventProcessor = this.sender.getEventProcessor(); if (eventProcessor != null) { eventProcessor.removeCacheListener(); } - logger.info("{} : Becoming primary gateway sender", this.sender); + logger.info("{} : Becoming primary gateway sender", this.sender.id); notifyAndBecomePrimary(); new UpdateAttributesProcessor(this.sender).distribute(false); } @@ -429,10 +425,8 @@ public class GatewaySenderAdvisor extends DistributionAdvisor { } public void makeSecondary() { - if (logger.isDebugEnabled()) { - logger.debug("{}: Did not obtain the lock on {}. Starting as secondary gateway sender.", - this.sender, this.lockToken); - } + logger.debug("{}: Did not obtain the lock on {}. 
Starting as secondary gateway sender.", + this.sender, this.lockToken); // Set primary flag to false logger.info( @@ -451,15 +445,11 @@ public class GatewaySenderAdvisor extends DistributionAdvisor { if (!(GatewaySenderAdvisor.this.sender.isRunning())) { return; } - if (logger.isDebugEnabled()) { - logger.debug("{}: Obtaining the lock on {}", this, GatewaySenderAdvisor.this.lockToken); - } + logger.debug("{}: Obtaining the lock on {}", this, GatewaySenderAdvisor.this.lockToken); if (volunteerForPrimary()) { - if (logger.isDebugEnabled()) { - logger.debug("{}: Obtained the lock on {}", this, - GatewaySenderAdvisor.this.lockToken); - } + logger.debug("{}: Obtained the lock on {}", this, + GatewaySenderAdvisor.this.lockToken); logger.info("{} is becoming primary gateway Sender.", GatewaySenderAdvisor.this); @@ -484,6 +474,8 @@ public class GatewaySenderAdvisor extends DistributionAdvisor { public void waitToBecomePrimary(AbstractGatewaySenderEventProcessor callingProcessor) throws InterruptedException { + logger.info("MLH {} : waitToBecomePrimary", this.sender.getId()); + if (isPrimary()) { return; } @@ -499,6 +491,24 @@ public class GatewaySenderAdvisor extends DistributionAdvisor { } } + @Override + public String name() { + return "Gateway Sender Advisor"; + } + + @Override + public String getStatusString() { + return "Running"; + } + + @Override + public void print() { + logger.info(" Server Name: " + name() + " Component Status: " + getStatusString() + + " isPrimary: " + isPrimary() + " Advisee: " + getAdvisee() + " Sender: " + sender + + " Profiles: " + profiles); + + } + /** * Profile information for a remote counterpart. */ @@ -758,6 +768,7 @@ public class GatewaySenderAdvisor extends DistributionAdvisor { @Override public void close() { + logger.info("MLH close Removing profile " + this.sender.locName); new UpdateAttributesProcessor(this.getAdvisee(), true).distribute(false); super.close(); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackDispatcher.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackDispatcher.java index e5683e0..589a7ff 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackDispatcher.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackDispatcher.java @@ -70,16 +70,12 @@ public class GatewaySenderEventCallbackDispatcher implements GatewaySenderEventD GatewaySenderStats statistics = this.eventProcessor.sender.getStatistics(); boolean success = false; try { - if (logger.isDebugEnabled()) { - logger.debug("About to dispatch batch"); - } + logger.debug("About to dispatch batch"); long start = statistics.startTime(); // Send the batch to the corresponding GatewaySender success = dispatchBatch(events); statistics.endBatch(start, events.size()); - if (logger.isDebugEnabled()) { - logger.debug("Done dispatching the batch"); - } + logger.debug("Done dispatching the batch"); } catch (GatewaySenderException e) { // Do nothing in this case. The exception has already been logged. 
} catch (CancelException e) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperation.java index 00a1196..287cdc7 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperation.java @@ -68,12 +68,10 @@ public class GatewaySenderQueueEntrySynchronizationOperation { } protected void synchronizeEntries() { - if (logger.isDebugEnabled()) { - logger.debug( - "{}: Requesting synchronization from member={}; regionPath={}; entriesToSynchronize={}", - getClass().getSimpleName(), this.recipient, this.region.getFullPath(), - this.entriesToSynchronize); - } + logger.debug( + "{}: Requesting synchronization from member={}; regionPath={}; entriesToSynchronize={}", + getClass().getSimpleName(), this.recipient, this.region.getFullPath(), + this.entriesToSynchronize); // Create and send message DistributionManager dm = this.region.getDistributionManager(); GatewaySenderQueueEntrySynchronizationReplyProcessor processor = @@ -125,13 +123,11 @@ public class GatewaySenderQueueEntrySynchronizationOperation { if (msg instanceof ReplyMessage) { ReplyMessage reply = (ReplyMessage) msg; if (reply.getException() == null) { - if (logger.isDebugEnabled()) { - logger.debug( - "{}: Processing reply from member={}; regionPath={}; key={}; entriesToSynchronize={}", - getClass().getSimpleName(), reply.getSender(), - this.operation.region.getFullPath(), this.operation.entriesToSynchronize, - reply.getReturnValue()); - } + logger.debug( + "{}: Processing reply from member={}; regionPath={}; key={}; entriesToSynchronize={}", + getClass().getSimpleName(), reply.getSender(), + this.operation.region.getFullPath(), this.operation.entriesToSynchronize, + reply.getReturnValue()); List<Map<String, GatewayQueueEvent>> events = (List<Map<String, GatewayQueueEvent>>) reply.getReturnValue(); for (int i = 0; i < events.size(); i++) { @@ -194,10 +190,8 @@ public class GatewaySenderQueueEntrySynchronizationOperation { Object result = null; ReplyException replyException = null; try { - if (logger.isDebugEnabled()) { - logger.debug("{}: Providing synchronization region={}; entriesToSynchronize={}", - getClass().getSimpleName(), this.regionPath, this.entriesToSynchronize); - } + logger.debug("{}: Providing synchronization region={}; entriesToSynchronize={}", + getClass().getSimpleName(), this.regionPath, this.entriesToSynchronize); result = getSynchronizationEvents(dm.getCache()); } catch (Throwable t) { replyException = new ReplyException(t); @@ -210,10 +204,8 @@ public class GatewaySenderQueueEntrySynchronizationOperation { } else { replyMsg.setException(replyException); } - if (logger.isDebugEnabled()) { - logger.debug("{}: Sending synchronization reply returnValue={}; exception={}", - getClass().getSimpleName(), replyMsg.getReturnValue(), replyMsg.getException()); - } + logger.debug("{}: Sending synchronization reply returnValue={}; exception={}", + getClass().getSimpleName(), replyMsg.getReturnValue(), replyMsg.getException()); dm.putOutgoing(replyMsg); } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java index 0ef1fd3..1b18258 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java @@ -98,9 +98,7 @@ public class ConcurrentParallelGatewaySenderEventProcessor targetRs.add(pr); } } - if (logger.isDebugEnabled()) { - logger.debug("The target PRs are {} Dispatchers: {}", targetRs, nDispatcher); - } + logger.debug("The target PRs are {} Dispatchers: {}", targetRs, nDispatcher); createProcessors(sender.getDispatcherThreads(), targetRs, cleanQueues); @@ -111,9 +109,7 @@ public class ConcurrentParallelGatewaySenderEventProcessor protected void createProcessors(int dispatcherThreads, Set<Region> targetRs, boolean cleanQueues) { processors = new ParallelGatewaySenderEventProcessor[sender.getDispatcherThreads()]; - if (logger.isDebugEnabled()) { - logger.debug("Creating AsyncEventProcessor"); - } + logger.debug("Creating AsyncEventProcessor"); for (int i = 0; i < sender.getDispatcherThreads(); i++) { processors[i] = new ParallelGatewaySenderEventProcessor(sender, targetRs, i, sender.getDispatcherThreads(), getThreadMonitorObj(), cleanQueues); @@ -158,10 +154,8 @@ public class ConcurrentParallelGatewaySenderEventProcessor ConcurrentParallelGatewaySenderQueue cpgsq = (ConcurrentParallelGatewaySenderQueue) queue; PartitionedRegion prQ = cpgsq.getRegion(droppedEvent.getRegion().getFullPath()); if (prQ == null) { - if (logger.isDebugEnabled()) { - logger.debug("shadow partitioned region " + droppedEvent.getRegion().getFullPath() - + " is not created yet."); - } + logger.debug("shadow partitioned region " + droppedEvent.getRegion().getFullPath() + + " is not created yet."); return; } int bucketId = PartitionedRegionHelper.getHashKey((EntryOperation) droppedEvent); @@ -172,10 +166,8 @@ public class ConcurrentParallelGatewaySenderEventProcessor if (isPrimary) { pgsq.sendQueueRemovalMesssageForDroppedEvent(prQ, bucketId, shadowKey); this.sender.getStatistics().incEventsDroppedDueToPrimarySenderNotRunning(); - if (logger.isDebugEnabled()) { - logger.debug("register dropped event for primary queue. BucketId is " + bucketId - + ", shadowKey is " + shadowKey + ", prQ is " + prQ.getFullPath()); - } + logger.debug("register dropped event for primary queue. BucketId is " + bucketId + + ", shadowKey is " + shadowKey + ", prQ is " + prQ.getFullPath()); } } @@ -259,11 +251,9 @@ public class ConcurrentParallelGatewaySenderEventProcessor for (Future<Boolean> f : futures) { try { Boolean b = f.get(); - if (logger.isDebugEnabled()) { - logger.debug( - "ConcurrentParallelGatewaySenderEventProcessor: {} stopped dispatching: {}", - (b ? "Successfully" : "Unsuccessfully"), this); - } + logger.debug( + "ConcurrentParallelGatewaySenderEventProcessor: {} stopped dispatching: {}", + (b ? 
"Successfully" : "Unsuccessfully"), this); } catch (ExecutionException e) { // we don't expect any exception but if caught then eat it and log warning logger.warn(String.format("GatewaySender %s caught exception while stopping: %s", sender, @@ -276,9 +266,7 @@ public class ConcurrentParallelGatewaySenderEventProcessor stopperService.shutdown(); closeProcessor(); - if (logger.isDebugEnabled()) { - logger.debug("ConcurrentParallelGatewaySenderEventProcessor: Stopped dispatching: {}", this); - } + logger.debug("ConcurrentParallelGatewaySenderEventProcessor: Stopped dispatching: {}", this); } @Override @@ -294,9 +282,7 @@ public class ConcurrentParallelGatewaySenderEventProcessor parallelProcessor.pauseDispatching(); } super.pauseDispatching(); - if (logger.isDebugEnabled()) { - logger.debug("ConcurrentParallelGatewaySenderEventProcessor: Paused dispatching: {}", this); - } + logger.debug("ConcurrentParallelGatewaySenderEventProcessor: Paused dispatching: {}", this); } @Override @@ -312,9 +298,7 @@ public class ConcurrentParallelGatewaySenderEventProcessor parallelProcessor.resumeDispatching(); } super.resumeDispatching(); - if (logger.isDebugEnabled()) { - logger.debug("ConcurrentParallelGatewaySenderEventProcessor: Resumed dispatching: {}", this); - } + logger.debug("ConcurrentParallelGatewaySenderEventProcessor: Resumed dispatching: {}", this); } @Override diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java index 65567f0..8c5efa0 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java @@ -74,9 +74,7 @@ public class ParallelGatewaySenderEventProcessor extends AbstractGatewaySenderEv targetRs.add(region); } } - if (logger.isDebugEnabled()) { - logger.debug("The target Regions are(PGSEP) {}", targetRs); - } + logger.debug("The target Regions are(PGSEP) {}", targetRs); ParallelGatewaySenderQueue queue; queue = new ParallelGatewaySenderQueue(this.sender, targetRs, this.index, this.nDispatcher, @@ -108,11 +106,9 @@ public class ParallelGatewaySenderEventProcessor extends AbstractGatewaySenderEv // If it is the case then the event must be coming from notificationOnly message. // Don't enqueue the event and return from here only. // Fix for #49081 and EntryDestroyedException in #49367. - if (logger.isDebugEnabled()) { - logger.debug( - "ParallelGatewaySenderEventProcessor not enqueing the following event since tailKey is not set. {}", - event); - } + logger.debug( + "ParallelGatewaySenderEventProcessor not enqueing the following event since tailKey is not set. 
{}", + event); return; } @@ -141,9 +137,7 @@ public class ParallelGatewaySenderEventProcessor extends AbstractGatewaySenderEv } getSender().getStatistics().endPut(start); } else { - if (logger.isDebugEnabled()) { - logger.debug("The Event {} is filtered.", gatewayQueueEvent); - } + logger.debug("The Event {} is filtered.", gatewayQueueEvent); getSender().getStatistics().incEventsFiltered(); } } finally { @@ -207,9 +201,7 @@ public class ParallelGatewaySenderEventProcessor extends AbstractGatewaySenderEv @Override public void initializeEventDispatcher() { - if (logger.isDebugEnabled()) { - logger.debug(" Creating the GatewayEventCallbackDispatcher"); - } + logger.debug(" Creating the GatewayEventCallbackDispatcher"); this.dispatcher = new GatewaySenderEventCallbackDispatcher(this); } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java index 797494f..d07620f 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java @@ -211,16 +211,12 @@ public class ParallelGatewaySenderQueue implements RegionQueue { try { destroyEventFromQueue(prQ, bucketId, previousTailKeyTobeRemoved); } catch (EntryNotFoundException e) { - if (logger.isDebugEnabled()) { - logger.debug("{}: Not conflating {} due to EntryNotFoundException", this, - conflatableObject.getKeyToConflate()); - } - } - if (logger.isDebugEnabled()) { - logger.debug("{}: Conflated {} for key={} in queue for region={}", this, - conflatableObject.getValueToConflate(), conflatableObject.getKeyToConflate(), - prQ.getName()); + logger.debug("{}: Not conflating {} due to EntryNotFoundException", this, + conflatableObject.getKeyToConflate()); } + logger.debug("{}: Conflated {} for key={} in queue for region={}", this, + conflatableObject.getValueToConflate(), conflatableObject.getKeyToConflate(), + prQ.getName()); } private Object deserialize(Object serializedBytes) { @@ -325,11 +321,9 @@ public class ParallelGatewaySenderQueue implements RegionQueue { this.sender.getLifeCycleLock().writeLock().lock(); PartitionedRegion prQ = null; - if (logger.isDebugEnabled()) { - logger.debug( - "addShadowPartitionedRegionForUserRR: Going to create shadowpr for userRegion {}", - userRegion.getFullPath()); - } + logger.debug( + "addShadowPartitionedRegionForUserRR: Going to create shadowpr for userRegion {}", + userRegion.getFullPath()); try { String regionName = userRegion.getFullPath(); @@ -377,9 +371,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue { final RegionAttributes ra = fact.getCreateAttributes(); - if (logger.isDebugEnabled()) { - logger.debug("{}: Attempting to create queue region: {}", this, prQName); - } + logger.debug("{}: Attempting to create queue region: {}", this, prQName); ParallelGatewaySenderQueueMetaRegion meta = new ParallelGatewaySenderQueueMetaRegion(prQName, ra, null, cache, sender, @@ -389,10 +381,8 @@ public class ParallelGatewaySenderQueue implements RegionQueue { .setSnapshotInputStream(null).setImageTarget(null); prQ = (PartitionedRegion) fact.create(prQName); - if (logger.isDebugEnabled()) { - logger.debug("Region created : {} partition Attributes : {}", prQ, - prQ.getPartitionAttributes()); - } + logger.debug("Region created : {} partition Attributes : {}", prQ, + 
prQ.getPartitionAttributes()); // TODO This should not be set on the PR but on the GatewaySender prQ.enableConflation(sender.isBatchConflationEnabled()); @@ -409,9 +399,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue { } // In case of Replicated Region it may not be necessary. - if (logger.isDebugEnabled()) { - logger.debug("{}: Created queue region: {}", this, prQ); - } + logger.debug("{}: Created queue region: {}", this, prQ); } else { // in case shadowPR exists already (can be possible when sender is // started from stop operation) @@ -445,11 +433,9 @@ public class ParallelGatewaySenderQueue implements RegionQueue { public void addShadowPartitionedRegionForUserPR(PartitionedRegion userPR, PartitionedRegion childPR) { - if (logger.isDebugEnabled()) { - logger.debug( - "{} addShadowPartitionedRegionForUserPR: Attempting to create queue region: {}; child region: {}", - this, userPR.getDisplayName(), childPR == null ? "null" : childPR.getDisplayName()); - } + logger.debug( + "{} addShadowPartitionedRegionForUserPR: Attempting to create queue region: {}; child region: {}", + this, userPR.getDisplayName(), childPR == null ? "null" : childPR.getDisplayName()); this.sender.getLifeCycleLock().writeLock().lock(); PartitionedRegion prQ = null; @@ -534,9 +520,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue { final RegionAttributes ra = fact.getCreateAttributes(); - if (logger.isDebugEnabled()) { - logger.debug("{}: Attempting to create queue region: {}", this, prQName); - } + logger.debug("{}: Attempting to create queue region: {}", this, prQName); ParallelGatewaySenderQueueMetaRegion meta = metaRegionFactory.newMetataRegion(cache, prQName, ra, sender); @@ -564,9 +548,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue { prQ.shadowPRWaitForBucketRecovery(); } - if (logger.isDebugEnabled()) { - logger.debug("{}: Created queue region: {}", this, prQ); - } + logger.debug("{}: Created queue region: {}", this, prQ); } else { if (isAccessor) @@ -642,10 +624,8 @@ public class ParallelGatewaySenderQueue implements RegionQueue { */ private void handleShadowPRExistsScenario(Cache cache, PartitionedRegion prQ) { // Note: The region will not be null if the sender is started again after stop operation - if (logger.isDebugEnabled()) { - logger.debug("{}: No need to create the region as the region has been retrieved: {}", this, - prQ); - } + logger.debug("{}: No need to create the region as the region has been retrieved: {}", this, + prQ); } protected void afterRegionAdd(PartitionedRegion userPR) { @@ -875,17 +855,13 @@ public class ParallelGatewaySenderQueue implements RegionQueue { if (isQueueEmpty) { queueEmptyLock.lock(); try { - if (logger.isDebugEnabled()) { - logger.debug("Going to notify, isQueueEmpty {}", isQueueEmpty); - } + logger.debug("Going to notify, isQueueEmpty {}", isQueueEmpty); if (isQueueEmpty) { isQueueEmpty = false; queueEmptyCondition.signal(); } } finally { - if (logger.isDebugEnabled()) { - logger.debug("Notified!, isQueueEmpty {}", isQueueEmpty); - } + logger.debug("Notified!, isQueueEmpty {}", isQueueEmpty); queueEmptyLock.unlock(); } } @@ -902,17 +878,13 @@ public class ParallelGatewaySenderQueue implements RegionQueue { // need to find out from hcih revision this code came } } catch (BucketNotFoundException e) { - if (logger.isDebugEnabled()) { - logger.debug("For bucket {} the current bucket redundancy is {}", brq.getId(), - brq.getPartitionedRegion().getRegionAdvisor().getBucketAdvisor(brq.getId()) - .getBucketRedundancy()); - 
} + logger.debug("For bucket {} the current bucket redundancy is {}", brq.getId(), + brq.getPartitionedRegion().getRegionAdvisor().getBucketAdvisor(brq.getId()) + .getBucketRedundancy()); } catch (ForceReattemptException e) { - if (logger.isDebugEnabled()) { - logger.debug( - "getInitializedBucketForId: Got ForceReattemptException for {} for bucket = {}", this, - brq.getId()); - } + logger.debug( + "getInitializedBucketForId: Got ForceReattemptException for {} for bucket = {}", this, + brq.getId()); } finally { if (!addedValueToQueue) { value.release(); @@ -1006,10 +978,8 @@ public class ParallelGatewaySenderQueue implements RegionQueue { } } - if (logger.isDebugEnabled()) { - logger.debug("getRandomPrimaryBucket: total {} for this processor: {}", allBuckets.size(), - thisProcessorBuckets.size()); - } + logger.debug("getRandomPrimaryBucket: total {} for this processor: {}", allBuckets.size(), + thisProcessorBuckets.size()); int nTry = thisProcessorBuckets.size(); @@ -1103,19 +1073,13 @@ public class ParallelGatewaySenderQueue implements RegionQueue { key, this, bucketId, this.sender); } } catch (ForceReattemptException e) { - if (logger.isDebugEnabled()) { - logger.debug("Bucket :{} moved to other member", bucketId); - } + logger.debug("Bucket :{} moved to other member", bucketId); } catch (PrimaryBucketException e) { - if (logger.isDebugEnabled()) { - logger.debug("Primary bucket :{} moved to other member", bucketId); - } + logger.debug("Primary bucket :{} moved to other member", bucketId); } catch (RegionDestroyedException e) { - if (logger.isDebugEnabled()) { - logger.debug( - "Caught RegionDestroyedException attempting to remove key {} from bucket {} in {}", key, - bucketId, prQ.getFullPath()); - } + logger.debug( + "Caught RegionDestroyedException attempting to remove key {} from bucket {} in {}", key, + bucketId, prQ.getFullPath()); } addRemovedEvent(prQ, bucketId, key); } @@ -1144,9 +1108,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue { } catch (BucketRegionQueueUnavailableException e) { return object;// since this is not set, it would be null } catch (ForceReattemptException e) { - if (logger.isDebugEnabled()) { - logger.debug("Remove: Got ForceReattemptException for {} for bucke = {}", this, bucketId); - } + logger.debug("Remove: Got ForceReattemptException for {} for bucke = {}", this, bucketId); } } return object; // OFFHEAP: ok since only callers uses it to check for empty queue @@ -1385,11 +1347,9 @@ public class ParallelGatewaySenderQueue implements RegionQueue { peekedEvents.add(event); areAllEventsForTransactionInBatch = event.isLastEventInTransaction(); - if (logger.isDebugEnabled()) { - logger.debug( - "Peeking extra event: {}, bucketId: {}, isLastEventInTransaction: {}, batch size: {}", - event.getKey(), bucketId, event.isLastEventInTransaction(), batch.size()); - } + logger.debug( + "Peeking extra event: {}, bucketId: {}, isLastEventInTransaction: {}, batch size: {}", + event.getKey(), bucketId, event.isLastEventInTransaction(), batch.size()); } } if (!areAllEventsForTransactionInBatch) { @@ -1481,14 +1441,12 @@ public class ParallelGatewaySenderQueue implements RegionQueue { this.peekedEventsProcessingInProgress = true; addPreviouslyPeekedEvents(batch, batchSize); } - if (logger.isDebugEnabled()) { - StringBuffer buffer = new StringBuffer(); - for (Object ge : batch) { - buffer.append("event :"); - buffer.append(ge); - } - logger.debug("Adding already peeked events to the batch {}", buffer); + StringBuffer buffer = new StringBuffer(); + for (Object 
ge : batch) { + buffer.append("event :"); + buffer.append(ge); } + logger.debug("Adding already peeked events to the batch {}", buffer); } } @@ -1518,9 +1476,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue { queueEmptyLock.lock(); try { if (isQueueEmpty) { // merge44610: this if condition came from cheetah 44610 - if (logger.isDebugEnabled()) { - logger.debug("Going to wait, till notified."); - } + logger.debug("Going to wait, till notified."); // merge44610: this time waiting came from cheetah 44610. In cedar 1000 // is assumed as milliseconds. In cheetah TimeUnitParamter Millisecond // is used. In cheetah stoppable has method to consider timeunit @@ -1531,9 +1487,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue { // update the flag so that next time when we come we will block. isQueueEmpty = this.localSizeForProcessor() == 0; } finally { - if (logger.isDebugEnabled()) { - logger.debug("Going to unblock. isQueueEmpty {}", isQueueEmpty); - } + logger.debug("Going to unblock. isQueueEmpty {}", isQueueEmpty); queueEmptyLock.unlock(); } } @@ -1542,18 +1496,14 @@ public class ParallelGatewaySenderQueue implements RegionQueue { Object object = null; BucketRegionQueue brq = getBucketRegionQueueByBucketId(prQ, bucketId); - if (logger.isDebugEnabled()) { - logger.debug("{}: Peekahead for the bucket {}", this, bucketId); - } + logger.debug("{}: Peekahead for the bucket {}", this, bucketId); try { object = brq.peek(); } catch (BucketRegionQueueUnavailableException e) { // BucketRegionQueue unavailable. Can be due to the BucketRegionQueue being destroyed. return object;// this will be null } - if (logger.isDebugEnabled()) { - logger.debug("{}: Peeked object from bucket {} object: {}", this, bucketId, object); - } + logger.debug("{}: Peeked object from bucket {} object: {}", this, bucketId, object); if (object == null) { if (this.stats != null) { @@ -1630,10 +1580,8 @@ public class ParallelGatewaySenderQueue implements RegionQueue { size += prQ.getDataStore().getSizeOfLocalPrimaryBuckets(); } } - if (logger.isDebugEnabled()) { - logger.debug("The name of the queue region is {} and the size is {}", prQ.getFullPath(), - size); - } + logger.debug("The name of the queue region is {} and the size is {}", prQ.getFullPath(), + size); } return size /* + sender.getTmpQueuedEventSize() */; } @@ -1650,10 +1598,8 @@ public class ParallelGatewaySenderQueue implements RegionQueue { size += br.size(); } } - if (logger.isDebugEnabled()) { - logger.debug("The name of the queue region is {} and the size is {}", prQ.getFullPath(), - size); - } + logger.debug("The name of the queue region is {} and the size is {}", prQ.getFullPath(), + size); } return size /* + sender.getTmpQueuedEventSize() */; } @@ -1662,10 +1608,8 @@ public class ParallelGatewaySenderQueue implements RegionQueue { public int size() { int size = 0; for (PartitionedRegion prQ : this.userRegionNameToShadowPRMap.values()) { - if (logger.isDebugEnabled()) { - logger.debug("The name of the queue region is {} and the size is {}. keyset size is {}", - prQ.getName(), prQ.size(), prQ.keys().size()); - } + logger.debug("The name of the queue region is {} and the size is {}. 
keyset size is {}", + prQ.getName(), prQ.size(), prQ.keys().size()); size += prQ.size(); } @@ -1703,18 +1647,14 @@ public class ParallelGatewaySenderQueue implements RegionQueue { for (PartitionedRegion prQ : this.userRegionNameToShadowPRMap.values()) { DiskRegionStats diskStats = prQ.getDiskRegionStats(); if (diskStats == null) { - if (logger.isDebugEnabled()) { - logger.debug( - "{}: DiskRegionStats for shadow PR is null. Returning the numEntriesOverflowOnDisk as 0", - this); - } - return 0; - } - if (logger.isDebugEnabled()) { logger.debug( - "{}: DiskRegionStats for shadow PR is NOT null. Returning the numEntriesOverflowOnDisk obtained from DiskRegionStats", + "{}: DiskRegionStats for shadow PR is null. Returning the numEntriesOverflowOnDisk as 0", this); + return 0; } + logger.debug( + "{}: DiskRegionStats for shadow PR is NOT null. Returning the numEntriesOverflowOnDisk obtained from DiskRegionStats", + this); numEntriesOnDisk += diskStats.getNumOverflowOnDisk(); } return numEntriesOnDisk; @@ -1725,17 +1665,13 @@ public class ParallelGatewaySenderQueue implements RegionQueue { for (PartitionedRegion prQ : this.userRegionNameToShadowPRMap.values()) { DiskRegionStats diskStats = prQ.getDiskRegionStats(); if (diskStats == null) { - if (logger.isDebugEnabled()) { - logger.debug( - "{}: DiskRegionStats for shadow PR is null. Returning the numEntriesInVM as 0", this); - } - return 0; - } - if (logger.isDebugEnabled()) { logger.debug( - "{}: DiskRegionStats for shadow PR is NOT null. Returning the numEntriesInVM obtained from DiskRegionStats", - this); + "{}: DiskRegionStats for shadow PR is null. Returning the numEntriesInVM as 0", this); + return 0; } + logger.debug( + "{}: DiskRegionStats for shadow PR is NOT null. Returning the numEntriesInVM obtained from DiskRegionStats", + this); numEntriesInVM += diskStats.getNumEntriesInVM(); } return numEntriesInVM; @@ -1858,14 +1794,12 @@ public class ParallelGatewaySenderQueue implements RegionQueue { Thread.currentThread().interrupt(); } - if (logger.isDebugEnabled()) { - buckToDispatchLock.lock(); - try { - logger.debug("BatchRemovalThread about to query the batch removal map {}", - regionToDispatchedKeysMap); - } finally { - buckToDispatchLock.unlock(); - } + buckToDispatchLock.lock(); + try { + logger.debug("BatchRemovalThread about to query the batch removal map {}", + regionToDispatchedKeysMap); + } finally { + buckToDispatchLock.unlock(); } final HashMap<String, Map<Integer, List>> temp; @@ -1895,9 +1829,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue { } // be somewhat tolerant of failures catch (CancelException e) { - if (logger.isDebugEnabled()) { - logger.debug("BatchRemovalThread is exiting due to cancellation"); - } + logger.debug("BatchRemovalThread is exiting due to cancellation"); break; } catch (VirtualMachineError err) { SystemFailure.initiateFailure(err); @@ -1921,16 +1853,12 @@ public class ParallelGatewaySenderQueue implements RegionQueue { if (checkCancelled()) { break; } - if (logger.isDebugEnabled()) { - logger.debug("BatchRemovalThread: ignoring exception", t); - } + logger.debug("BatchRemovalThread: ignoring exception", t); } } // for } // ensure exit message is printed catch (CancelException e) { - if (logger.isDebugEnabled()) { - logger.debug("BatchRemovalThread exiting due to cancellation: " + e); - } + logger.debug("BatchRemovalThread exiting due to cancellation: " + e); } finally { logger.info("The QueueRemovalThread is done."); } diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java index c2a3dbd..0304660c 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java @@ -75,11 +75,9 @@ public class WaitUntilParallelGatewaySenderFlushedCoordinator // create and submit callable with updated timeout Callable<Boolean> callable = createWaitUntilBucketRegionQueueFlushedCallable( (BucketRegionQueue) br, nanosRemaining, TimeUnit.NANOSECONDS); - if (logger.isDebugEnabled()) { - logger.debug( - "WaitUntilParallelGatewaySenderFlushedCoordinator: Submitting callable for bucket " - + br.getId() + " callable=" + callable + " nanosRemaining=" + nanosRemaining); - } + logger.debug( + "WaitUntilParallelGatewaySenderFlushedCoordinator: Submitting callable for bucket " + + br.getId() + " callable=" + callable + " nanosRemaining=" + nanosRemaining); callableFutures.add(service.submit(callable)); callableCount++; if ((callableCount % CALLABLES_CHUNK_SIZE) == 0 @@ -88,10 +86,8 @@ public class WaitUntilParallelGatewaySenderFlushedCoordinator new CallablesChunkResults(localResult, exceptionToThrow, callableFutures).invoke(); localResult = callablesChunkResults.getLocalResult(); exceptionToThrow = callablesChunkResults.getExceptionToThrow(); - if (logger.isDebugEnabled()) { - logger.debug("WaitUntilParallelGatewaySenderFlushedCoordinator: Processed local result= " - + localResult + "; exceptionToThrow= " + exceptionToThrow); - } + logger.debug("WaitUntilParallelGatewaySenderFlushedCoordinator: Processed local result= " + + localResult + "; exceptionToThrow= " + exceptionToThrow); if (exceptionToThrow != null) { throw exceptionToThrow; } @@ -100,10 +96,8 @@ public class WaitUntilParallelGatewaySenderFlushedCoordinator } // Return the full result - if (logger.isDebugEnabled()) { - logger.debug("WaitUntilParallelGatewaySenderFlushedCoordinator: Returning full result=" - + (localResult)); - } + logger.debug("WaitUntilParallelGatewaySenderFlushedCoordinator: Returning full result=" + + (localResult)); return localResult; } @@ -172,12 +166,10 @@ public class WaitUntilParallelGatewaySenderFlushedCoordinator try { if (msg instanceof ReplyMessage) { ReplyMessage reply = (ReplyMessage) msg; - if (logger.isDebugEnabled()) { - logger - .debug("WaitUntilGatewaySenderFlushedReplyProcessor: Processing reply from sender=" - + reply.getSender() + "; returnValue=" + reply.getReturnValue() + "; exception=" - + reply.getException()); - } + logger + .debug("WaitUntilGatewaySenderFlushedReplyProcessor: Processing reply from sender=" + + reply.getSender() + "; returnValue=" + reply.getReturnValue() + "; exception=" + + reply.getException()); if (reply.getException() == null) { this.responses.put(reply.getSender(), (Boolean) reply.getReturnValue()); } else { @@ -194,10 +186,8 @@ public class WaitUntilParallelGatewaySenderFlushedCoordinator for (boolean singleMemberResult : this.responses.values()) { combinedResult = combinedResult && singleMemberResult; } - if (logger.isDebugEnabled()) { - logger.debug("WaitUntilGatewaySenderFlushedReplyProcessor: Returning combinedResult=" - + combinedResult); - } + logger.debug("WaitUntilGatewaySenderFlushedReplyProcessor: Returning 
combinedResult=" + + combinedResult); return combinedResult; } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java index 7adf996..242f72d 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java @@ -92,9 +92,7 @@ public class ConcurrentSerialGatewaySenderEventProcessor processors.add( new SerialGatewaySenderEventProcessor(this.sender, id + "." + i, getThreadMonitorObj(), cleanQueues)); - if (logger.isDebugEnabled()) { - logger.debug("Created the SerialGatewayEventProcessor_{}->{}", i, processors.get(i)); - } + logger.debug("Created the SerialGatewayEventProcessor_{}->{}", i, processors.get(i)); } } @@ -126,9 +124,7 @@ public class ConcurrentSerialGatewaySenderEventProcessor public void setModifiedEventId(EntryEventImpl clonedEvent, int index) { EventID originalEventId = clonedEvent.getEventId(); - if (logger.isDebugEnabled()) { - logger.debug("The original EventId is {}", originalEventId); - } + logger.debug("The original EventId is {}", originalEventId); // PARALLEL_THREAD_BUFFER * (index +1) + originalEventId.getThreadID(); // generating threadId by the algorithm explained above used to clash with // fakeThreadId generated by putAll @@ -142,14 +138,12 @@ public class ConcurrentSerialGatewaySenderEventProcessor */); EventID newEventId = new EventID(originalEventId.getMembershipID(), newThreadId, originalEventId.getSequenceID()); - if (logger.isDebugEnabled()) { - logger.debug( - "{}: Generated event id for event with key={}, index={}, original event id={}, threadId={}, new event id={}, newThreadId={}" - + ":index=" + this.sender.getEventIdIndex(), - this, clonedEvent.getKey(), index, originalEventId, - ThreadIdentifier.toDisplayString(originalEventId.getThreadID()), newEventId, - ThreadIdentifier.toDisplayString(newThreadId)); - } + logger.debug( + "{}: Generated event id for event with key={}, index={}, original event id={}, threadId={}, new event id={}, newThreadId={}" + + ":index=" + this.sender.getEventIdIndex(), + this, clonedEvent.getKey(), index, originalEventId, + ThreadIdentifier.toDisplayString(originalEventId.getThreadID()), newEventId, + ThreadIdentifier.toDisplayString(newThreadId)); clonedEvent.setEventId(newEventId); } @@ -276,10 +270,8 @@ public class ConcurrentSerialGatewaySenderEventProcessor int memberIdHashCode = Arrays.hashCode(memberId); int threadIdHashCode = (int) (threadId ^ (threadId >>> 32)); eventHashCode = memberIdHashCode + threadIdHashCode; - if (logger.isDebugEnabled()) { - logger.debug("{}: Generated hashcode for event with key={}, memberId={}, threadId={}: {}", - this, event.getKey(), Arrays.toString(memberId), threadId, eventHashCode); - } + logger.debug("{}: Generated hashcode for event with key={}, memberId={}, threadId={}: {}", + this, event.getKey(), Arrays.toString(memberId), threadId, eventHashCode); break; case PARTITION: eventHashCode = PartitionRegionHelper.isPartitionedRegion(event.getRegion()) @@ -287,10 +279,8 @@ public class ConcurrentSerialGatewaySenderEventProcessor // Get the partition for the event : event.getKey().hashCode(); // Fall back to key ordering if the region is not partitioned - if (logger.isDebugEnabled()) { - logger.debug("{}: Generated 
partition hashcode for event with key={}: {}", this, - event.getKey(), eventHashCode); - } + logger.debug("{}: Generated partition hashcode for event with key={}: {}", this, + event.getKey(), eventHashCode); break; } @@ -318,10 +308,8 @@ public class ConcurrentSerialGatewaySenderEventProcessor for (Future<Boolean> f : futures) { try { boolean b = f.get(); - if (logger.isDebugEnabled()) { - logger.debug("ConcurrentSerialGatewaySenderEventProcessor: {} stopped dispatching: {}", - (b ? "Successfully" : "Unsuccessfully"), this); - } + logger.debug("ConcurrentSerialGatewaySenderEventProcessor: {} stopped dispatching: {}", + (b ? "Successfully" : "Unsuccessfully"), this); } catch (ExecutionException e) { // we don't expect any exception but if caught then eat it and log // warning @@ -337,9 +325,7 @@ public class ConcurrentSerialGatewaySenderEventProcessor closeProcessor(); - if (logger.isDebugEnabled()) { - logger.debug("ConcurrentSerialGatewaySenderEventProcessor: Stopped dispatching: {}", this); - } + logger.debug("ConcurrentSerialGatewaySenderEventProcessor: Stopped dispatching: {}", this); } @Override @@ -355,9 +341,7 @@ public class ConcurrentSerialGatewaySenderEventProcessor serialProcessor.pauseDispatching(); } super.pauseDispatching(); - if (logger.isDebugEnabled()) { - logger.debug("ConcurrentSerialGatewaySenderEventProcessor: Paused dispatching: {}", this); - } + logger.debug("ConcurrentSerialGatewaySenderEventProcessor: Paused dispatching: {}", this); } @Override @@ -366,9 +350,7 @@ public class ConcurrentSerialGatewaySenderEventProcessor serialProcessor.resumeDispatching(); } super.resumeDispatching(); - if (logger.isDebugEnabled()) { - logger.debug("ConcurrentSerialGatewaySenderEventProcessor: Resumed dispatching: {}", this); - } + logger.debug("ConcurrentSerialGatewaySenderEventProcessor: Resumed dispatching: {}", this); } public List<SerialGatewaySenderEventProcessor> getProcessors() { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java index 6efac4d..e22883f 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java @@ -132,9 +132,7 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven // Create the region queue this.queue = new SerialGatewaySenderQueue(sender, regionName, listener, cleanQueues); - if (logger.isDebugEnabled()) { - logger.debug("Created queue: {}", this.queue); - } + logger.debug("Created queue: {}", this.queue); } /** @@ -202,9 +200,7 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven completeFailover(); } // Begin to process the message queue after becoming primary - if (logger.isDebugEnabled()) { - logger.debug("Beginning to process the message queue"); - } + logger.debug("Beginning to process the message queue"); if (!sender.isPrimary()) { logger.warn("About to process the message queue but not the primary."); @@ -371,9 +367,7 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven if (m != null) { for (EventWrapper ew : m.values()) { GatewaySenderEventImpl gatewayEvent = ew.event; - if (logger.isDebugEnabled()) { - logger.debug("releaseUnprocessedEvents:" + gatewayEvent); - } + logger.debug("releaseUnprocessedEvents:" + 
gatewayEvent); gatewayEvent.release(); } this.unprocessedEvents = null; @@ -405,7 +399,10 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven // used in the sendBatch method, and it can't be null. See EntryEventImpl // for details. GatewaySenderEventImpl senderEvent = null; - + boolean isDebugEnabled = logger.isDebugEnabled(); + if (isDebugEnabled) { + logger.debug("MLH enqueueEvent: 1 event = " + event); + } boolean isPrimary = sender.isPrimary(); if (!isPrimary) { // Fix for #40615. We need to check if we've now become the primary @@ -428,6 +425,9 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven senderEvent = new GatewaySenderEventImpl(operation, event, substituteValue, false, isLastEventInTransaction); + if (isDebugEnabled) { + logger.debug("MLH enqueueEvent: 2 handleSecondaryEvent senderEvent = " + senderEvent); + } handleSecondaryEvent(senderEvent); } } @@ -447,6 +447,9 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven boolean queuedEvent = false; try { + if (isDebugEnabled) { + logger.debug("MLH enqueueEvent: 3 queuePrimaryEvent senderEvent = " + senderEvent); + } queuedEvent = queuePrimaryEvent(senderEvent); } finally { // When queuePrimaryEvent() failed with some exception, it could @@ -455,6 +458,9 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven // and IllegalStateException could be thrown if getDeserializedValue is called // when the event is accessed through the region queue. if (!queuedEvent) { + if (isDebugEnabled) { + logger.debug("MLH enqueueEvent: 3 release senderEvent = " + senderEvent); + } GatewaySenderEventImpl.release(senderEvent); } } @@ -465,14 +471,10 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven throws IOException, CacheException { // Queue the event GatewaySenderStats statistics = this.sender.getStatistics(); - if (logger.isDebugEnabled()) { - logger.debug("{}: Queueing event ({}): {}", sender.getId(), - (statistics.getEventsQueued() + 1), gatewayEvent); - } + logger.debug("{}: Queueing event ({}): {}", sender.getId(), + (statistics.getEventsQueued() + 1), gatewayEvent); if (!sender.beforeEnqueue(gatewayEvent)) { - if (logger.isDebugEnabled()) { - logger.debug("Event {} is not added to queue.", gatewayEvent); - } + logger.debug("Event {} is not added to queue.", gatewayEvent); statistics.incEventsFiltered(); return false; } @@ -481,22 +483,16 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven try { putDone = this.queue.put(gatewayEvent); } catch (InterruptedException e) { - // Asif Not expected from SingleWriteSingleReadRegionQueue as it does not - // throw - // InterruptedException. But since both HARegionQueue and - // SingleReadSingleWriteRegionQueue + // Asif Not expected from SingleWriteSingleReadRegionQueue as it does not throw + // InterruptedException. 
But since both HARegionQueue and SingleReadSingleWriteRegionQueue // extend RegionQueue , it has to handle InterruptedException Thread.currentThread().interrupt(); getSender().getCancelCriterion().checkCancelInProgress(e); } statistics.endPut(start); - if (logger.isDebugEnabled()) { - logger.debug("{}: Queued event ({}): {}", sender.getId(), (statistics.getEventsQueued()), - gatewayEvent); - } - // this._logger.warning(getGateway() + ": Queued event (" + - // (statistics.getEventsQueued()) + "): " + gatewayEvent + " queue size: " - // + this._eventQueue.size()); + logger.debug("{}: Queued event ({}): {}", sender.getId(), (statistics.getEventsQueued()), + gatewayEvent); + /* * FAILOVER TESTING CODE System.out.println(getName() + ": Queued event (" + * (statistics.getEventsQueued()) + "): " + gatewayEvent.getId()); @@ -599,6 +595,11 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven */ protected void handlePrimaryDestroy(final GatewaySenderEventImpl gatewayEvent) { Executor my_executor = this.executor; + + boolean isDebugEnabled = logger.isDebugEnabled(); + if (isDebugEnabled) { + logger.debug("MLH handlePrimaryDestroy: 1 gatewayEvent = " + gatewayEvent); + } synchronized (listenerObjectLock) { if (my_executor == null) { // should mean we are now primary @@ -619,6 +620,12 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven */ protected boolean basicHandlePrimaryDestroy(final EventID eventId, boolean addToUnprocessedTokens) { + + boolean isDebugEnabled = logger.isDebugEnabled(); + if (isDebugEnabled) { + logger.debug("MLH basicHandlePrimaryDestroy: 1 eventId = " + eventId); + } + if (this.sender.isPrimary()) { // no need to do anything if we have become the primary return false; @@ -657,6 +664,10 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven } protected void basicHandlePrimaryEvent(final GatewaySenderEventImpl gatewayEvent) { + boolean isDebugEnabled = logger.isDebugEnabled(); + if (isDebugEnabled) { + logger.debug("MLH basicHandlePrimaryEvent: 1 gatewayEvent = " + gatewayEvent); + } if (this.sender.isPrimary()) { // no need to do anything if we have become the primary return; @@ -708,6 +719,10 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven } private void basicHandleSecondaryEvent(final GatewaySenderEventImpl gatewayEvent) { + boolean isDebugEnabled = logger.isDebugEnabled(); + if (isDebugEnabled) { + logger.debug("MLH basicHandleSecondaryEvent: 1 gatewayEvent = " + gatewayEvent); + } boolean freeGatewayEvent = true; try { GatewaySenderStats statistics = this.sender.getStatistics(); @@ -861,9 +876,7 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven @Override public void initializeEventDispatcher() { - if (logger.isDebugEnabled()) { - logger.debug(" Creating the GatewayEventCallbackDispatcher"); - } + logger.debug(" Creating the GatewayEventCallbackDispatcher"); this.dispatcher = new GatewaySenderEventCallbackDispatcher(this); } @@ -881,24 +894,18 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven destroyEvent.setEventId(dropEvent.getEventId()); destroyEvent.disallowOffHeapValues(); destroyEvent.setTailKey(-1L); - if (logger.isDebugEnabled()) { - logger.debug( - "SerialGatewaySenderEventProcessor sends BatchDestroyOperation to secondary for event {}", - destroyEvent); - } + logger.debug( + "SerialGatewaySenderEventProcessor sends BatchDestroyOperation to secondary for event {}", + 
destroyEvent); try { BatchDestroyOperation op = new BatchDestroyOperation(destroyEvent); op.distribute(); - if (logger.isDebugEnabled()) { - logger.debug("BatchRemovalThread completed destroy of dropped event {}", dropEvent); - } + logger.debug("BatchRemovalThread completed destroy of dropped event {}", dropEvent); } catch (Exception ignore) { - if (logger.isDebugEnabled()) { - logger.debug( - "Exception in sending dropped event could be ignored in order not to interrupt sender starting", - ignore); - } + logger.debug( + "Exception in sending dropped event could be ignored in order not to interrupt sender starting", + ignore); } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java index 45f4f84..77f7368 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java @@ -232,9 +232,7 @@ public class SerialGatewaySenderQueue implements RegionQueue { this.removalThread = new BatchRemovalThread(abstractSender.getCache()); this.removalThread.start(); this.sender = abstractSender; - if (logger.isDebugEnabled()) { - logger.debug("{}: Contains {} elements", this, size()); - } + logger.debug("{}: Contains {} elements", this, size()); } @Override @@ -273,9 +271,7 @@ public class SerialGatewaySenderQueue implements RegionQueue { // signal that a new object is available. incrementTailKey(); - if (logger.isDebugEnabled()) { - logger.debug("{}: Inserted {} -> {}", this, key, object); - } + logger.debug("{}: Inserted {} -> {}", this, key, object); if (object instanceof Conflatable) { removeOldEntry((Conflatable) object, key); } @@ -326,11 +322,9 @@ public class SerialGatewaySenderQueue implements RegionQueue { } catch (EntryNotFoundException ok) { // this is acceptable because the conflation can remove entries // out from underneath us. - if (logger.isDebugEnabled()) { - logger.debug( - "{}: Did not destroy entry at {} it was not there. It should have been removed by conflation.", - this, key); - } + logger.debug( + "{}: Did not destroy entry at {} it was not there. It should have been removed by conflation.", + this, key); } boolean wasEmpty = this.lastDispatchedKey == this.lastDestroyedKey; @@ -360,11 +354,9 @@ public class SerialGatewaySenderQueue implements RegionQueue { } } - if (logger.isDebugEnabled()) { - logger.debug( - "{}: Destroyed entry at key {} setting the lastDispatched Key to {}. The last destroyed entry was {}", - this, key, this.lastDispatchedKey, this.lastDestroyedKey); - } + logger.debug( + "{}: Destroyed entry at key {} setting the lastDispatched Key to {}. 
The last destroyed entry was {}", + this, key, this.lastDispatchedKey, this.lastDestroyedKey); } /** @@ -484,11 +476,9 @@ public class SerialGatewaySenderQueue implements RegionQueue { batch.add(event); areAllEventsForTransactionInBatch = event.isLastEventInTransaction(); - if (logger.isDebugEnabled()) { - logger.debug( - "Peeking extra event: {}, isLastEventInTransaction: {}, batch size: {}", - event.getKey(), event.isLastEventInTransaction(), batch.size()); - } + logger.debug( + "Peeking extra event: {}, isLastEventInTransaction: {}, batch size: {}", + event.getKey(), event.isLastEventInTransaction(), batch.size()); } lastKeyForTransaction = eventsAndKey.lastKey; } @@ -689,10 +679,8 @@ public class SerialGatewaySenderQueue implements RegionQueue { if (index != null) { this.stats.decConflationIndexesMapSize(); } - if (logger.isDebugEnabled()) { - if (index != null) { - logger.debug("{}: Removed index {} for {}", this, index, object); - } + if (index != null) { + logger.debug("{}: Removed index {} for {}", this, index, object); } } } @@ -815,9 +803,7 @@ public class SerialGatewaySenderQueue implements RegionQueue { } } - if (logger.isDebugEnabled()) { - logger.debug("{}: Peeked {}->{}", this, currentKey, object); - } + logger.debug("{}: Peeked {}->{}", this, currentKey, object); if (object != null) { peekedIds.add(currentKey); @@ -975,10 +961,8 @@ public class SerialGatewaySenderQueue implements RegionQueue { this.tailKey.set(inc(largestKey)); } - if (logger.isDebugEnabled()) { - logger.debug("{}: Initialized tail key to: {}, head key to: {}", this, this.tailKey, - this.headKey); - } + logger.debug("{}: Initialized tail key to: {}, head key to: {}", this, this.tailKey, + this.headKey); } } @@ -1044,10 +1028,8 @@ public class SerialGatewaySenderQueue implements RegionQueue { if (NO_ACK) { factory.setScope(Scope.DISTRIBUTED_NO_ACK); } - if (logger.isDebugEnabled()) { - logger.debug("The policy of region is {}", - (this.enablePersistence ? DataPolicy.PERSISTENT_REPLICATE : DataPolicy.REPLICATE)); - } + logger.debug("The policy of region is {}", + (this.enablePersistence ? DataPolicy.PERSISTENT_REPLICATE : DataPolicy.REPLICATE)); // Set listener if it is not null. The listener will be non-null // when the user of this queue is a secondary VM. 
if (listener != null) { @@ -1066,9 +1048,7 @@ public class SerialGatewaySenderQueue implements RegionQueue { factory.setDiskSynchronous(this.isDiskSynchronous); // Create the region - if (logger.isDebugEnabled()) { - logger.debug("{}: Attempting to create queue region: {}", this, this.regionName); - } + logger.debug("{}: Attempting to create queue region: {}", this, this.regionName); final RegionAttributes<Long, AsyncEvent> ra = factory.getCreateAttributes(); try { SerialGatewaySenderQueueMetaRegion meta = @@ -1085,9 +1065,7 @@ public class SerialGatewaySenderQueue implements RegionQueue { // Add overflow statistics to the mbean addOverflowStatisticsToMBean(gemCache, sender); - if (logger.isDebugEnabled()) { - logger.debug("{}: Created queue region: {}", this, this.region); - } + logger.debug("{}: Created queue region: {}", this, this.region); } catch (CacheException e) { logger.fatal(String.format("%s: The queue region named %s could not be created", new Object[] {this, this.regionName}), @@ -1217,10 +1195,8 @@ public class SerialGatewaySenderQueue implements RegionQueue { Thread.currentThread().interrupt(); } - if (logger.isDebugEnabled()) { - logger.debug("BatchRemovalThread about to send the last Dispatched key {}", - lastDispatchedKey); - } + logger.debug("BatchRemovalThread about to send the last Dispatched key {}", + lastDispatchedKey); long temp; synchronized (SerialGatewaySenderQueue.this) { @@ -1241,17 +1217,13 @@ public class SerialGatewaySenderQueue implements RegionQueue { BatchDestroyOperation op = new BatchDestroyOperation(event); op.distribute(); - if (logger.isDebugEnabled()) { - logger.debug("BatchRemovalThread completed destroy of keys from {} to {}", - lastDestroyedKey, temp); - } + logger.debug("BatchRemovalThread completed destroy of keys from {} to {}", + lastDestroyedKey, temp); lastDestroyedKey = temp; } // be somewhat tolerant of failures catch (CancelException e) { - if (logger.isDebugEnabled()) { - logger.debug("BatchRemovalThread is exiting due to cancellation"); - } + logger.debug("BatchRemovalThread is exiting due to cancellation"); break; } catch (VirtualMachineError err) { SystemFailure.initiateFailure(err); @@ -1268,16 +1240,12 @@ public class SerialGatewaySenderQueue implements RegionQueue { if (checkCancelled()) { break; } - if (logger.isDebugEnabled()) { - logger.debug("BatchRemovalThread: ignoring exception", t); - } + logger.debug("BatchRemovalThread: ignoring exception", t); } } // for } // ensure exit message is printed catch (CancelException e) { - if (logger.isDebugEnabled()) { - logger.debug("BatchRemovalThread exiting due to cancellation: " + e); - } + logger.debug("BatchRemovalThread exiting due to cancellation: " + e); } finally { logger.info("The QueueRemovalThread is done."); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialSecondaryGatewayListener.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialSecondaryGatewayListener.java index 2c6a2aa..4ede414 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialSecondaryGatewayListener.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialSecondaryGatewayListener.java @@ -84,10 +84,8 @@ public class SerialSecondaryGatewayListener extends CacheListenerAdapter { Object oldValue = event.getOldValue(); if (oldValue instanceof GatewaySenderEventImpl) { GatewaySenderEventImpl senderEvent = (GatewaySenderEventImpl) oldValue; - if (logger.isDebugEnabled()) { - 
logger.debug("Received after Destroy for Secondary event {} the key was {}", senderEvent, - event.getKey()); - } + logger.debug("Received after Destroy for Secondary event {} the key was {}", senderEvent, + event.getKey()); this.processor.handlePrimaryDestroy(senderEvent); } } diff --git a/geode-core/src/test/java/org/apache/geode/StatusReporterTest.java b/geode-core/src/test/java/org/apache/geode/StatusReporterTest.java new file mode 100644 index 0000000..e6dca2a --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/StatusReporterTest.java @@ -0,0 +1,105 @@ +package org.apache.geode; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.PrintStream; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +public class StatusReporterTest { + + @Before + public void setUp() throws Exception {} + + @After + public void tearDown() throws Exception {} + + @Test + public void testStart() throws InterruptedException { + StatusReporter.start(); + while (StatusReporter.getStatus() != StatusReporter.State.SR_RUNNING) { + Thread.sleep(1000); + } + assertThat(StatusReporter.getStatus()).isEqualTo(StatusReporter.State.SR_RUNNING); + } + + @Test + public void testRegisterComponent() { + PrintStream printStream = Mockito.mock(PrintStream.class); + ComponentStatus componentStatus = new ComponentStatus() { + @Override + public String name() { + return "Test Component"; + } + + @Override + public String getStatusString() { + return "Everything is fine and happy."; + } + + @Override + public void print() { + printStream.println(getStatusString()); + } + }; + + StatusReporter.registerComponent(componentStatus); + assertThat(StatusReporter.isComponentRegistered(componentStatus)).isTrue(); + StatusReporter.removeComponent(componentStatus); + } + + @Test + public void testGetStatusReport() { + PrintStream printStream = Mockito.mock(PrintStream.class); + ComponentStatus componentStatus = new ComponentStatus() { + @Override + public String name() { + return "Test Component"; + } + + @Override + public String getStatusString() { + return "Everything is fine and happy."; + } + + @Override + public void print() { + printStream.println(getStatusString()); + } + }; + + StatusReporter.registerComponent(componentStatus); + assertThat(StatusReporter.getStatusReport()).isTrue(); + Mockito.verify(printStream).println(componentStatus.getStatusString()); + StatusReporter.removeComponent(componentStatus); + } + + @Test + public void testPeriodicReports() throws InterruptedException { + PrintStream printStream = Mockito.mock(PrintStream.class); + ComponentStatus componentStatus = new ComponentStatus() { + @Override + public String name() { + return "Test Component"; + } + + @Override + public String getStatusString() { + return "Everything is fine and happy."; + } + + @Override + public void print() { + printStream.println(getStatusString()); + } + }; + + StatusReporter.registerComponent(componentStatus); + Thread.sleep(5000); + Mockito.verify(printStream).println(componentStatus.getStatusString()); + StatusReporter.removeComponent(componentStatus); + } +} diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java index 97436a2..c7f934e 100644 --- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java +++ 
b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java @@ -18,6 +18,8 @@ import java.util.Set; import org.apache.logging.log4j.Logger; +import org.apache.geode.ComponentStatus; +import org.apache.geode.StatusReporter; import org.apache.geode.cache.asyncqueue.AsyncEventListener; import org.apache.geode.cache.wan.GatewayTransportFilter; import org.apache.geode.distributed.DistributedLockService; @@ -43,7 +45,8 @@ import org.apache.geode.logging.internal.log4j.api.LogService; /** * @since GemFire 7.0 */ -public class SerialGatewaySenderImpl extends AbstractRemoteGatewaySender { +public class SerialGatewaySenderImpl extends AbstractRemoteGatewaySender implements + ComponentStatus { private static final Logger logger = LogService.getLogger(); @@ -63,6 +66,7 @@ public class SerialGatewaySenderImpl extends AbstractRemoteGatewaySender { } private void start(boolean cleanQueues) { + StatusReporter.registerComponent(this); if (logger.isDebugEnabled()) { logger.debug("Starting gatewaySender : {}", this); } @@ -110,6 +114,7 @@ public class SerialGatewaySenderImpl extends AbstractRemoteGatewaySender { logger .info("Started {}", this); + status = "Started"; enqueueTempEvents(); } finally { this.getLifeCycleLock().writeLock().unlock(); @@ -262,4 +267,24 @@ public class SerialGatewaySenderImpl extends AbstractRemoteGatewaySender { return null; } } + + @Override + public String name() { + return "SerialGatewaySender"; + } + + private String status = "Started"; + + @Override + public String getStatusString() { + synchronized (this) { + return status; + } + } + + @Override + public void print() { + String status = getStatusString(); + System.out.println("Component name: " + name() + " status: " + status + " " + toString()); + } }
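For readers skimming the patch, here is a minimal usage sketch (not part of the commit) of the ComponentStatus/StatusReporter API introduced above, mirroring what SerialGatewaySenderImpl.start() and StatusReporterTest do. The ExampleComponent class, its package, and its status strings are illustrative assumptions only.

package org.apache.geode.example; // hypothetical package, for illustration only

import org.apache.geode.ComponentStatus;
import org.apache.geode.StatusReporter;

// Sketch of a component that reports its status through the new StatusReporter.
public class ExampleComponent implements ComponentStatus {

  private volatile String status = "Stopped"; // illustrative state, guarded by volatile

  public void start() {
    // Register with the reporter on startup, as SerialGatewaySenderImpl does in start().
    StatusReporter.registerComponent(this);
    status = "Started";
  }

  public void stop() {
    status = "Stopped";
    // Unregister when the component shuts down, as StatusReporterTest does after each check.
    StatusReporter.removeComponent(this);
  }

  @Override
  public String name() {
    return "ExampleComponent";
  }

  @Override
  public String getStatusString() {
    return status;
  }

  @Override
  public void print() {
    System.out.println("Component name: " + name() + " status: " + getStatusString());
  }
}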