Repository: incubator-geode Updated Branches: refs/heads/feature/GEODE-77 e0d1c4f90 -> 26fea7a16
GEODE-77: added Messenger statistics and removed old JGroups statistics. This also fixes a few bugs that I found during testing. GMSMember wasn't serializing correctly in some cases. I also found that gfsh has a showDeadlock command and hooked the new findDeepestGraph DependencyGraph search into it. If gfsh can't find a deadlock it will now report on the deepest call chain it can find, which often points to the source of a problem in the distributed system. Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/26fea7a1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/26fea7a1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/26fea7a1 Branch: refs/heads/feature/GEODE-77 Commit: 26fea7a161f2336aa5577b0f4b2126d11c9ae37f Parents: e0d1c4f Author: Bruce Schuchardt <bschucha...@pivotal.io> Authored: Tue Aug 18 09:37:05 2015 -0700 Committer: Bruce Schuchardt <bschucha...@pivotal.io> Committed: Tue Aug 18 09:37:05 2015 -0700 ---------------------------------------------------------------------- .../gemfire/distributed/internal/DMStats.java | 123 +------ .../internal/DistributionManager.java | 3 + .../distributed/internal/DistributionStats.java | 333 +------------------ .../internal/LonerDistributionManager.java | 210 ++++++++---- .../internal/deadlock/DeadlockDetector.java | 14 +- .../internal/deadlock/DependencyGraph.java | 9 +- .../membership/InternalDistributedMember.java | 1 + .../internal/membership/gms/GMSMember.java | 10 +- .../membership/gms/fd/GMSHealthMonitor.java | 4 + .../gms/interfaces/HealthMonitor.java | 7 +- .../membership/gms/interfaces/JoinLeave.java | 7 + .../membership/gms/interfaces/Manager.java | 61 +++- .../membership/gms/membership/GMSJoinLeave.java | 56 +++- .../gms/messenger/JGroupsMessenger.java | 27 +- .../membership/gms/messenger/StatRecorder.java | 107 ++++++ .../gms/mgr/GMSMembershipManager.java | 8 + 
.../internal/tcpserver/TcpServer.java | 2 +- .../cli/commands/MiscellaneousCommands.java | 16 +- .../internal/cli/i18n/CliStrings.java | 1 + .../distributed/internal/jgroups-config.xml | 1 + .../distributed/internal/jgroups-mcast.xml | 1 + .../gms/membership/StatRecorderJUnitTest.java | 181 ++++++++++ 22 files changed, 634 insertions(+), 548 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/26fea7a1/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DMStats.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DMStats.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DMStats.java index d25d107..5f9c8f1 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DMStats.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DMStats.java @@ -172,38 +172,26 @@ public interface DMStats { public long startSocketWrite(boolean sync); public void endSocketWrite(boolean sync, long start, int bytesWritten, int retries); /** - * begin a unicast datagram write operation. Use the result of this operation - * when calling endUcastWrite + * increments + * the number of unicast writes performed and the number of bytes written * @since 5.0 */ - public long startUcastWrite(); - /** - * record the end of a unicast datagram write operation and increment - * the number of writes performed and the number of bytes written - * @since 5.0 - */ - public void endUcastWrite(long start, int bytesWritten); + public void incUcastWriteBytes(int bytesWritten); /** * increment the number of unicast datagram payload bytes received and * the number of unicast reads performed */ - public void incUcastReadBytes(long amount); - /** - * begin a multicast write operation. 
Use the result of this operation - * when calling endMcastWrite - * @since 5.0 - */ - public long startMcastWrite(); + public void incUcastReadBytes(int amount); /** - * record the end of a multicast datagram write operation - * @since 5.0 + * increment the number of multicast datagrams sent and + * the number of multicast bytes transmitted */ - public void endMcastWrite(long start, int bytesWritten); + public void incMcastWriteBytes(int bytesWritten); /** * increment the number of multicast datagram payload bytes received, and * the number of mcast messages read */ - public void incMcastReadBytes(long amount); + public void incMcastReadBytes(int amount); /** * returns the current value of the mcastWrites statistic */ @@ -285,7 +273,7 @@ public interface DMStats { */ public int getSendersSU(); /** - * increment the number of unicast UDP retransmissions sent to + * increment the number of unicast UDP retransmission requests received from * other processes * @since 5.0 */ @@ -309,97 +297,6 @@ public interface DMStats { public void incMcastRetransmitRequests(); /** - * start a period of suspension of message transmission while we - * wait for acknowledgement of unicast messages this process has - * transmitted to other processes. This returns a timestamp to be - * used when calling endUcastFlush() - * @since 5.0 - */ - public long startUcastFlush(); - - /** - * end a period of suspension of message transmission while waiting - * for acknowledgment of unicast messages - * @since 5.0 - */ - public void endUcastFlush(long start); - - /** - * increment the number of flow control requests sent to other processes - */ - public void incFlowControlRequests(); - - /** - * increment the number of flow control responses sent to other processes - */ - public void incFlowControlResponses(); - - /** - * start a period of suspension of message transmission while waiting - * for flow-control recharge from another process. 
This returns a - * timestamp to be used when calling endFlowControlWait(); - * @since 5.0 - */ - public long startFlowControlWait(); - - /** - * end a period of suspension of message transmission while waiting for - * flow-control recharge from another process. - */ - public void endFlowControlWait(long start); - - /** - * start a period of suspension of message transmission based on throttle - * request from another process. - * This returns a timestamp to be used when calling endFlowControlWait(); - * @since 5.0 - */ - public long startFlowControlThrottleWait(); - - /** - * end a period of suspension of message transmission based on throttle - * request from another process. - */ - public void endFlowControlThrottleWait(long start); - - /** - * this statistic measures travel of messages up the jgroups stack - * for tuning purposes - */ - public void incJgUNICASTdataReceived(long value); - - public void incjgDownTime(long value); - public void incjgUpTime(long value); - public void incjChannelUpTime(long value); - - public void setJgQueuedMessagesSize(long value); - - public void setJgSTABLEreceivedMessagesSize(long value); - public void setJgSTABLEsentMessagesSize(long value); - - public void incJgSTABLEsuspendTime(long value); - public void incJgSTABLEmessages(long value); - public void incJgSTABLEmessagesSent(long value); - public void incJgSTABILITYmessages(long value); - - public void incJgFCsendBlocks(long value); - public void incJgFCautoRequests(long value); - public void incJgFCreplenish(long value); - public void incJgFCresumes(long value); - public void incJgFCsentCredits(long value); - public void incJgFCsentThrottleRequests(long value); - - public void setJgUNICASTreceivedMessagesSize(long amount); - public void setJgUNICASTsentMessagesSize(long amount); - public void setJgUNICASTsentHighPriorityMessagesSize(long amount); - - /** increment the number of javagroups fragmentations performed */ - public void incJgFragmentationsPerformed(); - - /** 
increment the number of fragments created during javagroups fragmentation */ - public void incJgFragmentsCreated(long value); - - /** * @since 4.2.2 */ public int getAsyncSocketWritesInProgress(); @@ -574,8 +471,6 @@ public interface DMStats { */ public void endBufferAcquire(long start); - public void incJgNAKACKwaits(long value); - /** * increment/decrement the number of thread-owned receivers with the given * domino count http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/26fea7a1/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java index 33fd9db..e8fe361 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java @@ -628,6 +628,9 @@ public class DistributionManager }; logger.info(LogMarker.DM, LocalizedMessage.create( LocalizedStrings.DistributionManager_DISTRIBUTIONMANAGER_0_STARTED_ON_1_THERE_WERE_2_OTHER_DMS_3_4_5, logArgs)); + +logger.info("My ID is {}", Integer.toHexString(System.identityHashCode(dm.getDistributionManagerId()))); + MembershipLogger.logStartup(dm.getDistributionManagerId()); } return dm; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/26fea7a1/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionStats.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionStats.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionStats.java index b64aca1..e8c8073 100644 --- 
a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionStats.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionStats.java @@ -14,7 +14,6 @@ import com.gemstone.gemfire.Statistics; import com.gemstone.gemfire.StatisticsFactory; import com.gemstone.gemfire.StatisticsType; import com.gemstone.gemfire.StatisticsTypeFactory; -import com.gemstone.gemfire.i18n.LogWriterI18n; import com.gemstone.gemfire.internal.NanoTimer; import com.gemstone.gemfire.internal.StatisticsTypeFactoryImpl; import com.gemstone.gemfire.internal.logging.LogService; @@ -104,14 +103,12 @@ public class DistributionStats implements DMStats { private final static int ucastReadsId; private final static int ucastReadBytesId; - private final static int ucastWriteTimeId; private final static int ucastWritesId; private final static int ucastWriteBytesId; private final static int ucastRetransmitsId; private final static int mcastReadsId; private final static int mcastReadBytesId; - private final static int mcastWriteTimeId; private final static int mcastWritesId; private final static int mcastWriteBytesId; private final static int mcastRetransmitsId; @@ -141,47 +138,9 @@ public class DistributionStats implements DMStats { private final static int batchWaitTimeId; private final static int batchFlushTimeId; - private final static int ucastFlushesId; - private final static int ucastFlushTimeId; - - private final static int flowControlWaitsInProgressId; - private final static int flowControlThrottleWaitsInProgressId; - private final static int flowControlRequestsId; - private final static int flowControlResponsesId; - - private final static int jgUNICASTdataReceivedTimeId; - - private final static int jgReceivedMessagesSizeId; - private final static int jgQueuedMessagesSizeId; - private final static int jgSentMessagesPoolSizeId; - - private final static int jgUcastReceivedMessagesSizeId; - private final static int jgUcastSentMessagesSizeId; - 
private final static int jgUcastSentHighPriorityMessagesSizeId; - - private final static int jgSTABLEsuspendTimeId; - private final static int jgSTABLEmessagesId; - private final static int jgSTABLEmessagesSentId; - private final static int jgSTABILITYmessagesId; - - private final static int jgDownTimeId; - private final static int jgUpTimeId; - private final static int jChannelUpTimeId; - - private final static int jgFCsendBlocksId; - private final static int jgFCautoRequestsId; - private final static int jgFCreplenishId; - private final static int jgFCresumesId; - private final static int jgFCsentCreditsId; - private final static int jgFCsentThrottleRequestsId; - private final static int jgNAKACKwaitsId; - private final static int threadOwnedReceiversId; private final static int threadOwnedReceiversId2; - private final static int jgFragmentationsPerformedId; - private final static int jgFragmentsCreatedId; - private final static int asyncSocketWritesInProgressId; private final static int asyncSocketWritesId; private final static int asyncSocketWriteRetriesId; @@ -419,14 +378,12 @@ public class DistributionStats implements DMStats { f.createIntCounter("ucastReads", "Total number of unicast datagrams received", "datagrams"), f.createLongCounter("ucastReadBytes", "Total number of bytes received in unicast datagrams", "bytes"), - f.createLongCounter("ucastWriteTime", "Total amount of time, in nanoseconds, spent in unicast datagram socket write calls.", "nanoseconds"), f.createIntCounter("ucastWrites", "Total number of unicast datagram socket write calls.", "writes"), f.createLongCounter("ucastWriteBytes", "Total number of bytes sent out on unicast datagram sockets.", "bytes"), f.createIntCounter("ucastRetransmits", "Total number of unicast datagram socket retransmissions", "writes"), f.createIntCounter("mcastReads", "Total number of multicast datagrams received", "datagrams"), f.createLongCounter("mcastReadBytes", "Total number of bytes received in multicast 
datagrams", "bytes"), - f.createLongCounter("mcastWriteTime", "Total amount of time, in nanoseconds, spent in multicast datagram socket write calls.", "nanoseconds"), f.createIntCounter("mcastWrites", "Total number of multicast datagram socket write calls.", "writes"), f.createLongCounter("mcastWriteBytes", "Total number of bytes sent out on multicast datagram sockets.", "bytes"), f.createIntCounter("mcastRetransmits", "Total number of multicast datagram socket retransmissions", "writes"), @@ -453,67 +410,6 @@ public class DistributionStats implements DMStats { f.createLongCounter("batchCopyTime", "Total amount of time, in nanoseconds, spent copying messages for batched transmission", "nanoseconds"), f.createLongCounter("batchFlushTime", "Total amount of time, in nanoseconds, spent flushing batched messages to the network", "nanoseconds"), - f.createIntCounter("ucastFlushes", "Total number of flushes of the unicast datagram protocol, prior to sending a multicast message", "flushes"), - f.createLongCounter("ucastFlushTime", "Total amount of time, in nanoseconds, spent waiting for acknowledgements for outstanding unicast datagram messages", "nanoseconds"), - - f.createIntCounter("flowControlRequests", "Total number of flow control credit requests sent to other processes", "messages"), - f.createIntCounter("flowControlResponses", "Total number of flow control credit responses sent to a requestor", "messages"), - f.createIntGauge("flowControlWaitsInProgress", "Number of threads blocked waiting for flow-control recharges from other processes", "threads"), - f.createLongCounter("flowControlWaitTime", "Total amount of time, in nanoseconds, spent waiting for other processes to recharge the flow of control meter", "nanoseconds"), - f.createIntGauge("flowControlThrottleWaitsInProgress", "Number of threads blocked waiting due to flow-control throttle requests from other members", "threads"), - - f.createLongGauge("jgNAKACKreceivedMessages", "Number of received messages 
awaiting stability in NAKACK", "messages"), - f.createLongGauge("jgNAKACKsentMessages", "Number of sent messages awaiting stability in NAKACK", "messages"), - - f.createLongGauge("jgQueuedMessages", "Number of messages queued by transport and awaiting processing", "messages"), - - f.createLongGauge("jgUNICASTreceivedMessages", "Number of received messages awaiting receipt of prior messages", "messages"), - f.createLongGauge("jgUNICASTsentMessages", "Number of un-acked normal priority messages", "messages"), - f.createLongGauge("jgUNICASTsentHighPriorityMessages", "Number of un-acked high priority messages", "messages"), - - f.createLongCounter("jgUNICASTdataReceivedTime", "Amount of time spent in JGroups UNICAST send", "nanoseconds"), - f.createLongCounter("jgSTABLEsuspendTime", "Amount of time JGroups STABLE is suspended", "nanoseconds"), - f.createLongCounter("jgSTABLEmessages", "Number of STABLE messages received by JGroups", "messages"), - f.createLongCounter("jgSTABLEmessagesSent", "Number of STABLE messages sent by JGroups", "messages"), - f.createLongCounter("jgSTABILITYmessages", "Number of STABILITY messages received by JGroups", "messages"), - -// f.createLongCounter("jgUDPupTime", "Time spent in JGroups UDP processing up events", "nanoseconds"), -// f.createLongCounter("jgUDPdownTime", "Time spent in JGroups UDP processing down events", "nanoseconds"), -// f.createLongCounter("jgNAKACKupTime", "Time spent in JGroups NAKACK processing up events", "nanoseconds"), -// f.createLongCounter("jgNAKACKdownTime", "Time spent in JGroups NAKACK processing down events", "nanoseconds"), -// f.createLongCounter("jgUNICASTupTime", "Time spent in JGroups UNICAST processing up events", "nanoseconds"), -// f.createLongCounter("jgUNICASTdownTime", "Time spent in JGroups UNICAST processing down events", "nanoseconds"), -// f.createLongCounter("jgSTABLEupTime", "Time spent in JGroups STABLE processing up events", "nanoseconds"), -// f.createLongCounter("jgSTABLEdownTime", 
"Time spent in JGroups STABLE processing down events", "nanoseconds"), -// f.createLongCounter("jgFRAG2upTime", "Time spent in JGroups FRAG2 processing up events", "nanoseconds"), -// f.createLongCounter("jgFRAG2downTime", "Time spent in JGroups FRAG2 processing down events", "nanoseconds"), -// f.createLongCounter("jgGMSupTime", "Time spent in JGroups GMS processing up events", "nanoseconds"), -// f.createLongCounter("jgGMSdownTime", "Time spent in JGroups GMS processing down events", "nanoseconds"), -// f.createLongCounter("jgFCupTime", "Time spent in JGroups FC processing up events", "nanoseconds"), -// f.createLongCounter("jgFCdownTime", "Time spent in JGroups FC processing down events", "nanoseconds"), -// f.createLongCounter("jgDirAckupTime", "Time spent in JGroups DirAck processing up events", "nanoseconds"), -// f.createLongCounter("jgDirAckdownTime", "Time spent in JGroups DirAck processing down events", "nanoseconds"), -// -// f.createLongCounter("jgVIEWSYNCdownTime", "Time spent in JGroups VIEWSYNC processing down events", "nanoseconds"), -// f.createLongCounter("jgVIEWSYNCupTime", "Time spent in JGroups VIEWSYNC processing up events", "nanoseconds"), -// f.createLongCounter("jgFDdownTime", "Time spent in JGroups FD processing down events", "nanoseconds"), -// f.createLongCounter("jgFDupTime", "Time spent in JGroups FD processing up events", "nanoseconds"), -// f.createLongCounter("jgTCPGOSSIPdownTime", "Time spent in JGroups TCPGOSSIP processing down events", "nanoseconds"), -// f.createLongCounter("jgTCPGOSSIPupTime", "Time spent in JGroups TCPGOSSIP processing up events", "nanoseconds"), -// f.createLongCounter("jgDISCOVERYdownTime", "Time spent in JGroups DISCOVERY processing down events", "nanoseconds"), -// f.createLongCounter("jgDISCOVERYupTime", "Time spent in JGroups DISCOVERY processing up events", "nanoseconds"), - - f.createLongCounter("jgDownTime", "Down Time spent in JGroups stacks", "nanoseconds"), - f.createLongCounter("jgUpTime", "Up 
Time spent in JGroups stacks", "nanoseconds"), - f.createLongCounter("jChannelUpTime", "Up Time spent in JChannel including jgroup stack", "nanoseconds"), - - f.createLongCounter("jgFCsendBlocks", "Number of times JGroups FC halted sends due to backpressure", "events"), - f.createLongCounter("jgFCautoRequests", "Number of times JGroups FC automatically sent replenishment requests", "events"), - f.createLongCounter("jgFCreplenish", "Number of times JGroups FC received replenishments from receivers", "messages"), - f.createLongCounter("jgFCresumes", "Number of times JGroups FC resumed sends due to backpressure", "events"), - f.createLongCounter("jgFCsentCredits", "Number of times JGroups FC sent credits to a sender", "events"), - f.createLongCounter("jgFCsentThrottleRequests","Number of times JGroups FC sent throttle requests to a sender", "events"), - f.createIntGauge("asyncSocketWritesInProgress", "Current number of non-blocking socket write calls in progress.", "writes"), f.createIntCounter("asyncSocketWrites", "Total number of non-blocking socket write calls completed.", "writes"), f.createIntCounter("asyncSocketWriteRetries", "Total number of retries needed to write a single block of data using non-blocking socket write calls.", "writes"), @@ -539,14 +435,10 @@ public class DistributionStats implements DMStats { f.createIntGauge("asyncThreadInProgress", asyncThreadInProgressDesc, "operations"), f.createIntCounter("asyncThreadCompleted", asyncThreadCompletedDesc, "operations"), f.createLongCounter("asyncThreadTime", asyncThreadTimeDesc, "nanoseconds", false), - f.createLongCounter("jgNAKACKwaits", "Number of delays created by NAKACK sent_msgs overflow", "events"), f.createLongGauge("receiversTO", "Number of receiver threads owned by non-receiver threads in other members.", "threads"), f.createLongGauge("receiversTO2", "Number of receiver threads owned in turn by receiver threads in other members", "threads"), - f.createLongCounter("jgFragmentationsPerformed", 
"Number of message fragmentation operations performed", "operations"), - f.createLongCounter("jgFragmentsCreated", "Number of message fragments created", "fragments"), - f.createLongGauge("receiverDirectBufferSize", receiverDirectBufferSizeDesc, "bytes"), f.createLongGauge("receiverHeapBufferSize", receiverHeapBufferSizeDesc, "bytes"), f.createLongGauge("senderDirectBufferSize", senderDirectBufferSizeDesc, "bytes"), @@ -653,14 +545,12 @@ public class DistributionStats implements DMStats { ucastReadsId = type.nameToId("ucastReads"); ucastReadBytesId = type.nameToId("ucastReadBytes"); - ucastWriteTimeId = type.nameToId("ucastWriteTime"); ucastWritesId = type.nameToId("ucastWrites"); ucastWriteBytesId = type.nameToId("ucastWriteBytes"); ucastRetransmitsId = type.nameToId("ucastRetransmits"); mcastReadsId = type.nameToId("mcastReads"); mcastReadBytesId = type.nameToId("mcastReadBytes"); - mcastWriteTimeId = type.nameToId("mcastWriteTime"); mcastWritesId = type.nameToId("mcastWrites"); mcastWriteBytesId = type.nameToId("mcastWriteBytes"); mcastRetransmitsId = type.nameToId("mcastRetransmits"); @@ -688,41 +578,6 @@ public class DistributionStats implements DMStats { batchWaitTimeId = type.nameToId("batchWaitTime"); batchFlushTimeId = type.nameToId("batchFlushTime"); - ucastFlushesId = type.nameToId("ucastFlushes"); - ucastFlushTimeId = type.nameToId("ucastFlushTime"); - - flowControlRequestsId = type.nameToId("flowControlRequests"); - flowControlResponsesId = type.nameToId("flowControlResponses"); - flowControlWaitsInProgressId = type.nameToId("flowControlWaitsInProgress"); - flowControlThrottleWaitsInProgressId = type.nameToId("flowControlThrottleWaitsInProgress"); - - jgUNICASTdataReceivedTimeId = type.nameToId("jgUNICASTdataReceivedTime"); - - jgReceivedMessagesSizeId = type.nameToId("jgNAKACKreceivedMessages"); - jgSentMessagesPoolSizeId = type.nameToId("jgNAKACKsentMessages"); - jgQueuedMessagesSizeId = type.nameToId("jgQueuedMessages"); - jgSTABLEsuspendTimeId = 
type.nameToId("jgSTABLEsuspendTime"); - jgSTABLEmessagesId = type.nameToId("jgSTABLEmessages"); - jgSTABLEmessagesSentId = type.nameToId("jgSTABLEmessagesSent"); - jgSTABILITYmessagesId = type.nameToId("jgSTABILITYmessages"); - jgFragmentationsPerformedId = type.nameToId("jgFragmentationsPerformed"); - jgFragmentsCreatedId = type.nameToId("jgFragmentsCreated"); - - jgUcastReceivedMessagesSizeId = type.nameToId("jgUNICASTreceivedMessages"); - jgUcastSentMessagesSizeId = type.nameToId("jgUNICASTsentMessages"); - jgUcastSentHighPriorityMessagesSizeId = type.nameToId("jgUNICASTsentHighPriorityMessages"); - - jgDownTimeId = type.nameToId("jgDownTime"); - jgUpTimeId = type.nameToId("jgUpTime"); - jChannelUpTimeId = type.nameToId("jChannelUpTime"); - - jgFCsendBlocksId = type.nameToId("jgFCsendBlocks"); - jgFCautoRequestsId = type.nameToId("jgFCautoRequests"); - jgFCreplenishId = type.nameToId("jgFCreplenish"); - jgFCresumesId = type.nameToId("jgFCresumes"); - jgFCsentCreditsId = type.nameToId("jgFCsentCredits"); - jgFCsentThrottleRequestsId = type.nameToId("jgFCsentThrottleRequests"); - asyncSocketWritesInProgressId = type.nameToId("asyncSocketWritesInProgress"); asyncSocketWritesId = type.nameToId("asyncSocketWrites"); asyncSocketWriteRetriesId = type.nameToId("asyncSocketWriteRetries"); @@ -749,8 +604,6 @@ public class DistributionStats implements DMStats { asyncThreadCompletedId = type.nameToId("asyncThreadCompleted"); asyncThreadTimeId = type.nameToId("asyncThreadTime"); - jgNAKACKwaitsId = type.nameToId("jgNAKACKwaits"); - threadOwnedReceiversId = type.nameToId("receiversTO"); threadOwnedReceiversId2 = type.nameToId("receiversTO2"); @@ -1219,36 +1072,24 @@ public class DistributionStats implements DMStats { stats.incLong(bufferAcquireTimeId, ts-start); } - public long startUcastWrite() { - return getStatTime(); - } - public void endUcastWrite(long start, int bytesWritten) { - if (enableClockStats) { - stats.incLong(ucastWriteTimeId, getStatTime()-start); - } + 
public void incUcastWriteBytes(int bytesWritten) { stats.incInt(ucastWritesId, 1); stats.incLong(ucastWriteBytesId, bytesWritten); } - public long startMcastWrite() { - return getStatTime(); - } - public void endMcastWrite(long start, int bytesWritten) { + public void incMcastWriteBytes(int bytesWritten) { stats.incInt(mcastWritesId, 1); - if (enableClockStats) { - stats.incLong(mcastWriteTimeId, getStatTime()-start); - } stats.incLong(mcastWriteBytesId, bytesWritten); } public int getMcastWrites() { return stats.getInt(mcastWritesId); } - public void incMcastReadBytes(long amount) { + public void incMcastReadBytes(int amount) { stats.incInt(mcastReadsId, 1); stats.incLong(mcastReadBytesId, amount); } - public void incUcastReadBytes(long amount) { + public void incUcastReadBytes(int amount) { stats.incInt(ucastReadsId, 1); stats.incLong(ucastReadBytesId, amount); } @@ -1782,172 +1623,6 @@ public class DistributionStats implements DMStats { public int getMcastRetransmits() { return stats.getInt(mcastRetransmitsId); } - public long startUcastFlush() { - stats.incInt(ucastFlushesId, 1); - return getStatTime(); - } - public void endUcastFlush(long start) { - if (enableClockStats) { - stats.incLong(ucastFlushTimeId, getStatTime()-start); - } - } - public void incFlowControlRequests() { - stats.incInt(flowControlRequestsId, 1); - } - public void incFlowControlResponses() { - stats.incInt(flowControlResponsesId, 1); - } - public long startFlowControlWait() { - stats.incInt(flowControlWaitsInProgressId, 1); - return 1; - } - public void endFlowControlWait(long start) { - stats.incInt(flowControlWaitsInProgressId, -1); - } - public long startFlowControlThrottleWait() { - stats.incInt(flowControlThrottleWaitsInProgressId, 1); - return 1; - } - - public void endFlowControlThrottleWait(long start) { - stats.incInt(flowControlThrottleWaitsInProgressId, -1); - } - public void incJgUNICASTdataReceived(long amount) { - stats.incLong(jgUNICASTdataReceivedTimeId, amount); - } - -// 
public void incjgUDPup(long amount) { -// stats.incLong(jgUDPupId, amount); -// } -// public void incjgUDPdown(long amount) { -// stats.incLong(jgUDPdownId, amount); -// } -// public void incjgNAKACKup(long amount) { -// stats.incLong(jgNAKACKupId, amount); -// } -// public void incjgNAKACKdown(long amount) { -// stats.incLong(jgNAKACKdownId, amount); -// } -// public void incjgUNICASTup(long amount) { -// stats.incLong(jgUNICASTupId, amount); -// } -// public void incjgUNICASTdown(long amount) { -// stats.incLong(jgUNICASTdownId, amount); -// } -// public void incjgSTABLEup(long amount) { -// stats.incLong(jgSTABLEupId, amount); -// } -// public void incjgSTABLEdown(long amount) { -// stats.incLong(jgSTABLEdownId, amount); -// } -// public void incjgFRAG2up(long amount) { -// stats.incLong(jgFRAG2upId, amount); -// } -// public void incjgFRAG2down(long amount) { -// stats.incLong(jgFRAG2downId, amount); -// } -// public void incjgGMSup(long amount) { -// stats.incLong(jgGMSupId, amount); -// } -// public void incjgGMSdown(long amount) { -// stats.incLong(jgGMSdownId, amount); -// } -// public void incjgFCup(long amount) { -// stats.incLong(jgFCupId, amount); -// } -// public void incjgFCdown(long amount) { -// stats.incLong(jgFCdownId, amount); -// } - - public void setJgQueuedMessagesSize(long amount) { - stats.setLong(jgQueuedMessagesSizeId, amount); - } - - public void setJgSTABLEreceivedMessagesSize(long amount) { - stats.setLong(jgReceivedMessagesSizeId, amount); - } - public void setJgSTABLEsentMessagesSize(long amount) { - stats.setLong(jgSentMessagesPoolSizeId, amount); - } - public void incJgSTABLEsuspendTime(long value) - { - stats.incLong(jgSTABLEsuspendTimeId, value); - } - public void incJgSTABLEmessages(long value) - { - stats.incLong(jgSTABLEmessagesId, value); - } - public void incJgSTABLEmessagesSent(long value) - { - stats.incLong(jgSTABLEmessagesSentId, value); - } - public void incJgSTABILITYmessages(long value) - { - 
stats.incLong(jgSTABILITYmessagesId, value); - } - public void incJgFragmentationsPerformed() { - stats.incLong(jgFragmentationsPerformedId, 1); - } - public void incJgFragmentsCreated(long numFrags) { - stats.incLong(jgFragmentsCreatedId, numFrags); - } - public void setJgUNICASTreceivedMessagesSize(long amount) { - stats.setLong(jgUcastReceivedMessagesSizeId, amount); - } - public void setJgUNICASTsentMessagesSize(long amount) { - stats.setLong(jgUcastSentMessagesSizeId, amount); - } - public void setJgUNICASTsentHighPriorityMessagesSize(long amount) { - stats.setLong(jgUcastSentHighPriorityMessagesSizeId, amount); - } - - - public void incjgDownTime(long value) - { - stats.incLong(jgDownTimeId, value); - } - - public void incjgUpTime(long value) - { - stats.incLong(jgUpTimeId, value); - } - - public void incjChannelUpTime(long value) - { - stats.incLong(jChannelUpTimeId, value); - } - - public void incJgFCsendBlocks(long value) - { - stats.incLong(jgFCsendBlocksId, value); - - } - public void incJgFCautoRequests(long value) - { - stats.incLong(jgFCautoRequestsId, value); - - } - public void incJgFCreplenish(long value) - { - stats.incLong(jgFCreplenishId, value); - - } - public void incJgFCresumes(long value) - { - stats.incLong(jgFCresumesId, value); - } - public void incJgFCsentCredits(long value) - { - stats.incLong(jgFCsentCreditsId, value); - } - public void incJgFCsentThrottleRequests(long value) - { - stats.incLong(jgFCsentThrottleRequestsId, value); - } - - public void incJgNAKACKwaits(long value) { - stats.incLong(jgNAKACKwaitsId, value); - } public void incThreadOwnedReceivers(long value, int dominoCount) { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/26fea7a1/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/LonerDistributionManager.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/LonerDistributionManager.java 
b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/LonerDistributionManager.java index 8b70104..aae0865 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/LonerDistributionManager.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/LonerDistributionManager.java @@ -291,205 +291,277 @@ public class LonerDistributionManager implements DM { public Set getAdminMemberSet(){ return Collections.EMPTY_SET; } public static class DummyDMStats implements DMStats { + @Override public long getSentMessages() {return 0;} + @Override public void incSentMessages(long messages) {} + @Override public void incTOSentMsg() {} + @Override public long getSentCommitMessages() {return 0;} + @Override public void incSentCommitMessages(long messages) {} + @Override public long getCommitWaits() {return 0;} + @Override public void incCommitWaits() {} + @Override public long getSentMessagesTime() {return 0;} + @Override public void incSentMessagesTime(long nanos) {} + @Override public long getBroadcastMessages() {return 0;} + @Override public void incBroadcastMessages(long messages) {} + @Override public long getBroadcastMessagesTime() {return 0;} + @Override public void incBroadcastMessagesTime(long nanos) {} + @Override public long getReceivedMessages() {return 0;} + @Override public void incReceivedMessages(long messages) {} + @Override public long getReceivedBytes() {return 0;} + @Override public void incReceivedBytes(long bytes) {} + @Override public void incSentBytes(long bytes) {} + @Override public long getProcessedMessages() {return 0;} + @Override public void incProcessedMessages(long messages) {} + @Override public long getProcessedMessagesTime() {return 0;} + @Override public void incProcessedMessagesTime(long nanos) {} + @Override public long getMessageProcessingScheduleTime() {return 0;} - public int getDLockWaitsInProgress() {return 0;} - public int getDLockWaitsCompleted() {return 0;} - public long 
getDLockWaitTime() {return 0;} - public long startDLockWait() {return 0;} - public void endDLockWait(long start, boolean gotit) {} - public void incDLockVetosSent(int ops) {} - public void incDLockVetosReceived(int ops) {} - public void incDLockYesVotesSent(int ops) {} - public void incDLockYesVotesReceived(int ops) {} - public void incDLockNoVotesSent(int ops) {} - public void incDLockNoVotesReceived(int ops) {} - public void incDLockAbstainsSent(int ops) {} - public void incDLockAbstainsReceived(int ops) {} + @Override public void incMessageProcessingScheduleTime(long nanos) {} + @Override public int getOverflowQueueSize() {return 0;} + @Override public void incOverflowQueueSize(int messages) {} + @Override public int getNumProcessingThreads() {return 0;} + @Override public void incNumProcessingThreads(int threads) {} + @Override public int getNumSerialThreads() {return 0;} + @Override public void incNumSerialThreads(int threads) {} + @Override public void incMessageChannelTime(long val) {} + @Override public long getReplyMessageTime() {return 0;} + @Override public void incReplyMessageTime(long val) {} + @Override public long getDistributeMessageTime() {return 0;} + @Override public void incDistributeMessageTime(long val) {} + @Override public int getNodes() {return 0;} + @Override public void setNodes(int val) {} + @Override public void incNodes(int val) {} + @Override public int getReplyWaitsInProgress() {return 0;} + @Override public int getReplyWaitsCompleted() {return 0;} + @Override public long getReplyWaitTime() {return 0;} + @Override public long startReplyWait() {return 0;} + @Override public void endReplyWait(long startNanos, long startMillis) {} + @Override public void incReplyTimeouts() { } + @Override public long getReplyTimeouts() { return 0; } + @Override public void incReceivers() {} + @Override public void decReceivers() {} + @Override public void incFailedAccept() {} + @Override public void incFailedConnect() {} + @Override public void 
incReconnectAttempts() {} + @Override public void incLostLease() {} + @Override public void incSenders(boolean shared, boolean preserveOrder) {} + @Override public void decSenders(boolean shared, boolean preserveOrder) {} + @Override public int getSendersSU() { return 0; } + @Override public long startSocketWrite(boolean sync) {return 0; } + @Override public void endSocketWrite(boolean sync, long start, int bytesWritten, int retries) {} + @Override public long startSerialization() {return 0;} + @Override public void endSerialization(long start, int bytes) {} + @Override public long startDeserialization() {return 0;} + @Override public void endDeserialization(long start, int bytes) {} + @Override public long startMsgSerialization() {return 0;} + @Override public void endMsgSerialization(long start) {} + @Override public long startMsgDeserialization() {return 0;} + @Override public void endMsgDeserialization(long start) {} + @Override public void incBatchSendTime(long start) {} + @Override public void incBatchCopyTime(long start) {} + @Override public void incBatchWaitTime(long start) {} + @Override public void incBatchFlushTime(long start) {} - public long startUcastWrite() { return 0; } - public void endUcastWrite(long start, int bytesWritten) {} - public void incUcastWrites(int bytesWritten) {} - public long startMcastWrite() { return 0; } - public void endMcastWrite(long start, int bytesWritten) {} - public void incMcastWrites(int bytesWritten) {} + @Override + public void incUcastWriteBytes(int bytesWritten) {} + @Override + public void incMcastWriteBytes(int bytesWritten) {} + @Override public void incUcastRetransmits() {} + @Override public void incMcastRetransmits() {} + @Override public void incMcastRetransmitRequests() {} + @Override public int getMcastRetransmits() { return 0; } + @Override public int getMcastWrites() { return 0; } - public long startUcastFlush() { return 0; } - public void endUcastFlush(long start) {} - public void 
incFlowControlRequests() {} - public void incFlowControlResponses() {} - public long startFlowControlWait() { return 0; } - public void endFlowControlWait(long start) {} - public long startFlowControlThrottleWait() { return 0; } - public void endFlowControlThrottleWait(long start) {} - public void incUcastReadBytes(long amount) {} - public void incMcastReadBytes(long amount) {} - public void incJgUNICASTdataReceived(long amount) {} - - public void setJgQueuedMessagesSize(long value) {} - public void setJgSTABLEreceivedMessagesSize(long value) {} - public void setJgSTABLEsentMessagesSize(long value) {} - public void incJgSTABLEsuspendTime(long value) {} - public void incJgSTABLEmessages(long value) {} - public void incJgSTABLEmessagesSent(long value) {} - public void incJgSTABILITYmessages(long value) {} - - public void incjgDownTime(long value){} - public void incjgUpTime(long value){} - public void incjChannelUpTime(long value){} - - public void incThreadOwnedReceivers(long value, int dominoCount) {} - - public void incJgFCsendBlocks(long value) - {} - public void incJgFCautoRequests(long value) - {} - public void incJgFCreplenish(long value) - {} - public void incJgFCresumes(long value) - {} - public void incJgFCsentCredits(long value) - {} - public void incJgFCsentThrottleRequests(long value) - {} - public void incJgFragmentationsPerformed() - {} - public void incJgFragmentsCreated(long numFrags) - {} - public void setJgUNICASTreceivedMessagesSize(long amount) { - } - public void setJgUNICASTsentMessagesSize(long amount) { - } - public void setJgUNICASTsentHighPriorityMessagesSize(long amount) { - } + @Override + public void incUcastReadBytes(int amount) {} + @Override + public void incMcastReadBytes(int amount) {} + @Override public int getAsyncSocketWritesInProgress() {return 0;} + @Override public int getAsyncSocketWrites() {return 0;} + @Override public int getAsyncSocketWriteRetries() {return 0;} + @Override public long getAsyncSocketWriteBytes() {return 
0;} + @Override public long getAsyncSocketWriteTime() {return 0;} + @Override public int getAsyncQueues() {return 0;} + @Override public void incAsyncQueues(int inc) {} + @Override public int getAsyncQueueFlushesInProgress() {return 0;} + @Override public int getAsyncQueueFlushesCompleted() {return 0;} + @Override public long getAsyncQueueFlushTime() {return 0;} + @Override public long startAsyncQueueFlush() {return 0;} + @Override public void endAsyncQueueFlush(long start) {} + @Override public int getAsyncQueueTimeouts() {return 0;} + @Override public void incAsyncQueueTimeouts(int inc) {} + @Override public int getAsyncQueueSizeExceeded() {return 0;} + @Override public void incAsyncQueueSizeExceeded(int inc) {} + @Override public int getAsyncDistributionTimeoutExceeded() {return 0;} + @Override public void incAsyncDistributionTimeoutExceeded() {} + @Override public long getAsyncQueueSize() {return 0;} + @Override public void incAsyncQueueSize(long inc) {} + @Override public long getAsyncQueuedMsgs() {return 0;} + @Override public void incAsyncQueuedMsgs() {} + @Override public long getAsyncDequeuedMsgs() {return 0;} + @Override public void incAsyncDequeuedMsgs() {} + @Override public long getAsyncConflatedMsgs() {return 0;} + @Override public void incAsyncConflatedMsgs() {} + @Override public int getAsyncThreads() {return 0;} + @Override public void incAsyncThreads(int inc) {} + @Override public int getAsyncThreadInProgress() {return 0;} + @Override public int getAsyncThreadCompleted() {return 0;} + @Override public long getAsyncThreadTime() {return 0;} + @Override public long startAsyncThread() {return 0;} + @Override public void endAsyncThread(long start) {} + @Override public long getAsyncQueueAddTime() {return 0;} + @Override public void incAsyncQueueAddTime(long inc) {} + @Override public long getAsyncQueueRemoveTime() {return 0;} + @Override public void incAsyncQueueRemoveTime(long inc) {} - public void incJgNAKACKwaits(long value) {} - public void 
incThreadOwnedReceivers(long value) {} + @Override public void incReceiverBufferSize(int inc, boolean direct) {} + @Override public void incSenderBufferSize(int inc, boolean direct) {} + @Override public long startSocketLock() {return 0;} + @Override public void endSocketLock(long start) {} + @Override public long startBufferAcquire() {return 0;} + @Override public void endBufferAcquire(long start) {} + @Override public void incMessagesBeingReceived(boolean newMsg, int bytes) {} + @Override public void decMessagesBeingReceived(int bytes) {} + @Override public void incReplyHandOffTime(long start) {} + @Override public int getElders() {return 0;} + @Override public void incElders(int val) {} + @Override public int getInitialImageMessagesInFlight() {return 0;} + @Override public void incInitialImageMessagesInFlight(int val) {} + @Override public int getInitialImageRequestsInProgress() {return 0;} + @Override public void incInitialImageRequestsInProgress(int val) {} + @Override public void incPdxSerialization(int bytesWritten) {} + @Override public void incPdxDeserialization(int i) {} + @Override public long startPdxInstanceDeserialization() {return 0;} + @Override public void endPdxInstanceDeserialization(long start) {} + @Override public void incPdxInstanceCreations() {} + @Override + public void incThreadOwnedReceivers(long value, int dominoCount) { + } } protected static class DummyExecutor implements ExecutorService { @Override http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/26fea7a1/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/deadlock/DeadlockDetector.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/deadlock/DeadlockDetector.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/deadlock/DeadlockDetector.java index e361184..f7e3efb 100644 --- 
a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/deadlock/DeadlockDetector.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/deadlock/DeadlockDetector.java @@ -350,20 +350,20 @@ public class DeadlockDetector { break; case "findDeepestGraph": graph = loadGraphs(1, args); - DependencyGraph result = graph.findDeepestGraph(); - if (result == null) { + graph = graph.findDeepestGraph(); + if (graph == null) { System.out.println("no deepest graph could be found!"); } else { - System.out.println("deepest graph: \n" + prettyFormat(result)); + System.out.println("deepest graph: \n" + prettyFormat(graph)); } break; case "findThread": graph = loadGraphs(2, args); - result = graph.findDependenciesWith(args[1]); - if (result == null) { - System.out.println("thread not found!"); + graph = graph.findDependenciesWith(args[1]); + if (graph == null) { + System.out.println("thread not found! Try using the print command to see all threads and locate the name of the one you're interested in?"); } else { - System.out.println(prettyFormat(sortDependencies(result.getEdges()))); + System.out.println(prettyFormat(sortDependencies(graph.getEdges()))); } break; default: http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/26fea7a1/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/deadlock/DependencyGraph.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/deadlock/DependencyGraph.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/deadlock/DependencyGraph.java index e3d1e55..5d68500 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/deadlock/DependencyGraph.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/deadlock/DependencyGraph.java @@ -13,10 +13,11 @@ import java.util.HashSet; import java.util.LinkedHashMap; import 
java.util.LinkedHashSet; import java.util.LinkedList; -import java.util.List; import java.util.Map; import java.util.Set; +import com.gemstone.gemfire.internal.util.PluckStacks; + /** * This class holds a graph of dependencies between objects * @@ -140,6 +141,9 @@ public class DependencyGraph implements Serializable { DependencyGraph result = new DependencyGraph(); + // expand the dependency set to include all incoming + // references. These will give us graphs of endpoints + // that reach to the node we're interested in Set<Object> dependsOnObj = new HashSet<>(); dependsOnObj.add(obj); boolean anyAdded = true; @@ -153,6 +157,9 @@ public class DependencyGraph implements Serializable { } } } + // find all of the downward graphs for each depender + // and add their vertices. These are things dependedOn + // by the node we're interested in. for (Object depender: dependsOnObj) { if (!result.getVertices().contains(depender)) { DependencyGraph subgraph = getSubGraph(depender); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/26fea7a1/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/InternalDistributedMember.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/InternalDistributedMember.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/InternalDistributedMember.java index 5b957c3..fa2ec57 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/InternalDistributedMember.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/InternalDistributedMember.java @@ -496,6 +496,7 @@ public final class InternalDistributedMember public void setVmViewId(int p) { this.vmViewId = p; synchPayload(); + cachedToString = null; } /** 
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/26fea7a1/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSMember.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSMember.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSMember.java index 619481e..20bdf1d 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSMember.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSMember.java @@ -429,8 +429,6 @@ public class GMSMember implements NetMember, DataSerializableFixedID { DataSerializer.writeStringArray(groups, out); out.writeLong(uuidMSBs); out.writeLong(uuidLSBs); -// InternalDataSerializer.writeSignedVL(uuidLSBs, out); -// InternalDataSerializer.writeSignedVL(uuidMSBs, out); } @Override @@ -450,18 +448,14 @@ public class GMSMember implements NetMember, DataSerializableFixedID { this.vmKind = in.readInt(); this.name = DataSerializer.readString(in); this.groups = DataSerializer.readStringArray(in); - this.uuidLSBs = in.readLong(); this.uuidMSBs = in.readLong(); -// this.uuidLSBs = InternalDataSerializer.readUnsignedVL(in); -// this.uuidMSBs = InternalDataSerializer.readUnsignedVL(in); + this.uuidLSBs = in.readLong(); } @Override public void writeAdditionalData(DataOutput out) throws IOException { out.writeLong(uuidMSBs); out.writeLong(uuidLSBs); -// InternalDataSerializer.writeSignedVL(uuidLSBs, out); -// InternalDataSerializer.writeSignedVL(uuidMSBs, out); } @Override @@ -469,7 +463,5 @@ public class GMSMember implements NetMember, DataSerializableFixedID { IOException { this.uuidMSBs = in.readLong(); this.uuidLSBs = in.readLong(); -// this.uuidLSBs = InternalDataSerializer.readUnsignedVL(in); -// this.uuidMSBs = InternalDataSerializer.readUnsignedVL(in); } } 
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/26fea7a1/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java index 274ecd5..db5779b 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java @@ -91,4 +91,8 @@ public class GMSHealthMonitor implements HealthMonitor { } + @Override + public void memberShutdown(DistributedMember mbr, String reason) { + } + } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/26fea7a1/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/HealthMonitor.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/HealthMonitor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/HealthMonitor.java index aab95b6..9ace2be 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/HealthMonitor.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/HealthMonitor.java @@ -28,5 +28,10 @@ public interface HealthMonitor extends Service { * @return */ public boolean checkIfAvailable(DistributedMember mbr, String reason, boolean initiateRemoval); - + + /** + * Invoked by the Manager, this notifies the HealthMonitor that a + * ShutdownMessage has been received from the given member + */ + 
public void memberShutdown(DistributedMember mbr, String reason); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/26fea7a1/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/JoinLeave.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/JoinLeave.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/JoinLeave.java index d6087bd..7bf35ec 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/JoinLeave.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/JoinLeave.java @@ -1,5 +1,6 @@ package com.gemstone.gemfire.distributed.internal.membership.gms.interfaces; +import com.gemstone.gemfire.distributed.DistributedMember; import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; import com.gemstone.gemfire.distributed.internal.membership.NetView; @@ -22,6 +23,12 @@ public interface JoinLeave extends Service { void remove(InternalDistributedMember m, String reason); /** + * Invoked by the Manager, this notifies the HealthMonitor that a + * ShutdownMessage has been received from the given member + */ + public void memberShutdown(DistributedMember mbr, String reason); + + /** * returns the local address */ InternalDistributedMember getMemberID(); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/26fea7a1/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Manager.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Manager.java 
b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Manager.java index b379d6d..f2f7dc1 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Manager.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Manager.java @@ -9,6 +9,7 @@ import com.gemstone.gemfire.distributed.internal.DistributionMessage; import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; import com.gemstone.gemfire.distributed.internal.membership.NetMember; import com.gemstone.gemfire.distributed.internal.membership.NetView; +import com.gemstone.gemfire.distributed.internal.membership.gms.SuspectMember; /** * Manager presents the GMS services to the outside world and @@ -30,25 +31,83 @@ public interface Manager extends Service, MessageHandler { */ Set<InternalDistributedMember> send(DistributionMessage m) throws NotSerializableException; + /** + * initiates a Forced Disconnect, shutting down the distributed system + * and closing the cache + * @param reason + */ void forceDisconnect(String reason); + /** + * notifies the manager that membership quorum has been lost + */ void quorumLost(Collection<InternalDistributedMember> failures, NetView view); + /** + * Notifies the manager that a member has contacted us who is not in the + * current membership view + * @param mbr + * @param birthTime + */ void addSurpriseMemberForTesting(DistributedMember mbr, long birthTime); + /** + * Tests to see if the given member has been put into "shunned" state, + * meaning that it has left the distributed system and we should no longer + * process requests from it. Shunned status eventually times out. + * @param mbr + * @return true if the member is shunned + */ boolean isShunned(DistributedMember mbr); + /** + * returns the lead member from the current membership view. 
This is + * typically the oldest member that is not an Admin or Locator member. + * @return the ID of the lead member + */ DistributedMember getLeadMember(); + /** + * returns the coordinator of the current membership view. This is + * who created and distributed the view. See NetView. + * @return + */ DistributedMember getCoordinator(); + /** + * sometimes we cannot perform multicast messaging, such as during a + * rolling upgrade. + * @return true if multicast messaging can be performed + */ boolean isMulticastAllowed(); + /** + * This establishes the reason that a shutdown is being performed. + * After this the cancelCriterion will start reporting that a + * cancel is in progress. + * @param e the reason for the shutdown + */ void setShutdownCause(Exception e); + /** + * Returns the reason for a shutdown. + */ Throwable getShutdownCause(); + /** + * Returns true if a shutdown is in progress or has been completed + */ boolean shutdownInProgress(); - + + /** + * similar to forceDisconnect but is used solely by Messenger + * to tell Manager that communications have been lost + */ void membershipFailure(String message, Exception cause); + + /** + * used by HealthMonitor to tell Manager that a member is under + * suspicion + */ + void memberSuspected(SuspectMember suspect); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/26fea7a1/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java index 76d9d71..9b87856 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java +++ 
b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java @@ -31,7 +31,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.logging.log4j.Logger; +import com.gemstone.gemfire.GemFireConfigException; import com.gemstone.gemfire.SystemConnectException; +import com.gemstone.gemfire.distributed.DistributedMember; import com.gemstone.gemfire.distributed.internal.DistributionConfig; import com.gemstone.gemfire.distributed.internal.DistributionManager; import com.gemstone.gemfire.distributed.internal.DistributionMessage; @@ -209,6 +211,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { response = joinResponse[0]; } if (response != null) { +// DEBUGGING - REMOVE +logger.info("received join response {}", response); joinResponse[0] = null; String failReason = response.getRejectionMessage(); if (failReason != null) { @@ -223,10 +227,13 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { this.localAddress.setVmViewId(this.birthViewId); GMSMember me = (GMSMember)this.localAddress.getNetMember(); GMSMember o = (GMSMember)response.getMemberID().getNetMember(); + me.setBirthViewId(birthViewId); me.setSplitBrainEnabled(o.isSplitBrainEnabled()); me.setPreferredForCoordinator(o.preferredForCoordinator()); installView(response.getCurrentView()); return true; + } else { + logger.info("received join response with no membership view: {}", response); } } return false; @@ -793,6 +800,15 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { services.getMessenger().send(msg); } } + + @Override + public void memberShutdown(DistributedMember mbr, String reason) { + if (this.isCoordinator) { + LeaveRequestMessage msg = new LeaveRequestMessage(this.localAddress, (InternalDistributedMember)mbr); + recordViewRequest(msg); + } + } + @Override public void disableDisconnectOnQuorumLossForTesting() { @@ -802,6 +818,16 @@ public class GMSJoinLeave implements JoinLeave, 
MessageHandler { @Override public void init(Services s) { this.services = s; + + DistributionConfig dc = services.getConfig().getDistributionConfig(); + if (dc.getMcastPort() != 0 + && dc.getLocators().trim().isEmpty() + && dc.getStartLocator().trim().isEmpty()) { + throw new GemFireConfigException("Multicast cannot be configured for a non-distributed cache." + + " Please configure the locator services for this cache using "+DistributionConfig.LOCATORS_NAME + + " or " + DistributionConfig.START_LOCATOR_NAME+"."); + } + services.getMessenger().addHandler(JoinRequestMessage.class, this); services.getMessenger().addHandler(JoinResponseMessage.class, this); services.getMessenger().addHandler(InstallViewMessage.class, this); @@ -811,7 +837,6 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { services.getMessenger().addHandler(JoinRequestMessage.class, this); services.getMessenger().addHandler(JoinResponseMessage.class, this); - DistributionConfig dc = services.getConfig().getDistributionConfig(); int ackCollectionTimeout = dc.getMemberTimeout() * 2 * 12437 / 10000; if (ackCollectionTimeout < 1500) { ackCollectionTimeout = 1500; @@ -1021,18 +1046,37 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { List<InternalDistributedMember> removalReqs = new ArrayList<InternalDistributedMember>(); List<String> removalReasons = new ArrayList<String>(); + NetView oldView = currentView; + List<InternalDistributedMember> oldMembers; + if (oldView != null) { + oldMembers = oldView.getMembers(); + } else { + oldMembers = Collections.emptyList(); + } + for (DistributionMessage msg: requests) { logger.debug("processing request {}", msg); + + InternalDistributedMember mbr = null; + if (msg instanceof JoinRequestMessage) { - InternalDistributedMember mbr = ((JoinRequestMessage)msg).getMemberID(); - joinReqs.add(mbr); + mbr = ((JoinRequestMessage)msg).getMemberID(); + if (!oldMembers.contains(mbr)) { + joinReqs.add(mbr); + } } else if (msg instanceof 
LeaveRequestMessage) { - leaveReqs.add(((LeaveRequestMessage) msg).getMemberID()); + mbr = ((LeaveRequestMessage) msg).getMemberID(); + if (oldMembers.contains(mbr)) { + leaveReqs.add(mbr); + } } else if (msg instanceof RemoveMemberMessage) { - removalReqs.add(((RemoveMemberMessage) msg).getMemberID()); - removalReasons.add(((RemoveMemberMessage) msg).getReason()); + mbr = ((RemoveMemberMessage) msg).getMemberID(); + if (oldMembers.contains(mbr)) { + removalReqs.add(mbr); + removalReasons.add(((RemoveMemberMessage) msg).getReason()); + } } else { // TODO: handle removals http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/26fea7a1/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java index ab0aa31..656230a 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java @@ -251,6 +251,12 @@ public class JGroupsMessenger implements Messenger { } catch (Exception e) { throw new GemFireConfigException("unable to create jgroups channel", e); } + + // give the stats to the jchannel statistics recorder + StatRecorder sr = (StatRecorder)myChannel.getProtocolStack().findProtocol(StatRecorder.class); + if (sr != null) { + sr.setDMStats(services.getStatistics()); + } try { @@ -539,8 +545,9 @@ public class JGroupsMessenger implements Messenger { // The contract is that every destination enumerated in the // message should have received the message. If one left // (i.e., left the view), we signal it here. 
- if (msg.forAll()) - return null; + if (msg.forAll()) { + return Collections.emptySet(); + } Set<InternalDistributedMember> result = new HashSet<InternalDistributedMember>(); NetView newView = services.getJoinLeave().getView(); if (newView != null) { @@ -551,8 +558,6 @@ public class JGroupsMessenger implements Messenger { } } } - if (result.size() == 0) - return null; return result; } @@ -586,12 +591,14 @@ public class JGroupsMessenger implements Messenger { msg.setTransientFlag(Message.TransientFlag.DONT_LOOPBACK); } try { + long start = services.getStatistics().startMsgSerialization(); HeapDataOutputStream out_stream = new HeapDataOutputStream(Version.fromOrdinalOrCurrent(version)); - Version.CURRENT.writeOrdinal(out_stream, true); - DataSerializer.writeObject(this.localAddress.getNetMember(), out_stream); - DataSerializer.writeObject(gfmsg, out_stream); - msg.setBuffer(out_stream.toByteArray()); + Version.CURRENT.writeOrdinal(out_stream, true); + DataSerializer.writeObject(this.localAddress.getNetMember(), out_stream); + DataSerializer.writeObject(gfmsg, out_stream); + msg.setBuffer(out_stream.toByteArray()); + services.getStatistics().endMsgSerialization(start); } catch(IOException ex) { IllegalArgumentException ia = new @@ -628,6 +635,8 @@ public class JGroupsMessenger implements Messenger { Exception problem = null; try { + long start = services.getStatistics().startMsgDeserialization(); + byte[] buf = jgmsg.getRawBuffer(); DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf, jgmsg.getOffset(), jgmsg.getLength())); @@ -646,6 +655,8 @@ public class JGroupsMessenger implements Messenger { if (result instanceof DistributionMessage) { ((DistributionMessage)result).setSender(sender); } + + services.getStatistics().endMsgDeserialization(start); logger.debug("JGroupsReceiver deserialized {}", result); 
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/26fea7a1/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/StatRecorder.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/StatRecorder.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/StatRecorder.java
new file mode 100755
index 0000000..49dc423
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/StatRecorder.java
@@ -0,0 +1,137 @@
+package com.gemstone.gemfire.distributed.internal.membership.gms.messenger;
+
+import org.apache.logging.log4j.Logger;
+import org.jgroups.Event;
+import org.jgroups.Message;
+import org.jgroups.conf.ClassConfigurator;
+import org.jgroups.protocols.UNICAST3;
+import org.jgroups.protocols.pbcast.NAKACK2;
+import org.jgroups.protocols.pbcast.NakAckHeader2;
+import org.jgroups.stack.Protocol;
+
+import com.gemstone.gemfire.distributed.internal.DMStats;
+import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
+
+/**
+ * JGroups doesn't capture quite the stats we want so this protocol is
+ * inserted into the stack to gather the missing ones.
+ * <p>
+ * Each MSG event passing through is checked for a NAKACK2 (multicast) or
+ * UNICAST3 (unicast) header and the matching DMStats counters are updated.
+ * The event is then handed to the neighboring protocol unchanged.
+ *
+ * @author bschuchardt
+ *
+ */
+public class StatRecorder extends Protocol {
+
+  private static final Logger logger = Services.getLogger();
+
+  /** direction tag for messages passed to {@link #down(Event)} */
+  private static final int OUTGOING = 0;
+  /** direction tag for messages passed to {@link #up(Event)} */
+  private static final int INCOMING = 1;
+
+  /** statistics to update; nothing is recorded while this is null */
+  DMStats stats;
+
+  private final short nakackHeaderId = ClassConfigurator.getProtocolId(NAKACK2.class);
+  private final short unicastHeaderId = ClassConfigurator.getProtocolId(UNICAST3.class);
+
+  /**
+   * set the statistics object to modify when events are detected
+   * @param stats the statistics object to update
+   */
+  public void setDMStats(DMStats stats) {
+    this.stats = stats;
+  }
+
+  /**
+   * Records read statistics for a message event, then passes the event
+   * to the protocol above this one.
+   */
+  @Override
+  public Object up(Event evt) {
+    if (evt.getType() == Event.MSG) {
+      Message msg = (Message)evt.getArg();
+      processForMulticast(msg, INCOMING);
+      processForUnicast(msg, INCOMING);
+    }
+    return up_prot.up(evt);
+  }
+
+  /**
+   * Records write and retransmission statistics for a message event, then
+   * passes the event to the protocol below this one.
+   */
+  @Override
+  public Object down(Event evt) {
+    if (evt.getType() == Event.MSG) {
+      Message msg = (Message)evt.getArg();
+      // must be OUTGOING: tagging these as INCOMING would leave the write
+      // and retransmission statistics permanently at zero
+      processForMulticast(msg, OUTGOING);
+      processForUnicast(msg, OUTGOING);
+    }
+    return down_prot.down(evt);
+  }
+
+  /**
+   * Updates the multicast statistics if the message carries a NAKACK2 header.
+   * Retransmissions and retransmission requests are counted for OUTGOING
+   * messages only.
+   *
+   * @param msg the message to examine
+   * @param direction either INCOMING or OUTGOING
+   */
+  private void processForMulticast(Message msg, int direction) {
+    Object o = msg.getHeader(nakackHeaderId);
+    if (o instanceof NakAckHeader2 && stats != null) {
+      NakAckHeader2 hdr = (NakAckHeader2)o;
+      switch (direction) {
+      case INCOMING:
+        stats.incMcastReadBytes((int)msg.size());
+        break;
+      case OUTGOING:
+        stats.incMcastWriteBytes((int)msg.size());
+        switch (hdr.getType()) {
+        case NakAckHeader2.XMIT_RSP:
+          stats.incMcastRetransmits();
+          break;
+        case NakAckHeader2.XMIT_REQ:
+          stats.incMcastRetransmitRequests();
+          break;
+        }
+        break;
+      }
+    }
+  }
+
+  /**
+   * Updates the unicast statistics if the message carries a UNICAST3 header.
+   * An OUTGOING message whose header type is XMIT_REQ is also counted as a
+   * unicast retransmit.
+   *
+   * @param msg the message to examine
+   * @param direction either INCOMING or OUTGOING
+   */
+  private void processForUnicast(Message msg, int direction) {
+    Object o = msg.getHeader(unicastHeaderId);
+    if (o instanceof UNICAST3.Header && stats != null) {
+      UNICAST3.Header hdr = (UNICAST3.Header)o;
+      switch (direction) {
+      case INCOMING:
+        stats.incUcastReadBytes((int)msg.size());
+        break;
+      case OUTGOING:
+        stats.incUcastWriteBytes((int)msg.size());
+        switch (hdr.type()) {
+        case UNICAST3.Header.XMIT_REQ:
+          stats.incUcastRetransmits();
+          break;
+        }
+        break;
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/26fea7a1/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java index b9425c4..b89b984 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java @@ -1361,6 +1361,11 @@ public class GMSMembershipManager implements MembershipManager, Manager } } + @Override + public void memberSuspected(SuspectMember suspect) { + handleOrDeferSuspect(suspect); + } + /** * Process a new view object, or place on the startup queue * @param suspectInfo the suspectee and suspector @@ -1717,6 +1722,8 @@ public class GMSMembershipManager implements MembershipManager, Manager } synchronized(this.shutdownMembers) { this.shutdownMembers.put(id, id); + services.getHealthMonitor().memberShutdown(id, reason); + services.getJoinLeave().memberShutdown(id, reason); } } @@ -2940,4 +2947,5 @@ public class GMSMembershipManager implements MembershipManager, Manager Services.setSecurityLogWriter(writer); } + } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/26fea7a1/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/tcpserver/TcpServer.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/tcpserver/TcpServer.java
b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/tcpserver/TcpServer.java index ca3a9b7..a859ba6 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/tcpserver/TcpServer.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/tcpserver/TcpServer.java @@ -212,7 +212,7 @@ public class TcpServer { } public void join() throws InterruptedException { - this.log.info("TcpServer@"+System.identityHashCode(this)+" join() invoked. Server thread="+serverThread+"@"+System.identityHashCode(serverThread)+";alive="+serverThread.isAlive()); +// this.log.info("TcpServer@"+System.identityHashCode(this)+" join() invoked. Server thread="+serverThread+"@"+System.identityHashCode(serverThread)+";alive="+serverThread.isAlive()); if(serverThread != null) { serverThread.join(); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/26fea7a1/gemfire-core/src/main/java/com/gemstone/gemfire/management/internal/cli/commands/MiscellaneousCommands.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/management/internal/cli/commands/MiscellaneousCommands.java b/gemfire-core/src/main/java/com/gemstone/gemfire/management/internal/cli/commands/MiscellaneousCommands.java index da8f11d..20e64e7 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/management/internal/cli/commands/MiscellaneousCommands.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/management/internal/cli/commands/MiscellaneousCommands.java @@ -24,6 +24,7 @@ import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -574,13 +575,24 @@ public class MiscellaneousCommands implements CommandMarker { Set<DistributedMember> allMembers = CliUtil.getAllMembers(cache); GemFireDeadlockDetector 
gfeDeadLockDetector = new GemFireDeadlockDetector(allMembers); DependencyGraph dependencyGraph = gfeDeadLockDetector.find(); - LinkedList<Dependency> deadlock = dependencyGraph.findCycle(); + Collection<Dependency> deadlock = dependencyGraph.findCycle(); + DependencyGraph deepest = null; + if (deadlock == null) { + deepest = dependencyGraph.findDeepestGraph(); + if (deepest != null) { + deadlock = deepest.getEdges(); + } + } Set<Dependency> dependencies = (Set<Dependency>) dependencyGraph.getEdges(); InfoResultData resultData = ResultBuilder.createInfoResultData(); if (deadlock != null) { - resultData.addLine(CliStrings.SHOW_DEADLOCK__DEADLOCK__DETECTED); + if (deepest != null) { + resultData.addLine(CliStrings.SHOW_DEADLOCK__DEEPEST_FOUND); + } else { + resultData.addLine(CliStrings.SHOW_DEADLOCK__DEADLOCK__DETECTED); + } resultData.addLine(DeadlockDetector.prettyFormat(deadlock)); } else { resultData.addLine(CliStrings.SHOW_DEADLOCK__NO__DEADLOCK); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/26fea7a1/gemfire-core/src/main/java/com/gemstone/gemfire/management/internal/cli/i18n/CliStrings.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/management/internal/cli/i18n/CliStrings.java b/gemfire-core/src/main/java/com/gemstone/gemfire/management/internal/cli/i18n/CliStrings.java index 6c981e5..9ed9164 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/management/internal/cli/i18n/CliStrings.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/management/internal/cli/i18n/CliStrings.java @@ -1491,6 +1491,7 @@ public class CliStrings { public static final String SHOW_DEADLOCK__DEPENDENCIES__FILE__HELP = "Name of the file to which dependencies between members will be written."; public static final String SHOW_DEADLOCK__NO__DEADLOCK = "No dead lock detected."; public static final String SHOW_DEADLOCK__DEADLOCK__DETECTED = "Dead lock detected."; + public 
static final String SHOW_DEADLOCK__DEEPEST_FOUND = "No deadlock was detected. Here is the deepest call chain that could be found"; public static final String SHOW_DEADLOCK__DEPENDENCIES__REVIEW = "Please view the dependencies between the members in file : {0}"; public static final String SHOW_DEADLOCK__ERROR = "Error"; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/26fea7a1/gemfire-core/src/main/resources/com/gemstone/gemfire/distributed/internal/jgroups-config.xml ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/resources/com/gemstone/gemfire/distributed/internal/jgroups-config.xml b/gemfire-core/src/main/resources/com/gemstone/gemfire/distributed/internal/jgroups-config.xml index 5bae9e6..73e66b4 100755 --- a/gemfire-core/src/main/resources/com/gemstone/gemfire/distributed/internal/jgroups-config.xml +++ b/gemfire-core/src/main/resources/com/gemstone/gemfire/distributed/internal/jgroups-config.xml @@ -39,6 +39,7 @@ oob_thread_pool.rejection_policy="discard" /> <com.gemstone.gemfire.distributed.internal.membership.gms.messenger.AddressManager/> +<com.gemstone.gemfire.distributed.internal.membership.gms.messenger.StatRecorder/> <UNICAST3 xmit_interval="500" xmit_table_num_rows="100" http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/26fea7a1/gemfire-core/src/main/resources/com/gemstone/gemfire/distributed/internal/jgroups-mcast.xml ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/resources/com/gemstone/gemfire/distributed/internal/jgroups-mcast.xml b/gemfire-core/src/main/resources/com/gemstone/gemfire/distributed/internal/jgroups-mcast.xml index fe657e6..4c6f25c 100755 --- a/gemfire-core/src/main/resources/com/gemstone/gemfire/distributed/internal/jgroups-mcast.xml +++ b/gemfire-core/src/main/resources/com/gemstone/gemfire/distributed/internal/jgroups-mcast.xml @@ -48,6 +48,7 @@ oob_thread_pool.rejection_policy="discard"/> 
<com.gemstone.gemfire.distributed.internal.membership.gms.messenger.AddressManager/> +<com.gemstone.gemfire.distributed.internal.membership.gms.messenger.StatRecorder/> <pbcast.NAKACK2 xmit_interval="MCAST_RETRANSMIT_INTERVAL"
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/26fea7a1/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/StatRecorderJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/StatRecorderJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/StatRecorderJUnitTest.java
new file mode 100755
index 0000000..88fcbd2
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/StatRecorderJUnitTest.java
@@ -0,0 +1,205 @@
+package com.gemstone.gemfire.distributed.internal.membership.gms.membership;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Properties;
+
+import org.jgroups.Event;
+import org.jgroups.Message;
+import org.jgroups.protocols.UNICAST3.Header;
+import org.jgroups.protocols.pbcast.NakAckHeader2;
+import org.jgroups.stack.Protocol;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.distributed.internal.DistributionConfigImpl;
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.distributed.internal.LonerDistributionManager.DummyDMStats;
+import com.gemstone.gemfire.distributed.internal.membership.gms.ServiceConfig;
+import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messenger.JGroupsMessenger;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messenger.StatRecorder;
+import com.gemstone.gemfire.internal.admin.remote.RemoteTransportConfig;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+/**
+ * This class tests the GMS StatRecorder class, which records JGroups
+ * messaging statistics
+ */
+@Category(UnitTest.class)
+public class StatRecorderJUnitTest {
+  Protocol mockDownProtocol, mockUpProtocol;
+  StatRecorder recorder;
+  MyStats stats = new MyStats();
+
+  @Before
+  public void initMocks() throws Exception {
+    // create a StatRecorder that has mock up/down protocols and stats
+    mockDownProtocol = mock(Protocol.class);
+    mockUpProtocol = mock(Protocol.class);
+    recorder = new StatRecorder();
+    recorder.setDMStats(stats);
+    recorder.setUpProtocol(mockUpProtocol);
+    recorder.setDownProtocol(mockDownProtocol);
+  }
+
+  /**
+   * Ensure that unicast events are recorded in DMStats
+   */
+  @Test
+  public void testUnicastStats() throws Exception {
+    Message msg = mock(Message.class);
+    when(msg.getHeader(any(Short.class))).thenReturn(Header.createDataHeader(1L, (short)1, true));
+    when(msg.size()).thenReturn(150L);
+
+    Event evt = new Event(Event.MSG, msg);
+    recorder.up(evt);
+    assertEquals(1, stats.ucastMessagesReceived);
+    assertEquals(150, stats.ucastMessageBytesReceived);
+
+    recorder.down(evt);
+    assertEquals(1, stats.ucastMessagesSent);
+    assertEquals(150, stats.ucastMessageBytesSent);
+
+    // StatRecorder counts retransmissions for outgoing messages only, so
+    // the retransmission event has to travel down the stack
+    when(msg.getHeader(any(Short.class))).thenReturn(Header.createXmitReqHeader());
+    recorder.down(evt);
+    assertEquals(1, stats.ucastRetransmits);
+  }
+
+  /**
+   * ensure that multicast events are recorded in DMStats
+   */
+  @Test
+  public void testMulticastStats() throws Exception {
+    Message msg = mock(Message.class);
+    when(msg.getHeader(any(Short.class))).thenReturn(NakAckHeader2.createMessageHeader(1L));
+    when(msg.size()).thenReturn(150L);
+
+    Event evt = new Event(Event.MSG, msg);
+    recorder.up(evt);
+    assertEquals(1, stats.mcastMessagesReceived);
+    assertEquals(150, stats.mcastMessageBytesReceived);
+
+    recorder.down(evt);
+    assertEquals(1, stats.mcastMessagesSent);
+    assertEquals(150, stats.mcastMessageBytesSent);
+
+    // StatRecorder counts retransmission traffic for outgoing messages only,
+    // so these events have to travel down the stack
+    when(msg.getHeader(any(Short.class))).thenReturn(NakAckHeader2.createXmitRequestHeader(null));
+    recorder.down(evt);
+    assertEquals(1, stats.mcastRetransmitRequests);
+
+    when(msg.getHeader(any(Short.class))).thenReturn(NakAckHeader2.createXmitResponseHeader());
+    recorder.down(evt);
+    assertEquals(1, stats.mcastRetransmits);
+  }
+
+  /**
+   * Ensure that the messenger JGroups configuration XML strings contain
+   * the statistics recorder protocol
+   */
+  @Test
+  public void messengerStackHoldsStatRecorder() throws Exception {
+    Services mockServices = mock(Services.class);
+    ServiceConfig mockConfig = mock(ServiceConfig.class);
+    when(mockServices.getConfig()).thenReturn(mockConfig);
+
+    // first test to see if the non-multicast stack has the recorder installed
+    Properties nonDefault = new Properties();
+    nonDefault.put(DistributionConfig.MCAST_PORT_NAME, "0");
+    nonDefault.put(DistributionConfig.LOCATORS_NAME, "localhost[12345]");
+    DistributionConfigImpl config = new DistributionConfigImpl(nonDefault);
+    when(mockConfig.getDistributionConfig()).thenReturn(config);
+
+    RemoteTransportConfig transport = new RemoteTransportConfig(config,
+        DistributionManager.NORMAL_DM_TYPE);
+    when(mockConfig.getTransport()).thenReturn(transport);
+
+    JGroupsMessenger messenger = new JGroupsMessenger();
+    messenger.init(mockServices);
+    String jgroupsConfig = messenger.getJGroupsStackConfig();
+    System.out.println(jgroupsConfig);
+    assertTrue(jgroupsConfig.contains("gms.messenger.StatRecorder"));
+
+    // now test to see if the multicast stack has the recorder installed
+    nonDefault.put(DistributionConfig.MCAST_PORT_NAME, "12345");
+    config = new DistributionConfigImpl(nonDefault);
+    transport = new RemoteTransportConfig(config, DistributionManager.NORMAL_DM_TYPE);
+    when(mockConfig.getDistributionConfig()).thenReturn(config);
+    when(mockConfig.getTransport()).thenReturn(transport);
+    messenger = new JGroupsMessenger();
+    messenger.init(mockServices);
+    // re-read the configuration: the string fetched above belongs to the
+    // non-multicast messenger
+    jgroupsConfig = messenger.getJGroupsStackConfig();
+    assertTrue(jgroupsConfig.contains("gms.messenger.StatRecorder"));
+  }
+
+  /**
+   * DMStats stand-in that counts how many times each statistic is updated
+   * and totals the byte counts it is given
+   */
+  static class MyStats extends DummyDMStats {
+    public int ucastMessagesReceived;
+    public int ucastMessageBytesReceived;
+    public int ucastMessagesSent;
+    public int ucastMessageBytesSent;
+    public int ucastRetransmits;
+
+    public int mcastMessagesReceived;
+    public int mcastMessageBytesReceived;
+    public int mcastMessagesSent;
+    public int mcastMessageBytesSent;
+    public int mcastRetransmits;
+    public int mcastRetransmitRequests;
+
+    @Override
+    public void incUcastReadBytes(int i) {
+      ucastMessagesReceived++;
+      ucastMessageBytesReceived += i;
+    }
+
+    @Override
+    public void incUcastWriteBytes(int i) {
+      ucastMessagesSent++;
+      ucastMessageBytesSent += i;
+    }
+
+    @Override
+    public void incUcastRetransmits() {
+      ucastRetransmits++;
+    }
+
+    @Override
+    public void incMcastReadBytes(int i) {
+      mcastMessagesReceived++;
+      mcastMessageBytesReceived += i;
+    }
+
+    @Override
+    public void incMcastWriteBytes(int i) {
+      mcastMessagesSent++;
+      mcastMessageBytesSent += i;
+    }
+
+    @Override
+    public void incMcastRetransmits() {
+      mcastRetransmits++;
+    }
+
+    @Override
+    public void incMcastRetransmitRequests() {
+      mcastRetransmitRequests++;
+    }
+  }
+
+}