Author: frm
Date: Thu Dec 7 10:03:41 2017
New Revision: 1817343
URL: http://svn.apache.org/viewvc?rev=1817343&view=rev
Log:
OAK-7027 - Close StandbyClient as soon as the synchronization is over
Modified:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSync.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSync.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSync.java?rev=1817343&r1=1817342&r2=1817343&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSync.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSync.java Thu Dec 7 10:03:41 2017
@@ -33,7 +33,6 @@ import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.StandardMBean;
-import com.google.common.base.Supplier;
import io.netty.channel.nio.NioEventLoopGroup;
import org.apache.jackrabbit.core.data.util.NamedThreadFactory;
import org.apache.jackrabbit.oak.segment.file.FileStore;
@@ -86,6 +85,18 @@ public final class StandbyClientSync imp
private final File spoolFolder;
+ private final StandbyClientSyncExecution execution;
+
+ private static String clientId() {
+ String s = System.getProperty(CLIENT_ID_PROPERTY_NAME);
+
+ if (s == null || s.isEmpty()) {
+ return UUID.randomUUID().toString();
+ }
+
+ return s;
+ }
+
public StandbyClientSync(String host, int port, FileStore store, boolean secure, int readTimeoutMs, boolean autoClean, File spoolFolder) {
this.state = STATUS_INITIALIZING;
this.lastSuccessfulRequest = -1;
@@ -98,14 +109,12 @@ public final class StandbyClientSync imp
this.readTimeoutMs = readTimeoutMs;
this.autoClean = autoClean;
this.fileStore = store;
- String s = System.getProperty(CLIENT_ID_PROPERTY_NAME);
- this.observer = new CommunicationObserver((s == null || s.isEmpty()) ? UUID.randomUUID().toString() : s);
+ this.observer = new CommunicationObserver(clientId());
this.group = new NioEventLoopGroup(0, new NamedThreadFactory("standby"));
+ this.execution = new StandbyClientSyncExecution(fileStore, running::get);
this.spoolFolder = spoolFolder;
-
- final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer();
try {
- jmxServer.registerMBean(new StandardMBean(this, ClientStandbyStatusMBean.class), new ObjectName(this.getMBeanName()));
+ ManagementFactory.getPlatformMBeanServer().registerMBean(new StandardMBean(this, ClientStandbyStatusMBean.class), new ObjectName(this.getMBeanName()));
} catch (Exception e) {
log.error("cannot register standby status mbean", e);
}
@@ -154,18 +163,23 @@ public final class StandbyClientSync imp
try {
long startTimestamp = System.currentTimeMillis();
+
+ GCGeneration genBefore = headGeneration(fileStore);
+
try (StandbyClient client = new StandbyClient(group, observer.getID(), secure, readTimeoutMs, spoolFolder)) {
client.connect(host, port);
+ execution.execute(client);
+ }
+
+ fileStore.flush();
- GCGeneration genBefore = headGeneration(fileStore);
- new StandbyClientSyncExecution(fileStore, client, newRunningSupplier()).execute();
- GCGeneration genAfter = headGeneration(fileStore);
-
- if (autoClean && (genAfter.compareWith(genBefore)) > 0) {
- log.info("New head generation detected (prevHeadGen: {} newHeadGen: {}), running cleanup.", genBefore, genAfter);
- cleanupAndRemove();
- }
+ GCGeneration genAfter = headGeneration(fileStore);
+
+ if (autoClean && genAfter.compareWith(genBefore) > 0) {
+ log.info("New head generation detected (prevHeadGen: {} newHeadGen: {}), running cleanup.", genBefore, genAfter);
+ cleanupAndRemove();
}
+
this.failedRequests = 0;
this.syncStartTimestamp = startTimestamp;
this.syncEndTimestamp = System.currentTimeMillis();
@@ -192,17 +206,6 @@ public final class StandbyClientSync imp
fileStore.cleanup();
}
- private Supplier<Boolean> newRunningSupplier() {
- return new Supplier<Boolean>() {
-
- @Override
- public Boolean get() {
- return running.get();
- }
-
- };
- }
-
@Nonnull
@Override
public String getMode() {
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java?rev=1817343&r1=1817342&r2=1817343&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java Thu Dec 7 10:03:41 2017
@@ -24,6 +24,7 @@ import java.util.UUID;
import javax.annotation.Nullable;
+import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import org.apache.jackrabbit.oak.segment.RecordId;
import org.apache.jackrabbit.oak.segment.SegmentId;
@@ -49,19 +50,16 @@ class StandbyClientSyncExecution {
private final SegmentIdProvider idProvider;
- private final StandbyClient client;
-
private final Supplier<Boolean> running;
- StandbyClientSyncExecution(FileStore store, StandbyClient client, Supplier<Boolean> running) {
+ StandbyClientSyncExecution(FileStore store, Supplier<Boolean> running) {
this.store = store;
this.idProvider = store.getSegmentIdProvider();
- this.client = client;
this.running = running;
}
- void execute() throws Exception {
- RecordId remoteHead = getHead();
+ void execute(StandbyClient client) throws Exception {
+ RecordId remoteHead = getHead(client);
if (remoteHead == null) {
log.error("Unable to fetch remote head");
@@ -72,18 +70,17 @@ class StandbyClientSyncExecution {
return;
}
- long t = System.currentTimeMillis();
+ Stopwatch stopwatch = Stopwatch.createStarted();
SegmentNodeState before = store.getHead();
SegmentNodeBuilder builder = before.builder();
SegmentNodeState current = newSegmentNodeState(remoteHead);
- compareAgainstBaseState(current, before, builder);
- boolean ok = store.getRevisions().setHead(before.getRecordId(), remoteHead);
- store.flush();
- log.info("updated head state successfully: {} in {}ms.", ok, System.currentTimeMillis() - t);
+ compareAgainstBaseState(client, current, before, builder);
+ store.getRevisions().setHead(before.getRecordId(), remoteHead);
+ log.info("Updated head state in {}", stopwatch);
}
@Nullable
- private RecordId getHead() throws Exception {
+ private RecordId getHead(StandbyClient client) throws Exception {
String head = client.getHead();
if (head == null) {
return null;
@@ -95,18 +92,19 @@ class StandbyClientSyncExecution {
return store.getReader().readNode(id);
}
- private boolean compareAgainstBaseState(SegmentNodeState current, SegmentNodeState before, SegmentNodeBuilder builder) throws Exception {
+ private void compareAgainstBaseState(StandbyClient client, SegmentNodeState current, SegmentNodeState before, SegmentNodeBuilder builder) throws Exception {
while (true) {
try {
- return current.compareAgainstBaseState(before, new StandbyDiff(builder, store, client, running));
+ current.compareAgainstBaseState(before, new StandbyDiff(builder, store, client, running));
+ return;
} catch (SegmentNotFoundException e) {
log.debug("Found missing segment {}", e.getSegmentId());
- copySegmentHierarchyFromPrimary(UUID.fromString(e.getSegmentId()));
+ copySegmentHierarchyFromPrimary(client, UUID.fromString(e.getSegmentId()));
}
}
}
- private void copySegmentHierarchyFromPrimary(UUID segmentId) throws Exception {
+ private void copySegmentHierarchyFromPrimary(StandbyClient client, UUID segmentId) throws Exception {
LinkedList<UUID> batch = new LinkedList<>();
batch.offer(segmentId);
@@ -137,7 +135,7 @@ class StandbyClientSyncExecution {
continue;
}
- for (String s : readReferences(current)) {
+ for (String s : readReferences(client, current)) {
UUID referenced = UUID.fromString(s);
// Short circuit for the "backward reference". The segment graph
@@ -189,17 +187,16 @@ class StandbyClientSyncExecution {
for (UUID id : bulk) {
log.info("Copying bulk segment {} from primary", id);
- copySegmentFromPrimary(id);
+ copySegmentFromPrimary(client, id);
}
for (UUID id : data) {
log.info("Copying data segment {} from primary", id);
- copySegmentFromPrimary(id);
+ copySegmentFromPrimary(client, id);
}
-
}
- private Iterable<String> readReferences(UUID id) throws InterruptedException {
+ private Iterable<String> readReferences(StandbyClient client, UUID id) throws InterruptedException {
Iterable<String> references = client.getReferences(id.toString());
if (references == null) {
@@ -216,7 +213,7 @@ class StandbyClientSyncExecution {
));
}
- private void copySegmentFromPrimary(UUID uuid) throws Exception {
+ private void copySegmentFromPrimary(StandbyClient client, UUID uuid) throws Exception {
byte[] data = client.getSegment(uuid.toString());
if (data == null) {