Author: frm
Date: Thu Jan 17 14:41:39 2019
New Revision: 1851533
URL: http://svn.apache.org/viewvc?rev=1851533&view=rev
Log:
OAK-6749 - Extract the blob processing logic from StandbyDiff
Added:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/BlobFetchTimeoutException.java
(with props)
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/BlobProcessor.java
(with props)
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/BlobTypeUnknownException.java
(with props)
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/BlobWriteException.java
(with props)
Modified:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyDiff.java
Added:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/BlobFetchTimeoutException.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/BlobFetchTimeoutException.java?rev=1851533&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/BlobFetchTimeoutException.java
(added)
+++
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/BlobFetchTimeoutException.java
Thu Jan 17 14:41:39 2019
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.client;
+
+public class BlobFetchTimeoutException extends RuntimeException {
+
+ private final String blobId;
+
+ BlobFetchTimeoutException(String blobId) {
+ this.blobId = blobId;
+ }
+
+ public String getBlobId() {
+ return blobId;
+ }
+
+}
Propchange:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/BlobFetchTimeoutException.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/BlobProcessor.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/BlobProcessor.java?rev=1851533&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/BlobProcessor.java
(added)
+++
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/BlobProcessor.java
Thu Jan 17 14:41:39 2019
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.client;
+
+import org.apache.jackrabbit.oak.api.Blob;
+
+interface BlobProcessor {
+
+ void processBinary(Blob b) throws InterruptedException;
+
+}
Propchange:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/BlobProcessor.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/BlobTypeUnknownException.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/BlobTypeUnknownException.java?rev=1851533&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/BlobTypeUnknownException.java
(added)
+++
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/BlobTypeUnknownException.java
Thu Jan 17 14:41:39 2019
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.client;
+
+public class BlobTypeUnknownException extends RuntimeException {
+
+ BlobTypeUnknownException() {
+ // Prevent instantiation outside of this package.
+ }
+}
Propchange:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/BlobTypeUnknownException.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/BlobWriteException.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/BlobWriteException.java?rev=1851533&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/BlobWriteException.java
(added)
+++
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/BlobWriteException.java
Thu Jan 17 14:41:39 2019
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.client;
+
+import java.io.IOException;
+
+class BlobWriteException extends RuntimeException {
+
+ private final String blobId;
+
+ BlobWriteException(String blobId, IOException cause) {
+ super(cause);
+ this.blobId = blobId;
+ }
+
+ public String getBlobId() {
+ return blobId;
+ }
+
+}
Propchange:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/BlobWriteException.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyDiff.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyDiff.java?rev=1851533&r1=1851532&r2=1851533&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyDiff.java
(original)
+++
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyDiff.java
Thu Jan 17 14:41:39 2019
@@ -31,7 +31,6 @@ import org.apache.jackrabbit.oak.api.Blo
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.api.Type;
import org.apache.jackrabbit.oak.segment.CancelableDiff;
-import org.apache.jackrabbit.oak.segment.SegmentBlob;
import org.apache.jackrabbit.oak.segment.SegmentNodeState;
import org.apache.jackrabbit.oak.segment.file.FileStore;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
@@ -51,24 +50,30 @@ class StandbyDiff implements NodeStateDi
private final StandbyClient client;
- private final boolean hasDataStore;
-
private final String path;
private final Supplier<Boolean> running;
+ private final BlobProcessor blobProcessor;
+
StandbyDiff(NodeBuilder builder, FileStore store, StandbyClient client,
Supplier<Boolean> running) {
this(builder, store, client, "/", running);
}
- private StandbyDiff(NodeBuilder builder, FileStore store, StandbyClient
client, String path,
- Supplier<Boolean> running) {
+ private static BlobProcessor newBinaryFetcher(BlobStore blobStore,
StandbyClient client) {
+ if (blobStore == null) {
+ return (blob) -> {};
+ }
+ return new RemoteBlobProcessor(blobStore, client::getBlob);
+ }
+
+ private StandbyDiff(NodeBuilder builder, FileStore store, StandbyClient
client, String path, Supplier<Boolean> running) {
this.builder = builder;
this.store = store;
- this.hasDataStore = store.getBlobStore() != null;
this.client = client;
this.path = path;
this.running = running;
+ this.blobProcessor = newBinaryFetcher(store.getBlobStore(), client);
}
@Override
@@ -129,7 +134,7 @@ class StandbyDiff implements NodeStateDi
return (SegmentNodeState) after;
}
- if (!hasDataStore) {
+ if (store.getBlobStore() == null) {
return (SegmentNodeState) after;
}
@@ -137,7 +142,7 @@ class StandbyDiff implements NodeStateDi
// traversal to verify binaries
for (PropertyState propertyState : after.getProperties()) {
- fetchBinary(propertyState);
+ processBinary(propertyState);
}
boolean success = after.compareAgainstBaseState(before, new
CancelableDiff(this, newCanceledSupplier()));
@@ -162,59 +167,44 @@ class StandbyDiff implements NodeStateDi
};
}
- private PropertyState fetchBinary(PropertyState property) {
+ private PropertyState processBinary(PropertyState property) {
Type<?> type = property.getType();
if (type == BINARY) {
- fetchBinary(property.getValue(Type.BINARY), property.getName());
+ processBinary(property.getValue(Type.BINARY), property.getName());
} else if (type == BINARIES) {
for (Blob blob : property.getValue(BINARIES)) {
- fetchBinary(blob, property.getName());
+ processBinary(blob, property.getName());
}
}
return property;
}
- private void fetchBinary(Blob b, String pName) {
- if (b instanceof SegmentBlob) {
- fetchBinary((SegmentBlob) b, pName);
- } else {
- log.warn("Unknown Blob {} at {}, ignoring",
b.getClass().getName(), path + "#" + pName);
- }
- }
-
- private void fetchBinary(SegmentBlob sb, String pName) {
- if (sb.isExternal() && hasDataStore && sb.getReference() == null) {
- String blobId = sb.getBlobId();
-
- if (blobId == null) {
- return;
- }
-
- try {
- fetchAndStoreBlob(blobId, pName);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- }
-
- private void fetchAndStoreBlob(String blobId, String pName) throws
InterruptedException {
- InputStream in = client.getBlob(blobId);
-
- if (in == null) {
- throw new IllegalStateException("Unable to load remote blob " +
blobId + " at " + path + "#" + pName
- + " in " + client.getReadTimeoutMs() + "ms. Please
increase the timeout and try again.");
- }
-
+ private void processBinary(Blob b, String propertyName) {
try {
- BlobStore blobStore = store.getBlobStore();
- assert blobStore != null : "Blob store must not be null";
- blobStore.writeBlob(in);
- in.close();
- } catch (IOException f) {
- throw new IllegalStateException("Unable to persist blob " + blobId
+ " at " + path + "#" + pName, f);
+ blobProcessor.processBinary(b);
+ } catch (BlobFetchTimeoutException e) {
+ String message = String.format(
+ "Unable to load remote blob %s at %s#%s in %dms. Please
increase the timeout and try again.",
+ e.getBlobId(),
+ path,
+ propertyName,
+ client.getReadTimeoutMs()
+ );
+ throw new IllegalStateException(message, e);
+ } catch (BlobWriteException e) {
+ String message = String.format(
+ "Unable to persist blob %s at %s#%s",
+ e.getBlobId(),
+ path,
+ propertyName
+ );
+ throw new IllegalStateException(message, e);
+ } catch (BlobTypeUnknownException e) {
+ log.warn("Unknown Blob {} at {}, ignoring",
b.getClass().getName(), path + "#" + propertyName);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
}
}