New patch attached to address feedback in OAK-4712 on the last patch.

On Thu, Sep 1, 2016 at 11:00 AM, Matt Ryan <[email protected]> wrote:

> I believe this patch addresses the issues identified with the previous
> patch.  I've also uploaded it to OAK-4712.
>
> Looking for review and feedback.
>
> This patch introduces a new interface I've named NodeIdMapper.java.  I'm
> not sure where to put this.  I've added it in order to be able to obtain
> the blob ID for a named node within the context of a NodeStoreService class
> by using an anonymous class.  However, most of the functionality in each
> case (DocumentNodeStoreService and SegmentNodeStoreService) is similar.
> This could probably be addressed with a lot less code duplication by using
> an abstract base class, but I'm not sure where would be best to put such a
> class.  Ideas?
>
> I also want to call attention to how the blob ID is being used in these
> anonymous classes in the *NodeStoreService classes.  The IDs I was getting
> back had trailing information with a '#' followed by some number of
> digits.  I just split the string and discarded the last part.  Is there a
> better way to do this?
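For reference, a minimal sketch of that split (assuming, as DataStoreBlobStore.BlobId suggests, that the trailing segment is a '#'-separated length suffix; the helper name here is made up):

```java
// Hypothetical helper, not from the patch: strip the trailing "#<length>"
// segment from a blob ID of the form "<id>#<length>".  An ID without a
// '#' is returned unchanged; null stays null.
final class BlobIdUtil {
    private BlobIdUtil() {
    }

    static String stripLengthSuffix(final String fullBlobId) {
        if (fullBlobId == null) {
            return null;
        }
        final int hash = fullBlobId.lastIndexOf('#');
        return hash < 0 ? fullBlobId : fullBlobId.substring(0, hash);
    }
}
```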
>
>
> On Sun, Aug 28, 2016 at 10:04 PM, Amit Jain <[email protected]> wrote:
>
>> Hi Matt,
>>
>> I have directly replied to your comments on the jira.
>>
>> Thanks
>> Amit
>>
>> On Sat, Aug 27, 2016 at 4:22 AM, Matt Ryan <[email protected]> wrote:
>>
>> > Use this patch instead; updated patch from latest in trunk.
>> >
>> > On Fri, Aug 26, 2016 at 4:41 PM, Matt Ryan <[email protected]> wrote:
>> >
>> >> Hi Oak Devs,
>> >>
>> >> I've created OAK-4712 and submitted a patch for the same.  I've attached
>> >> the same patch to this email.
>> >>
>> >> The submission is to add a new MBean, S3DataStoreStats, which will allow
>> >> reporting via JMX about the state of the S3DataStore.  Two metrics are
>> >> intended.  The first is to report the number of files that are in the
>> >> sync state, meaning they have been added to the S3DataStore but are not
>> >> yet completely copied into S3.  The second is to allow querying the sync
>> >> state of a file - a return value of true would mean that the file
>> >> provided is fully synchronized.  This uses an external file name, not
>> >> the internal ID.
>> >>
>> >> I have some questions about the implementation first:
>> >> 1. For the request to query the sync state of a file, should the file
>> >> name provided be a full path to a local file or a path to a file within
>> >> OAK (e.g. /content/dam/myimage.jpg)?  The current implementation uses a
>> >> local file path, but I have been wondering if it should be an OAK path.
>> >> 2. For the request to query the sync state of a file, when converting
>> >> from the externally-supplied file name to an internal DataIdentifier,
>> >> this implementation performs the same calculation to determine the
>> >> internal ID name as is done when a file is stored.  I have a number of
>> >> concerns with this:
>> >>    - It is inefficient - the entire file has to be read and digested in
>> >> order to compute the internal ID.  This takes a long time for large
>> >> assets.
>> >>    - I've essentially duplicated the logic from CachingDataStore into
>> >> S3DataStore to compute the internal ID.  I hate duplicating the code,
>> >> but I am trying to avoid exposing internal IDs in the API, and I am not
>> >> seeing a good way in the current implementation to avoid this without
>> >> either modifying the public API of CachingDataStore, or exposing the
>> >> internal ID via API, or both.
>> >>
>> >> Any suggestions on these two issues?
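For context, the duplicated computation in question boils down to a SHA-1 content hash rendered as hex (a sketch of the general content-addressed scheme, not the exact CachingDataStore code):

```java
import java.io.IOException;
import java.io.InputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Sketch of a content-addressed identifier: the whole stream must be read
// and digested, which is why deriving the internal ID from a file is
// expensive for large assets.
final class ContentHash {
    private ContentHash() {
    }

    static String sha1Hex(final InputStream in)
            throws IOException, NoSuchAlgorithmException {
        final MessageDigest digest = MessageDigest.getInstance("SHA-1");
        final byte[] buffer = new byte[8192];
        int read;
        while ((read = in.read(buffer)) != -1) {
            digest.update(buffer, 0, read);
        }
        final StringBuilder hex = new StringBuilder();
        for (final byte b : digest.digest()) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    }
}
```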
>> >>
>> >>
>> >> I'm also experiencing a problem with this patch.  In my testing it
>> >> appears to work fine, until I delete a file.  For example, if I delete
>> >> an asset via the REST API, I will see the asset deleted in CRXDE.
>> >> However, the file still remains in S3.  This MBean as implemented only
>> >> knows how to check with the S3DataStore and the corresponding backend,
>> >> and these all appear to believe the file still exists.  So the MBean
>> >> continues to report that the file's sync state is synchronized (i.e.
>> >> isFileSynced() returns true) even though the file has been removed from
>> >> the JCR.  Any suggestions on how to resolve this?
>> >>
>> >>
>> >> Finally, any feedback on the patch is welcome.  And if I did the
>> >> process wrong, please correct me (gently) - first-time submission here.
>> >> Thanks
>> >>
>> >>
>> >
>>
>
>
diff --git a/oak-blob-cloud/pom.xml b/oak-blob-cloud/pom.xml
index 800f716..ad3cb3c 100644
--- a/oak-blob-cloud/pom.xml
+++ b/oak-blob-cloud/pom.xml
@@ -41,7 +41,7 @@
                 <artifactId>maven-bundle-plugin</artifactId>
                 <configuration>
                     <instructions>
-                        <Export-Package>org.apache.jackrabbit.oak.blob.cloud.aws.s3</Export-Package>
+                        <Export-Package>org.apache.jackrabbit.oak.blob.cloud.aws.s3,org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats</Export-Package>
                         <DynamicImport-Package>sun.io</DynamicImport-Package>
                     </instructions>
                 </configuration>
@@ -101,6 +101,13 @@
             <version>${jackrabbit.version}</version>
         </dependency>
 
+        <!-- Dependencies to other Oak components -->
+        <dependency>
+            <groupId>org.apache.jackrabbit</groupId>
+            <artifactId>oak-commons</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
         <!-- Amazon AWS dependency -->
         <dependency>
             <groupId>com.amazonaws</groupId>
@@ -140,6 +147,12 @@
             <artifactId>logback-classic</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>1.10.19</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java
index fc21bf6..b07f1a4 100644
--- a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java
+++ b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java
@@ -17,14 +17,25 @@
 package org.apache.jackrabbit.oak.blob.cloud.aws.s3;
 
 import java.util.Properties;
+
 import org.apache.jackrabbit.core.data.Backend;
 import org.apache.jackrabbit.core.data.CachingDataStore;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
  * An Amazon S3 data store.
  */
 public class S3DataStore extends CachingDataStore {
+
+    /**
+     * Logger instance.
+     */
+    private static final Logger LOG = LoggerFactory.getLogger(S3DataStore.class);
+
     protected Properties properties;
 
     @Override
@@ -47,4 +58,24 @@ public class S3DataStore extends CachingDataStore {
     public void setProperties(Properties properties) {
         this.properties = properties;
     }
+
+    /**
+     * Look in the backend for a record matching the given identifier.  Returns true
+     * if such a record exists.
+     *
+     * @param identifier - A path-like identifier that represents the path to
+     *                   the record in question.
+     * @return true if a record for the provided identifier can be found.
+     */
+    public boolean haveRecordForIdentifier(final String identifier) {
+        try {
+            if (null != identifier) {
+                return this.getBackend().exists(new DataIdentifier(identifier));
+            }
+        }
+        catch (DataStoreException e) {
+            LOG.warn(String.format("Data Store Exception caught checking for %s in pending uploads", identifier), e);
+        }
+        }
+        return false;
+    }
 }
diff --git a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStatsMBean.java b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStatsMBean.java
new file mode 100644
index 0000000..7247dc7
--- /dev/null
+++ b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStatsMBean.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats;
+
+/**
+ * MBean for JMX statistics pertaining to an S3DataStore.
+ */
+public interface S3DataStoreStatsMBean {
+    String TYPE = "S3DataStoreStats";
+
+    /**
+     * Obtains the number of records that are in the process
+     * of being "synced", meaning they are either scheduled to
+     * be copied to S3 or are actively being copied to S3
+     * but the copy of these files has not yet completed.
+     *
+     * @return number of syncs in progress (active).
+     */
+    long getActiveSyncs();
+
+    /**
+     * Determines whether a file-like entity with the given name
+     * has been "synced" (completely copied) to S3.
+     *
+     * @param filename - OAK path to the entity to check.  This is
+     *                 an oak path, not an external file path.
+     * @return true if the file is synced to S3.
+     */
+    boolean isFileSynced(final String filename);
+}
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/api/BlobIdBlob.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/api/BlobIdBlob.java
new file mode 100644
index 0000000..db0dc96
--- /dev/null
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/api/BlobIdBlob.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.api;
+
+/**
+ * Represents a {@link Blob} that has a {@link String} identifier.
+ */
+public interface BlobIdBlob extends Blob {
+
+    /**
+     * Returns the {@link String} identifier for this blob.
+     *
+     * @return identifier for this blob
+     */
+    String getBlobId();
+}
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobStoreBlob.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobStoreBlob.java
index 4281f0e..cfde65f 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobStoreBlob.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobStoreBlob.java
@@ -24,13 +24,14 @@ import java.io.InputStream;
 import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
 
+import org.apache.jackrabbit.oak.api.BlobIdBlob;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
 import org.apache.jackrabbit.oak.api.Blob;
 
 /**
  * A blob implementation.
  */
-public class BlobStoreBlob implements Blob {
+public class BlobStoreBlob implements BlobIdBlob {
     
     private final BlobStore blobStore;
     private final String blobId;
@@ -70,6 +71,7 @@ public class BlobStoreBlob implements Blob {
         return blobId;
     }
 
+    @Override
     public String getBlobId() {
         return blobId;
     }
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/S3DataStoreStats.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/S3DataStoreStats.java
new file mode 100644
index 0000000..aa5df2e
--- /dev/null
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/S3DataStoreStats.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob.datastore;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.apache.jackrabbit.oak.api.BlobIdBlob;
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStatsMBean;
+import org.apache.jackrabbit.oak.commons.jmx.AnnotatedStandardMBean;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+
+import java.io.File;
+import java.util.List;
+
+public class S3DataStoreStats extends AnnotatedStandardMBean implements S3DataStoreStatsMBean {
+    private final S3DataStore s3ds;
+
+    protected NodeStore nodeStore;
+
+    public S3DataStoreStats(final S3DataStore s3ds, final NodeStore nodeStore) {
+        super(S3DataStoreStatsMBean.class);
+        this.s3ds = s3ds;
+        this.nodeStore = nodeStore;
+    }
+
+    /**
+     * Obtains the number of records that are in the process
+     * of being "synced", meaning they are either scheduled to
+     * be copied to S3 or are actively being copied to S3
+     * but the copy of these files has not yet completed.
+     *
+     * @return number of syncs in progress (active).
+     */
+    @Override
+    public long getActiveSyncs() {
+        return s3ds.getPendingUploads().size();
+    }
+
+    /**
+     * Determines whether a file-like entity with the given name
+     * has been "synced" (completely copied) to S3.
+     *
+     * Determination of "synced":
+     * - A nodeName of null or "" is always "not synced".
+     * - A nodeName that does not map to a valid node is always "not synced".
+     * - If the node for this nodeName does not have a "jcr:data" property,
+     * this node is always "not synced" since such a node would never be
+     * copied to S3.
+     * - If the node for this nodeName is not in the nodeStore, this node is
+     * always "not synced".
+     * - Otherwise, the state is "synced" if the corresponding blob is
+     * completely stored in S3.
+     *
+     * @param nodeName - OAK path to the entity to check.  This is
+     *                 an oak path, not an external file path.
+     * @return true if the file is synced to S3.
+     */
+    @Override
+    public boolean isFileSynced(final String nodeName) {
+        if (Strings.isNullOrEmpty(nodeName)) {
+            return false;
+        }
+
+        String blobId = null;
+        if (null != nodeStore) {
+            final List<String> allNodes = Lists.newArrayList(nodeName.split(File.separator));
+            final List<String> pathNodes = allNodes.subList(0, allNodes.size() - 1);
+            final String leafNodeName = allNodes.get(allNodes.size() - 1);
+
+            NodeState currentNode = nodeStore.getRoot();
+            for (final String pathNodeName : pathNodes) {
+                if (pathNodeName.length() > 0) {
+                    final NodeState childNode = currentNode.getChildNode(pathNodeName);
+                    if (!childNode.exists()) {
+                        break;
+                    }
+                    currentNode = childNode;
+                }
+            }
+            final NodeState leafNode = currentNode.getChildNode(leafNodeName);
+            if (leafNode.exists()) {
+                final PropertyState state = leafNode.getProperty("jcr:data");
+                if (null != state && state.getType() == Type.BINARY) {
+                    final BlobIdBlob blob = (BlobIdBlob) state.getValue(state.getType());
+                    if (null != blob) {
+                        final String fullBlobId = blob.getBlobId();
+                        if (null != fullBlobId) {
+                            blobId = DataStoreBlobStore.BlobId.of(fullBlobId).blobId;
+                        }
+                    }
+                }
+            }
+        }
+
+        return s3ds.haveRecordForIdentifier(blobId);
+    }
+}
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
index 6086e4b..435e711 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
@@ -60,6 +60,9 @@ import org.apache.jackrabbit.oak.api.Descriptors;
 import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean;
 import org.apache.jackrabbit.oak.api.jmx.CheckpointMBean;
 import org.apache.jackrabbit.oak.api.jmx.PersistentCacheStatsMBean;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.S3DataStoreStats;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStatsMBean;
 import org.apache.jackrabbit.oak.cache.CacheStats;
 import org.apache.jackrabbit.oak.commons.PropertiesUtil;
 import org.apache.jackrabbit.oak.osgi.ObserverTracker;
@@ -71,6 +74,7 @@ import org.apache.jackrabbit.oak.plugins.blob.BlobStoreStats;
 import org.apache.jackrabbit.oak.plugins.blob.BlobTrackingStore;
 import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.BlobIdTracker;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
 import org.apache.jackrabbit.oak.plugins.document.persistentCache.CacheType;
 import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCacheStats;
@@ -796,6 +800,19 @@ public class DocumentNodeStoreService {
                     BlobGCMBean.TYPE, "Document node store blob garbage collection"));
         }
 
+        if (null != store.getBlobStore() && store.getBlobStore() instanceof DataStoreBlobStore) {
+            final DataStoreBlobStore dsbs = (DataStoreBlobStore)store.getBlobStore();
+            if (null != dsbs.getDataStore() && dsbs.getDataStore() instanceof S3DataStore) {
+                final S3DataStore s3ds = (S3DataStore)dsbs.getDataStore();
+                final S3DataStoreStats s3dsStats = new S3DataStoreStats(s3ds, nodeStore);
+                registrations.add(registerMBean(whiteboard,
+                        S3DataStoreStatsMBean.class,
+                        s3dsStats,
+                        S3DataStoreStatsMBean.TYPE,
+                        s3dsStats.getClass().getSimpleName()));
+            }
+        }
+
         RevisionGC revisionGC = new RevisionGC(new Runnable() {
             @Override
             public void run() {
diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/S3DataStoreStatsTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/S3DataStoreStatsTest.java
new file mode 100644
index 0000000..cc0ce59
--- /dev/null
+++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/S3DataStoreStatsTest.java
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob.datastore;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assume.assumeFalse;
+import static org.junit.Assume.assumeNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.amazonaws.util.StringInputStream;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.output.NullOutputStream;
+import org.apache.jackrabbit.core.data.AsyncUploadCallback;
+import org.apache.jackrabbit.core.data.Backend;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.oak.api.BlobIdBlob;
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.*;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStatsMBean;
+import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+import javax.jcr.RepositoryException;
+import javax.management.InstanceAlreadyExistsException;
+import javax.management.InstanceNotFoundException;
+import javax.management.JMX;
+import javax.management.MBeanRegistrationException;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerConnection;
+import javax.management.MBeanServerFactory;
+import javax.management.MalformedObjectNameException;
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.management.ManagementFactory;
+import java.security.DigestOutputStream;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Properties;
+import java.util.Set;
+
+public class S3DataStoreStatsTest {
+    @Rule
+    public TemporaryFolder folder = new TemporaryFolder(new File("target"));
+
+    private static Properties properties;
+    private static MBeanServer jmxServer;
+    private static ObjectName mBeanName;
+    private static File syncfile1;
+    private static NodeStore mockNodeStore;
+
+    private S3DataStoreStatsMBean mBean;
+
+    @BeforeClass
+    public static void preClass() throws IOException, RepositoryException, MalformedObjectNameException,
+            NoSuchAlgorithmException
+    {
+
+        // This will cause all tests in this file to be ignored if JMX properties
+        // are not passed into the test execution.
+        //
+        // If you want to run this unit test suite you will need to
+        // pass the following settings into the command-line.
+        // Example:
+        //   -Djava.rmi.server.hostname=localhost
+        //   -Dcom.sun.management.jmxremote.port=9999
+        //   -Dcom.sun.management.jmxremote.ssl=false
+        //   -Dcom.sun.management.jmxremote.authenticate=false
+        for (final String property : Lists.newArrayList("java.rmi.server.hostname",
+                "com.sun.management.jmxremote.port",
+                "com.sun.management.jmxremote.ssl",
+                "com.sun.management.jmxremote.authenticate")) {
+            assumeFalse(Strings.isNullOrEmpty(System.getProperty(property)));
+        }
+
+        // This will cause all tests in this file to be ignored if no JMX
+        // server could be found.
+        jmxServer = ManagementFactory.getPlatformMBeanServer();
+        if (null == jmxServer) {
+            jmxServer = MBeanServerFactory.newMBeanServer();
+        }
+        assumeNotNull(jmxServer);
+
+        // This will cause all tests in this file to be ignored if no S3
+        // configuration has been provided.
+        //
+        // If you want to run this unit test suite you will need to
+        // pass the following setting into the command-line.
+        // Example:
+        //   -Dconfig=/path/to/aws/properties
+        //
+        // Properties file uses the same format as for S3DataStore configuration.
+        assumeFalse(Strings.isNullOrEmpty(System.getProperty("config")));
+
+        properties = Utils.readConfig(System.getProperty("config"));
+
+        mBeanName = new ObjectName("org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats:type=S3DataStoreStats");
+
+        syncfile1 = new File(S3DataStoreStatsTest.class.getResource("/syncfile1").getFile());
+
+        final char[] HEX = "0123456789abcdef".toCharArray();
+        MessageDigest digest = MessageDigest.getInstance("SHA-1");
+        OutputStream out = new DigestOutputStream(new NullOutputStream(), digest);
+        IOUtils.copyLarge(new FileInputStream(syncfile1), out);
+        out.close();
+        byte[] digestBytes = digest.digest();
+        char[] buffer = new char[digestBytes.length * 2];
+        for (int i=0; i<digestBytes.length; i++) {
+            buffer[2*i] = HEX[(digestBytes[i] >> 4) & 0x0f];
+            buffer[2*i+1] = HEX[digestBytes[i] & 0x0f];
+        }
+        final String syncfile1Id = new String(buffer);
+
+        mockNodeStore = mock(NodeStore.class);
+        final NodeState mockRootState = mock(NodeState.class);
+        final NodeState mockLeafState = mock(NodeState.class);
+        final PropertyState mockLeafPropertyState = mock(PropertyState.class);
+        final BlobIdBlob mockBlob = mock(BlobStoreBlob.class);
+        when(mockNodeStore.getRoot()).thenReturn(mockRootState);
+        when(mockRootState.getChildNode(anyString())).thenReturn(mockLeafState);
+        when(mockLeafState.exists()).thenReturn(true);
+        when(mockLeafState.getProperty(anyString())).thenReturn(mockLeafPropertyState);
+        doReturn(Type.BINARY).when(mockLeafPropertyState).getType();
+        when(mockLeafPropertyState.getValue(Type.BINARY)).thenReturn(mockBlob);
+        when(mockBlob.getBlobId()).thenReturn(syncfile1Id);
+    }
+
+    @Before
+    public void setup() throws IOException, InstanceAlreadyExistsException,
+            MBeanRegistrationException, NotCompliantMBeanException {
+        // Set up JMX connection and mbean
+        final JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://:9999/jmxrmi");
+        final JMXConnector connector = JMXConnectorFactory.connect(url, null);
+        final MBeanServerConnection connection = connector.getMBeanServerConnection();
+        mBean = JMX.newMBeanProxy(connection, mBeanName, S3DataStoreStatsMBean.class, true);
+    }
+
+    @After
+    public void teardown() throws InstanceNotFoundException, MBeanRegistrationException {
+        jmxServer.unregisterMBean(mBeanName);
+    }
+
+    private S3Backend getMockS3Backend() throws DataStoreException {
+        S3Backend backend = mock(S3Backend.class, Mockito.CALLS_REAL_METHODS);
+        doNothing().when(backend).writeAsync(any(DataIdentifier.class), any(File.class), any(AsyncUploadCallback.class));
+        doNothing().when(backend).write(any(DataIdentifier.class), any(File.class));
+        return backend;
+    }
+
+    private void setupTestS3DS(final S3DataStore s3ds) throws IOException, RepositoryException {
+        s3ds.setProperties(properties);
+        s3ds.setSecret((String) properties.get(S3Constants.SECRET_KEY));
+        s3ds.init(folder.newFolder().getAbsolutePath());
+    }
+
+    private S3DataStore getDefaultS3DS() throws IOException, RepositoryException {
+        final S3DataStore s3ds = new S3DataStore();
+        setupTestS3DS(s3ds);
+        return s3ds;
+    }
+
+    private S3DataStore getCustomBackendS3DS(final S3Backend backend) throws IOException, RepositoryException {
+        final S3DataStore s3ds = new CustomBackendS3DataStore(backend);
+        setupTestS3DS(s3ds);
+        return s3ds;
+    }
+
+    @Test
+    public void testGetActiveS3FileSyncMetricExists() throws RepositoryException, IOException, MalformedObjectNameException,
+            InstanceAlreadyExistsException, MBeanRegistrationException, NotCompliantMBeanException {
+        final S3DataStore s3ds = getDefaultS3DS();
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds, mockNodeStore);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        assert(0 == mBean.getActiveSyncs());
+    }
+
+    @Test
+    public void testGetSingleActiveS3FileSyncMetric() throws IOException, RepositoryException, MalformedObjectNameException,
+            InstanceAlreadyExistsException, MBeanRegistrationException, NotCompliantMBeanException {
+        final S3Backend backend = getMockS3Backend();
+        final S3DataStore s3ds = getCustomBackendS3DS(backend);
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds, mockNodeStore);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        DataRecord record = null;
+        try {
+            record = s3ds.addRecord(new StringInputStream("test"));
+            assert(1 == mBean.getActiveSyncs());
+        }
+        finally {
+            if (null != record) {
+                s3ds.deleteRecord(record.getIdentifier());
+            }
+        }
+        assert(0 == mBean.getActiveSyncs());
+    }
+
+    @Test
+    public void testGetMultipleActiveS3FileSyncMetric() throws IOException, RepositoryException, MalformedObjectNameException,
+            InstanceAlreadyExistsException, MBeanRegistrationException, NotCompliantMBeanException {
+        final S3Backend backend = getMockS3Backend();
+        final S3DataStore s3ds = getCustomBackendS3DS(backend);
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds, mockNodeStore);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        final Set<DataRecord> records = Sets.newHashSet();
+        try {
+            records.add(s3ds.addRecord(new StringInputStream("test1")));
+            records.add(s3ds.addRecord(new StringInputStream("test2")));
+            records.add(s3ds.addRecord(new StringInputStream("test3")));
+
+            assert (3 == mBean.getActiveSyncs());
+        }
+        finally {
+            for (final DataRecord record : records) {
+                s3ds.deleteRecord(record.getIdentifier());
+            }
+        }
+
+        assert(0 == mBean.getActiveSyncs());
+    }
+
+    @Test
+    public void testIsFileSyncedMetricExists() throws IOException, RepositoryException, InstanceAlreadyExistsException,
+            MBeanRegistrationException, NotCompliantMBeanException {
+        final S3DataStore s3ds = getDefaultS3DS();
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds, mockNodeStore);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        assertFalse(mBean.isFileSynced(syncfile1.getName()));
+    }
+
+    @Test
+    public void testIsFileSyncedNullFileReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException,
+            MBeanRegistrationException, NotCompliantMBeanException {
+        final S3DataStore s3ds = getDefaultS3DS();
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds, mockNodeStore);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        assertFalse(mBean.isFileSynced(null));
+    }
+
+    @Test
+    public void testIsFileSyncedEmptyStringReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException,
+            MBeanRegistrationException, NotCompliantMBeanException {
+        final S3DataStore s3ds = getDefaultS3DS();
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds, mockNodeStore);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        assertFalse(mBean.isFileSynced(""));
+    }
+
+    @Test
+    public void testIsFileSyncedInvalidFilenameReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException,
+            MBeanRegistrationException, NotCompliantMBeanException {
+        final S3DataStore s3ds = getDefaultS3DS();
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds, mockNodeStore);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        assertFalse(mBean.isFileSynced("invalid"));
+    }
+
+    @Test
+    public void testIsFileSyncedFileNotAddedReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException,
+            MBeanRegistrationException, NotCompliantMBeanException {
+        final S3Backend backend = getMockS3Backend();
+        final S3DataStore s3ds = getCustomBackendS3DS(backend);
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds, mockNodeStore);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        assertFalse(mBean.isFileSynced(syncfile1.getName()));
+    }
+
+    @Test
+    public void testIsFileSyncedSyncIncompleteReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException,
+            MBeanRegistrationException, NotCompliantMBeanException {
+        final S3Backend backend = getMockS3Backend();
+        final S3DataStore s3ds = getCustomBackendS3DS(backend);
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds, mockNodeStore);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        FileInputStream inputStream = null;
+        DataRecord record = null;
+        try {
+            inputStream = new FileInputStream(syncfile1);
+            record = s3ds.addRecord(inputStream);
+
+            assertFalse(mBean.isFileSynced(syncfile1.getName()));
+
+        }
+        finally {
+            if (null != record) {
+                s3ds.deleteRecord(record.getIdentifier());
+            }
+            if (null != inputStream) {
+                inputStream.close();
+            }
+        }
+    }
+
+    @Test
+    public void testIsFileSyncedSyncCompleteReturnsTrue() throws IOException, RepositoryException, InstanceAlreadyExistsException,
+            MBeanRegistrationException, NotCompliantMBeanException {
+        final S3DataStore s3ds = getDefaultS3DS();
+
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds, mockNodeStore);
+
+        jmxServer.registerMBean(stats, mBeanName);
+
+        FileInputStream inputStream = null;
+        DataRecord record = null;
+        try {
+            inputStream = new FileInputStream(syncfile1);
+
+            record = s3ds.addRecord(inputStream);
+
+            int tries = 0;
+            while (stats.getActiveSyncs() > 0 && tries++ < 50) {
+                try {
+                    Thread.sleep(100);
+                }
+                catch (InterruptedException e) { }
+            }
+
+            assertTrue(mBean.isFileSynced(syncfile1.getName()));
+        }
+        finally {
+            if (null != record) {
+                s3ds.deleteRecord(record.getIdentifier());
+            }
+            if (null != inputStream) {
+                inputStream.close();
+            }
+        }
+    }
+
+    @Test
+    public void testIsFileSyncedFileDeletedReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException,
+            MBeanRegistrationException, NotCompliantMBeanException {
+        final S3DataStore s3ds = getDefaultS3DS();
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds, mockNodeStore);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        FileInputStream inputStream = null;
+        DataRecord record = null;
+        try {
+            inputStream = new FileInputStream(syncfile1);
+
+            record = s3ds.addRecord(inputStream);
+
+            int tries = 0;
+            while (stats.getActiveSyncs() > 0 && tries++ < 50) {
+                try {
+                    Thread.sleep(100);
+                }
+                catch (InterruptedException e) { }
+            }
+        }
+        finally {
+            if (null != record) {
+                s3ds.deleteRecord(record.getIdentifier());
+            }
+            if (null != inputStream) {
+                inputStream.close();
+            }
+        }
+
+        assertFalse(mBean.isFileSynced(syncfile1.getName()));
+    }
+
+    private class CustomBackendS3DataStore extends S3DataStore {
+        private final S3Backend _localBackend;
+        CustomBackendS3DataStore(final S3Backend backend) { _localBackend = backend; }
+        @Override
+        protected Backend createBackend() {
+            if (properties != null) {
+                _localBackend.setProperties(properties);
+            }
+            return _localBackend;
+        }
+    }
+}
diff --git a/oak-core/src/test/resources/syncfile1 b/oak-core/src/test/resources/syncfile1
new file mode 100644
index 0000000..b8cf537
--- /dev/null
+++ b/oak-core/src/test/resources/syncfile1
@@ -0,0 +1,2 @@
+This is syncfile1.
+It is used by org.apache.jackrabbit.oak.plugins.blob.datastore.S3DataStoreStatsTest.java.
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBlob.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBlob.java
index b70ac5a..5a8d0ab 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBlob.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBlob.java
@@ -34,6 +34,7 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.api.BlobIdBlob;
 import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob;
 import org.apache.jackrabbit.oak.plugins.memory.AbstractBlob;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
@@ -41,7 +42,7 @@ import org.apache.jackrabbit.oak.spi.blob.BlobStore;
 /**
  * A BLOB (stream of bytes). This is a record of type "VALUE".
  */
-public class SegmentBlob extends Record implements Blob {
+public class SegmentBlob extends Record implements BlobIdBlob {
 
     @CheckForNull
     private final BlobStore blobStore;
@@ -156,6 +157,7 @@ public class SegmentBlob extends Record implements Blob {
         return (head & 0xf0) == 0xe0 || (head & 0xf8) == 0xf0;
     }
 
+    @Override
     @CheckForNull
     public String getBlobId() {
         return readBlobId(getSegment(), getOffset());
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java
index ae609dd..adfab86 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java
@@ -58,6 +58,9 @@ import org.apache.jackrabbit.commons.SimpleValueFactory;
 import org.apache.jackrabbit.oak.api.Descriptors;
 import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean;
 import org.apache.jackrabbit.oak.api.jmx.CheckpointMBean;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.S3DataStoreStats;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStatsMBean;
 import org.apache.jackrabbit.oak.cache.CacheStats;
 import org.apache.jackrabbit.oak.osgi.ObserverTracker;
 import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
@@ -66,6 +69,7 @@ import org.apache.jackrabbit.oak.plugins.blob.BlobGCMBean;
 import org.apache.jackrabbit.oak.plugins.blob.BlobGarbageCollector;
 import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
 import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType;
 import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo;
@@ -604,6 +608,23 @@ public class SegmentNodeStoreService extends ProxyNodeStore
             ));
         }
 
+        // Expose statistics about S3DataStore, if one is being used
+
+        if (blobStore instanceof DataStoreBlobStore) {
+            final DataStoreBlobStore dsbs = (DataStoreBlobStore) blobStore;
+            if (dsbs.getDataStore() instanceof S3DataStore) {
+                final S3DataStore s3ds = (S3DataStore) dsbs.getDataStore();
+                final S3DataStoreStats s3dsStats = new S3DataStoreStats(s3ds, segmentNodeStore);
+                registrations.add(registerMBean(
+                        whiteboard,
+                        S3DataStoreStatsMBean.class,
+                        s3dsStats,
+                        S3DataStoreStatsMBean.TYPE,
+                        s3dsStats.getClass().getSimpleName()
+                ));
+            }
+        }
+
         log.info("SegmentNodeStore initialized");
         return true;
     }
diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentDataStoreBlobGCIT.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentDataStoreBlobGCIT.java
index 6336ae9..0693767 100644
--- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentDataStoreBlobGCIT.java
+++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentDataStoreBlobGCIT.java
@@ -472,7 +472,7 @@ public class SegmentDataStoreBlobGCIT {
         }
         
         @Override
-        protected void markAndSweep(boolean markOnly) throws Exception {
+        protected void markAndSweep(boolean markOnly, boolean forceBlobRetrieve) throws Exception {
             boolean threw = true;
             GarbageCollectorFileState fs = new GarbageCollectorFileState(root);
             try {
@@ -493,7 +493,7 @@ public class SegmentDataStoreBlobGCIT {
                     Thread.sleep(maxLastModifiedInterval + 100);
                    LOG.info("Slept {} to make additional blobs old", maxLastModifiedInterval + 100);
                     
-                    long deleteCount = sweep(fs, markStart);
+                    long deleteCount = sweep(fs, markStart, forceBlobRetrieve);
                     threw = false;
                     
                    LOG.info("Blob garbage collection completed in {}. Number of blobs deleted [{}]", sw.toString(),
diff --git a/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBlob.java b/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBlob.java
index 2d7f240..c14d950 100644
--- a/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBlob.java
+++ b/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBlob.java
@@ -32,13 +32,14 @@ import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
 
 import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.api.BlobIdBlob;
 import org.apache.jackrabbit.oak.plugins.memory.AbstractBlob;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
 
 /**
  * A BLOB (stream of bytes). This is a record of type "VALUE".
  */
-public class SegmentBlob extends Record implements Blob {
+public class SegmentBlob extends Record implements BlobIdBlob {
 
     public static Iterable<SegmentId> getBulkSegmentIds(Blob blob) {
         if (blob instanceof SegmentBlob) {
@@ -151,6 +152,7 @@ public class SegmentBlob extends Record implements Blob {
         return (head & 0xf0) == 0xe0 || (head & 0xf8) == 0xf0;
     }
 
+    @Override
     public String getBlobId() {
         Segment segment = getSegment();
         int offset = getOffset();
diff --git a/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java b/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
index 11abf49..fb6b765 100644
--- a/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
+++ b/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
@@ -59,6 +59,9 @@ import org.apache.jackrabbit.commons.SimpleValueFactory;
 import org.apache.jackrabbit.oak.api.Descriptors;
 import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean;
 import org.apache.jackrabbit.oak.api.jmx.CheckpointMBean;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.S3DataStoreStats;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStatsMBean;
 import org.apache.jackrabbit.oak.cache.CacheStats;
 import org.apache.jackrabbit.oak.commons.PropertiesUtil;
 import org.apache.jackrabbit.oak.osgi.ObserverTracker;
@@ -70,6 +73,7 @@ import org.apache.jackrabbit.oak.plugins.blob.BlobTrackingStore;
 import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
 import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.BlobIdTracker;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType;
 import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo;
@@ -289,6 +293,7 @@ public class SegmentNodeStoreService extends ProxyNodeStore
     private Registration stringCacheMBean;
     private Registration fsgcMonitorMBean;
     private Registration fileStoreStatsMBean;
+    private Registration s3DataStoreStatsRegistration;
     private WhiteboardExecutor executor;
     private boolean customBlobStore;
 
@@ -536,6 +541,23 @@ public class SegmentNodeStoreService extends ProxyNodeStore
                 scheduleWithFixedDelay(whiteboard, fsgcm, 1)
         );
 
+        // Expose statistics about S3DataStore, if one is being used
+
+        if (blobStore instanceof DataStoreBlobStore) {
+            final DataStoreBlobStore dsbs = (DataStoreBlobStore) blobStore;
+            if (dsbs.getDataStore() instanceof S3DataStore) {
+                final S3DataStore s3ds = (S3DataStore) dsbs.getDataStore();
+                final S3DataStoreStats s3dsStats = new S3DataStoreStats(s3ds, segmentNodeStore);
+                s3DataStoreStatsRegistration = registerMBean(
+                        whiteboard,
+                        S3DataStoreStatsMBean.class,
+                        s3dsStats,
+                        S3DataStoreStatsMBean.TYPE,
+                        s3dsStats.getClass().getSimpleName()
+                );
+            }
+        }
+
         // Register a factory service to expose the FileStore
 
         providerRegistration = context.getBundleContext().registerService(SegmentStoreProvider.class.getName(), this, null);
@@ -714,6 +736,10 @@ public class SegmentNodeStoreService extends ProxyNodeStore
             fileStoreStatsMBean.unregister();
             fileStoreStatsMBean = null;
         }
+        if (s3DataStoreStatsRegistration != null) {
+            s3DataStoreStatsRegistration.unregister();
+            s3DataStoreStatsRegistration = null;
+        }
         if (executor != null) {
             executor.stop();
             executor = null;

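For reviewers who want to sanity-check the registration pattern the *NodeStoreService changes rely on, here is a minimal, self-contained sketch using plain JDK JMX. `DemoStats`, `DemoStatsMBean`, and the `ObjectName` are illustrative stand-ins, not Oak's actual classes or the whiteboard API; the point is only that a standard MBean's attribute (like `ActiveSyncs` in the patch) becomes queryable once registered.

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class StatsMBeanSketch {

    // Stand-in for S3DataStoreStatsMBean (hypothetical, simplified).
    public interface DemoStatsMBean {
        long getActiveSyncs();
    }

    // Standard MBean: the implementing class name must be the
    // interface name minus the "MBean" suffix.
    public static class DemoStats implements DemoStatsMBean {
        @Override
        public long getActiveSyncs() {
            // A real implementation would count in-progress S3 uploads.
            return 0L;
        }
    }

    // Register the MBean, read the attribute back as a JMX console would,
    // then unregister (mirroring the deactivate() cleanup in the patch).
    public static long queryActiveSyncs() throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName name = new ObjectName("org.example:type=DemoStats");
        server.registerMBean(new DemoStats(), name);
        try {
            // "getActiveSyncs" surfaces as the attribute "ActiveSyncs".
            return (Long) server.getAttribute(name, "ActiveSyncs");
        } finally {
            server.unregisterMBean(name);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("ActiveSyncs = " + queryActiveSyncs());
    }
}
```

The same attribute is what `mBean.getActiveSyncs()` exercises in the tests above, just through a typed proxy instead of `getAttribute`.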