Author: frm
Date: Thu Jan 26 12:18:37 2017
New Revision: 1780371
URL: http://svn.apache.org/viewvc?rev=1780371&view=rev
Log:
OAK-5350 - Add unit tests for CommunicationObserver
Added:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationPartnerMBean.java
(with props)
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/store/
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserverTest.java
(with props)
Modified:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/RequestObserverHandler.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserver.java
Modified:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/RequestObserverHandler.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/RequestObserverHandler.java?rev=1780371&r1=1780370&r2=1780371&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/RequestObserverHandler.java
(original)
+++
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/RequestObserverHandler.java
Thu Jan 26 12:18:37 2017
@@ -54,15 +54,15 @@ class RequestObserverHandler extends Cha
}
private void onGetHeadRequest(GetHeadRequest request, InetSocketAddress
address) throws Exception {
- observer.gotMessageFrom(request.getClientId(), "get head", address);
+ observer.gotMessageFrom(request.getClientId(), "get head",
address.getAddress().getHostAddress(), address.getPort());
}
private void onGetSegmentRequest(GetSegmentRequest request,
InetSocketAddress address) throws Exception {
- observer.gotMessageFrom(request.getClientId(), "get segment", address);
+ observer.gotMessageFrom(request.getClientId(), "get segment",
address.getAddress().getHostAddress(), address.getPort());
}
private void onGetBlobRequest(GetBlobRequest request, InetSocketAddress
address) throws Exception {
- observer.gotMessageFrom(request.getClientId(), "get blob id", address);
+ observer.gotMessageFrom(request.getClientId(), "get blob id",
address.getAddress().getHostAddress(), address.getPort());
}
}
Modified:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserver.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserver.java?rev=1780371&r1=1780370&r2=1780371&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserver.java
(original)
+++
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserver.java
Thu Jan 26 12:18:37 2017
@@ -16,190 +16,119 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.jackrabbit.oak.segment.standby.store;
+package org.apache.jackrabbit.oak.segment.standby.store;
import java.lang.management.ManagementFactory;
-import java.net.InetSocketAddress;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
-import javax.annotation.Nonnull;
-import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
import javax.management.StandardMBean;
import org.apache.jackrabbit.oak.segment.standby.jmx.ObservablePartnerMBean;
-import org.apache.jackrabbit.oak.segment.standby.jmx.StandbyStatusMBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CommunicationObserver {
- private static final int MAX_CLIENT_STATISTICS = 10;
-
- private static class CommunicationPartnerMBean implements
ObservablePartnerMBean {
- private final ObjectName mbeanName;
- private final String clientName;
- public String lastRequest;
- public Date lastSeen;
- public String remoteAddress;
- public int remotePort;
- public long segmentsSent;
- public long segmentBytesSent;
- public long binariesSent;
- public long binariesBytesSent;
-
- public CommunicationPartnerMBean(String clientName) throws
MalformedObjectNameException {
- this.clientName = clientName;
- this.mbeanName = new ObjectName(StandbyStatusMBean.JMX_NAME +
",id=\"Client " + clientName + "\"");
- }
-
- public ObjectName getMBeanName() {
- return this.mbeanName;
- }
-
- @Nonnull
- @Override
- public String getName() {
- return this.clientName;
- }
-
- @Override
- public String getRemoteAddress() {
- return this.remoteAddress;
- }
-
- @Override
- public String getLastRequest() {
- return this.lastRequest;
- }
-
- @Override
- public int getRemotePort() {
- return this.remotePort;
- }
- @Override
- public String getLastSeenTimestamp() {
- return this.lastSeen == null ? null : this.lastSeen.toString();
- }
+ static final int MAX_CLIENT_STATISTICS = 10;
- @Override
- public long getTransferredSegments() {
- return this.segmentsSent;
- }
+ private static final Logger log =
LoggerFactory.getLogger(CommunicationObserver.class);
- @Override
- public long getTransferredSegmentBytes() {
- return this.segmentBytesSent;
- }
+ private final Map<String, CommunicationPartnerMBean> partnerDetails = new
HashMap<>();
- @Override
- public long getTransferredBinaries() {
- return this.binariesSent;
- }
+ private final String id;
- @Override
- public long getTransferredBinariesBytes() {
- return this.binariesBytesSent;
- }
+ public CommunicationObserver(String id) {
+ this.id = id;
}
- private static final Logger log = LoggerFactory
- .getLogger(CommunicationObserver.class);
-
- private final String identifier;
- private final Map<String, CommunicationPartnerMBean> partnerDetails;
+ void unregisterCommunicationPartner(CommunicationPartnerMBean m) throws
Exception {
+
ManagementFactory.getPlatformMBeanServer().unregisterMBean(m.getMBeanName());
+ }
- public CommunicationObserver(String myID) {
- this.identifier = myID;
- this.partnerDetails = new HashMap<String, CommunicationPartnerMBean>();
+ void registerCommunicationPartner(CommunicationPartnerMBean m) throws
Exception {
+ ManagementFactory.getPlatformMBeanServer().registerMBean(new
StandardMBean(m, ObservablePartnerMBean.class), m.getMBeanName());
}
- private static void unregister(CommunicationPartnerMBean m) {
- final MBeanServer jmxServer =
ManagementFactory.getPlatformMBeanServer();
+ private void safeUnregisterCommunicationPartner(CommunicationPartnerMBean
m) {
try {
- jmxServer.unregisterMBean(m.getMBeanName());
+ unregisterCommunicationPartner(m);
+ } catch (Exception e) {
+ log.error(String.format("Unable to unregister MBean for client
%s", m.getName()), e);
}
- catch (Exception e) {
- log.error("error unregistering mbean for client '" + m.getName() +
"'", e);
+ }
+
+ private void safeRegisterCommunicationPartner(CommunicationPartnerMBean m)
{
+ try {
+ registerCommunicationPartner(m);
+ } catch (Exception e) {
+ log.error(String.format("Unable to register MBean for client %s",
m.getName()), e);
}
}
public void unregister() {
- for (CommunicationPartnerMBean m : this.partnerDetails.values()) {
- unregister(m);
+ for (CommunicationPartnerMBean m : partnerDetails.values()) {
+ safeUnregisterCommunicationPartner(m);
}
}
- public void gotMessageFrom(String client, String request,
InetSocketAddress remote) throws MalformedObjectNameException {
- log.debug("got message '" + request + "' from client " + client);
- CommunicationPartnerMBean m = this.partnerDetails.get(client);
+ public void gotMessageFrom(String client, String request, String address,
int port) throws MalformedObjectNameException {
+ log.debug("Message '{}' received from client {}", request, client);
+ CommunicationPartnerMBean m = partnerDetails.get(client);
boolean register = false;
if (m == null) {
cleanUp();
m = new CommunicationPartnerMBean(client);
- m.remoteAddress = remote.getAddress().getHostAddress();
- m.remotePort = remote.getPort();
+ m.setRemoteAddress(address);
+ m.setRemotePort(port);
register = true;
}
- m.lastSeen = new Date();
- m.lastRequest = request;
- this.partnerDetails.put(client, m);
+ m.setLastSeen(new Date());
+ m.setLastRequest(request);
+ partnerDetails.put(client, m);
if (register) {
- final MBeanServer jmxServer =
ManagementFactory.getPlatformMBeanServer();
- try {
- jmxServer.registerMBean(new StandardMBean(m,
ObservablePartnerMBean.class), m.getMBeanName());
- }
- catch (Exception e) {
- log.error("can register mbean for client '" + m.getName() +
"'", e);
- }
+ safeRegisterCommunicationPartner(m);
}
}
public void didSendSegmentBytes(String client, int size) {
- log.debug("did send segment with " + size + " bytes to client " +
client);
- CommunicationPartnerMBean m = this.partnerDetails.get(client);
- m.segmentsSent++;
- m.segmentBytesSent += size;
- this.partnerDetails.put(client, m);
+ log.debug("Segment with size {} sent to client {}", size, client);
+ CommunicationPartnerMBean m = partnerDetails.get(client);
+ m.onSegmentSent(size);
+ partnerDetails.put(client, m);
}
public void didSendBinariesBytes(String client, int size) {
- log.debug("did send binary with " + size + " bytes to client " +
client);
- CommunicationPartnerMBean m = this.partnerDetails.get(client);
- m.binariesSent++;
- m.binariesBytesSent += size;
- this.partnerDetails.put(client, m);
+ log.debug("Binary with size {} sent to client {}", size, client);
+ CommunicationPartnerMBean m = partnerDetails.get(client);
+ m.onBinarySent(size);
+ partnerDetails.put(client, m);
}
public String getID() {
- return this.identifier;
+ return id;
}
- // helper
-
private void cleanUp() {
- while (this.partnerDetails.size() >= MAX_CLIENT_STATISTICS) {
+ while (partnerDetails.size() >= MAX_CLIENT_STATISTICS) {
CommunicationPartnerMBean oldestEntry = oldestEntry();
- if (oldestEntry == null) {
- return;
- }
- log.info("housekeeping: removing statistics for " +
oldestEntry.getName());
- unregister(oldestEntry);
- this.partnerDetails.remove(oldestEntry.getName());
+ log.info("Housekeeping: Removing statistics for client " +
oldestEntry.getName());
+ safeUnregisterCommunicationPartner(oldestEntry);
+ partnerDetails.remove(oldestEntry.getName());
}
}
private CommunicationPartnerMBean oldestEntry() {
CommunicationPartnerMBean ret = null;
- for (CommunicationPartnerMBean m : this.partnerDetails.values()) {
- if (ret == null || ret.lastSeen.after(m.lastSeen)) {
+ for (CommunicationPartnerMBean m : partnerDetails.values()) {
+ if (ret == null || ret.getLastSeen().after(m.getLastSeen())) {
ret = m;
}
}
return ret;
}
+
}
Added:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationPartnerMBean.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationPartnerMBean.java?rev=1780371&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationPartnerMBean.java
(added)
+++
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationPartnerMBean.java
Thu Jan 26 12:18:37 2017
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.store;
+
+import java.util.Date;
+
+import javax.annotation.Nonnull;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.apache.jackrabbit.oak.segment.standby.jmx.ObservablePartnerMBean;
+import org.apache.jackrabbit.oak.segment.standby.jmx.StandbyStatusMBean;
+
+class CommunicationPartnerMBean implements ObservablePartnerMBean {
+
+ private final ObjectName mbeanName;
+
+ private final String clientName;
+
+ private String lastRequest;
+
+ private Date lastSeen;
+
+ private String lastSeenTimestamp;
+
+ private String remoteAddress;
+
+ private int remotePort;
+
+ private long segmentsSent;
+
+ private long segmentBytesSent;
+
+ private long binariesSent;
+
+ private long binariesBytesSent;
+
+ CommunicationPartnerMBean(String clientName) throws
MalformedObjectNameException {
+ this.clientName = clientName;
+ this.mbeanName = new ObjectName(StandbyStatusMBean.JMX_NAME +
",id=\"Client " + clientName + "\"");
+ }
+
+ ObjectName getMBeanName() {
+ return this.mbeanName;
+ }
+
+ @Nonnull
+ @Override
+ public String getName() {
+ return this.clientName;
+ }
+
+ @Override
+ public String getRemoteAddress() {
+ return this.remoteAddress;
+ }
+
+ @Override
+ public String getLastRequest() {
+ return this.lastRequest;
+ }
+
+ @Override
+ public int getRemotePort() {
+ return this.remotePort;
+ }
+
+ @Override
+ public String getLastSeenTimestamp() {
+ return this.lastSeenTimestamp;
+ }
+
+ @Override
+ public long getTransferredSegments() {
+ return this.segmentsSent;
+ }
+
+ @Override
+ public long getTransferredSegmentBytes() {
+ return this.segmentBytesSent;
+ }
+
+ @Override
+ public long getTransferredBinaries() {
+ return this.binariesSent;
+ }
+
+ @Override
+ public long getTransferredBinariesBytes() {
+ return this.binariesBytesSent;
+ }
+
+ void setRemoteAddress(String remoteAddress) {
+ this.remoteAddress = remoteAddress;
+ }
+
+ void setRemotePort(int remotePort) {
+ this.remotePort = remotePort;
+ }
+
+ Date getLastSeen() {
+ return lastSeen;
+ }
+
+ void setLastSeen(Date lastSeen) {
+ this.lastSeen = lastSeen;
+ this.lastSeenTimestamp = lastSeen.toString();
+ }
+
+ void setLastRequest(String lastRequest) {
+ this.lastRequest = lastRequest;
+ }
+
+ void onSegmentSent(long bytes) {
+ segmentsSent++;
+ segmentBytesSent += bytes;
+ }
+
+ void onBinarySent(long bytes) {
+ binariesSent++;
+ binariesBytesSent += bytes;
+ }
+
+}
Propchange:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationPartnerMBean.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserverTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserverTest.java?rev=1780371&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserverTest.java
(added)
+++
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserverTest.java
Thu Jan 26 12:18:37 2017
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.store;
+
+import static java.util.UUID.randomUUID;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CommunicationObserverTest {
+
+ private static class TestCommunicationObserver extends
CommunicationObserver {
+
+ private final List<CommunicationPartnerMBean> communicationPartners =
new ArrayList<>();
+
+ TestCommunicationObserver(String id) {
+ super(id);
+ }
+
+ @Override
+ void registerCommunicationPartner(CommunicationPartnerMBean m) throws
Exception {
+ super.registerCommunicationPartner(m);
+ communicationPartners.add(m);
+ }
+
+ @Override
+ void unregisterCommunicationPartner(CommunicationPartnerMBean m)
throws Exception {
+ communicationPartners.remove(m);
+ super.unregisterCommunicationPartner(m);
+ }
+
+ }
+
+ private TestCommunicationObserver observer;
+
+ @Before
+ public void before() {
+ observer = new TestCommunicationObserver("test");
+ }
+
+ @After
+ public void after() {
+ observer.unregister();
+ observer = null;
+ }
+
+ @Test
+ public void shouldExposeTheProvidedID() throws Exception {
+ assertEquals("test", observer.getID());
+ }
+
+ @Test
+ public void shouldRegisterObservablePartnerMBean() throws Exception {
+ observer.gotMessageFrom("client", "request", "127.0.0.1", 8080);
+ assertEquals(1, observer.communicationPartners.size());
+ }
+
+ @Test
+ public void shouldNotKeepManyObservablePartnerMBeans() throws Exception {
+ for (int i = 0; i < CommunicationObserver.MAX_CLIENT_STATISTICS * 2;
i++) {
+ observer.gotMessageFrom(randomUUID().toString(), "request",
"127.0.0.1", 8080);
+ }
+ assertEquals(CommunicationObserver.MAX_CLIENT_STATISTICS,
observer.communicationPartners.size());
+ }
+
+ @Test
+ public void observablePartnerMBeanShouldExposeClientName() throws
Exception {
+ observer.gotMessageFrom("client", "request", "127.0.0.1", 8080);
+ assertEquals("client",
observer.communicationPartners.get(0).getName());
+ }
+
+ @Test
+ public void observablePartnerMBeanShouldExposeAddress() throws Exception {
+ observer.gotMessageFrom("client", "request", "127.0.0.1", 8080);
+ assertEquals("127.0.0.1",
observer.communicationPartners.get(0).getRemoteAddress());
+ }
+
+ @Test
+ public void observablePartnerMBeanShouldExposePort() throws Exception {
+ observer.gotMessageFrom("client", "request", "127.0.0.1", 8080);
+ assertEquals(8080,
observer.communicationPartners.get(0).getRemotePort());
+ }
+
+ @Test
+ public void observablePartnerMBeanShouldExposeLastRequest() throws
Exception {
+ observer.gotMessageFrom("client", "request", "127.0.0.1", 8080);
+ assertEquals("request",
observer.communicationPartners.get(0).getLastRequest());
+ }
+
+ @Test
+ public void observablePartnerMBeanShouldUpdateLastRequest() throws
Exception {
+ observer.gotMessageFrom("client", "before", "127.0.0.1", 8080);
+ assertEquals("before",
observer.communicationPartners.get(0).getLastRequest());
+ observer.gotMessageFrom("client", "after", "127.0.0.1", 8080);
+ assertEquals("after",
observer.communicationPartners.get(0).getLastRequest());
+ }
+
+ @Test
+ public void observablePartnerMBeanShouldExposeLastSeen() throws Exception {
+ observer.gotMessageFrom("client", "request", "127.0.0.1", 8080);
+ assertNotNull(observer.communicationPartners.get(0).getLastSeen());
+ }
+
+ @Test
+ public void observablePartnerMBeanShouldUpdateLastSeen() throws Exception {
+ observer.gotMessageFrom("client", "request", "127.0.0.1", 8080);
+ Date before = observer.communicationPartners.get(0).getLastSeen();
+ observer.gotMessageFrom("client", "request", "127.0.0.1", 8080);
+ Date after = observer.communicationPartners.get(0).getLastSeen();
+ assertNotSame(before, after);
+ }
+
+ @Test
+ public void observablePartnerMBeanShouldExposeLastSeenTimestamp() throws
Exception {
+ observer.gotMessageFrom("client", "request", "127.0.0.1", 8080);
+
assertNotNull(observer.communicationPartners.get(0).getLastSeenTimestamp());
+ }
+
+ @Test
+ public void observablePartnerMBeanShouldExposeSentSegments() throws
Exception {
+ observer.gotMessageFrom("client", "request", "127.0.0.1", 8080);
+ observer.didSendSegmentBytes("client", 100);
+ assertEquals(1,
observer.communicationPartners.get(0).getTransferredSegments());
+ }
+
+ @Test
+ public void observablePartnerMBeanShouldExposeSentSegmentsSize() throws
Exception {
+ observer.gotMessageFrom("client", "request", "127.0.0.1", 8080);
+ observer.didSendSegmentBytes("client", 100);
+ assertEquals(100,
observer.communicationPartners.get(0).getTransferredSegmentBytes());
+ }
+
+ @Test
+ public void observablePartnerMBeanShouldExposeSentBinaries() throws
Exception {
+ observer.gotMessageFrom("client", "request", "127.0.0.1", 8080);
+ observer.didSendBinariesBytes("client", 100);
+ assertEquals(1,
observer.communicationPartners.get(0).getTransferredBinaries());
+ }
+
+ @Test
+ public void observablePartnerMBeanShouldExposeSentBinariesSize() throws
Exception {
+ observer.gotMessageFrom("client", "request", "127.0.0.1", 8080);
+ observer.didSendBinariesBytes("client", 100);
+ assertEquals(100,
observer.communicationPartners.get(0).getTransferredBinariesBytes());
+ }
+
+}
Propchange:
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserverTest.java
------------------------------------------------------------------------------
svn:eol-style = native