Hi Oak Devs,

I've created OAK-4712 and submitted a patch for it; the same patch is
also attached to this email.

The submission adds a new MBean, S3DataStoreStats, which reports the
state of the S3DataStore via JMX.  Two metrics are provided.  The first
reports the number of files currently in the sync state, meaning they
have been added to the S3DataStore but have not yet been completely
copied into S3.  The second queries the sync state of a single file - a
return value of true means the provided file is fully synchronized.
This query takes an external file name, not the internal ID.
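
For anyone who wants to poke at it, the metrics can be read through a
standard JMX proxy, roughly like this (a minimal sketch only - the class
name S3SyncStatusCheck is made up for illustration, and the ObjectName
is an assumption based on the test code; the actual name depends on how
the whiteboard registers the bean):

    import java.lang.management.ManagementFactory;
    import javax.management.JMX;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;
    import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStatsMBean;

    public class S3SyncStatusCheck {
        public static void main(String[] args) throws Exception {
            // Same-JVM lookup via the platform MBean server; a remote JMX
            // connection would work the same way once connected.
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            ObjectName name = new ObjectName(
                    "org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats:type=S3DataStoreStats");
            S3DataStoreStatsMBean stats =
                    JMX.newMBeanProxy(server, name, S3DataStoreStatsMBean.class);

            // Files added to the S3DataStore but not yet fully copied to S3.
            System.out.println("active syncs: " + stats.getActiveSyncs());

            // True once the named local file is completely synchronized.
            String file = args.length > 0 ? args[0] : "/path/to/local/file";
            System.out.println("synced: " + stats.isFileSynced(file));
        }
    }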

I have some questions about the implementation:
1. For the query of a file's sync state, should the file name provided
be a full path to a local file or a path to a node within Oak
(e.g. /content/dam/myimage.jpg)?  The current implementation uses a
local file path, but I have been wondering whether an Oak path would be
more appropriate.
2. For the query of a file's sync state, when converting the
externally supplied file name into an internal DataIdentifier, this
implementation performs the same calculation used to derive the
internal ID when a file is stored (see the condensed sketch below).  I
have two concerns with this:
   - It is inefficient - the entire file has to be read and digested to
compute the internal ID, which takes a long time for large assets.
   - It essentially duplicates logic from CachingDataStore in
S3DataStore.  I hate duplicating the code, but I am trying to avoid
exposing internal IDs in the API, and I don't see a good way to avoid
that in the current implementation without modifying the public API of
CachingDataStore, exposing the internal ID via API, or both.
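
For reference, the conversion boils down to roughly the following (a
condensed sketch of the cache loader the patch adds to S3DataStore; the
method name identifierFor is made up here, and encodeHexString is the
same hex-encoding helper the patch already uses):

    // Recover the internal DataIdentifier by re-digesting the entire
    // local file, mirroring the SHA-1 digest computed when the file was
    // originally stored.
    private DataIdentifier identifierFor(final String filename) throws Exception {
        MessageDigest digest = MessageDigest.getInstance("SHA-1");
        InputStream in = new FileInputStream(filename);
        OutputStream out = new DigestOutputStream(new NullOutputStream(), digest);
        try {
            IOUtils.copyLarge(in, out);  // reads the whole file just to compute the hash
        } finally {
            out.close();
            in.close();
        }
        return new DataIdentifier(encodeHexString(digest.digest()));
    }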

Any suggestions on these two issues?


I'm also experiencing a problem with this patch.  In my testing it
appears to work fine until I delete a file.  For example, if I delete
an asset via the REST API, I see the asset deleted in CRXDE, but the
file still remains in S3.  The MBean as implemented only knows how to
check with the S3DataStore and its backend, and both still believe the
file exists, so the MBean continues to report the file's sync state as
synchronized (i.e. isFileSynced() returns true) even though the file
has been removed from the JCR.  Any suggestions on how to resolve this?
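
To make the gap concrete, the check the MBean performs today reduces to
the following (restating the patch logic; nothing in this path consults
the repository content):

    // From S3DataStoreStats in the patch: the only signal consulted is
    // whether the backend still holds an object for the identifier
    // derived from the local file.
    @Override
    public boolean isFileSynced(final String filename) {
        if (Strings.isNullOrEmpty(filename)) {
            return false;
        }
        // S3DataStore.getRecordIfStored(filename) digests the local file
        // to obtain the internal DataIdentifier and then asks
        // backend.exists(identifier).  No JCR lookup happens, so deleting
        // the node does not change the answer until the binary itself is
        // removed from S3 (e.g. by data store garbage collection).
        return s3ds.getRecordIfStored(filename);
    }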


Finally, any feedback on the patch is welcome.  And if I got the
process wrong, please correct me (gently) - this is my first submission
here.  Thanks.

diff --git a/oak-blob-cloud/pom.xml b/oak-blob-cloud/pom.xml
index 800f716..ad3cb3c 100644
--- a/oak-blob-cloud/pom.xml
+++ b/oak-blob-cloud/pom.xml
@@ -41,7 +41,7 @@
                 <artifactId>maven-bundle-plugin</artifactId>
                 <configuration>
                     <instructions>
-                        <Export-Package>org.apache.jackrabbit.oak.blob.cloud.aws.s3</Export-Package>
+                        <Export-Package>org.apache.jackrabbit.oak.blob.cloud.aws.s3,org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats</Export-Package>
                         <DynamicImport-Package>sun.io</DynamicImport-Package>
                     </instructions>
                 </configuration>
@@ -101,6 +101,13 @@
             <version>${jackrabbit.version}</version>
         </dependency>
 
+        <!-- Dependencies to other Oak components -->
+        <dependency>
+            <groupId>org.apache.jackrabbit</groupId>
+            <artifactId>oak-commons</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
         <!-- Amazon AWS dependency -->
         <dependency>
             <groupId>com.amazonaws</groupId>
@@ -140,6 +147,12 @@
             <artifactId>logback-classic</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>1.10.19</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java
index fc21bf6..be2ed54 100644
--- a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java
+++ b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java
@@ -16,17 +16,64 @@
  */
 package org.apache.jackrabbit.oak.blob.cloud.aws.s3;
 
+import java.io.FileInputStream;
+import java.io.OutputStream;
+import java.security.DigestOutputStream;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.output.NullOutputStream;
 import org.apache.jackrabbit.core.data.Backend;
 import org.apache.jackrabbit.core.data.CachingDataStore;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
  * An Amazon S3 data store.
  */
 public class S3DataStore extends CachingDataStore {
+
+    /**
+     * Logger instance.
+     */
+    private static final Logger LOG = LoggerFactory.getLogger(S3DataStore.class);
+
     protected Properties properties;
 
+    private final LoadingCache<String, DataIdentifier> identifierCache =
+            CacheBuilder.newBuilder()
+                    .maximumSize(8192).expireAfterAccess(5, TimeUnit.MINUTES)
+            .build(new CacheLoader<String, DataIdentifier>() {
+                @Override
+                public DataIdentifier load(final String filename) throws Exception {
+                    try {
+                        MessageDigest digest = MessageDigest.getInstance("SHA-1");
+                        OutputStream output = new DigestOutputStream(new NullOutputStream(), digest);
+                        try {
+                            IOUtils.copyLarge(new FileInputStream(filename), output);
+                        }
+                        finally {
+                            output.close();
+                        }
+                        return new DataIdentifier(encodeHexString(digest.digest()));
+                    }
+                    catch (NoSuchAlgorithmException e) {
+                        LOG.warn("Unable to load digest algorithm \"SHA-1\"");
+                    }
+                    return null;
+                }
+            });
+
     @Override
     protected Backend createBackend() {
         S3Backend backend = new S3Backend();
@@ -47,4 +94,22 @@ public class S3DataStore extends CachingDataStore {
     public void setProperties(Properties properties) {
         this.properties = properties;
     }
+
+    public boolean getRecordIfStored(final String filename) {
+        try {
+            final DataIdentifier identifier = identifierCache.get(filename);
+            if (null != identifier) {
+                return this.getBackend().exists(identifier);
+            } else {
+                LOG.warn(String.format("Unable to obtain identifier for 
filename %s", filename));
+            }
+        }
+        catch (ExecutionException e) {
+            LOG.warn(String.format("Exception caught checking for %s in 
pending uploads", filename), e);
+        }
+        catch (DataStoreException e) {
+            LOG.warn(String.format("Data Store Exception caught checking for 
%s in pending uploads", filename), e);
+        }
+        return false;
+    }
 }
diff --git a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStats.java b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStats.java
new file mode 100644
index 0000000..5356607
--- /dev/null
+++ b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStats.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats;
+
+import com.google.common.base.Strings;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore;
+import org.apache.jackrabbit.oak.commons.jmx.AnnotatedStandardMBean;
+
+public class S3DataStoreStats extends AnnotatedStandardMBean implements S3DataStoreStatsMBean {
+    private final S3DataStore s3ds;
+
+    public S3DataStoreStats(final S3DataStore s3ds) {
+        super(S3DataStoreStatsMBean.class);
+        this.s3ds = s3ds;
+    }
+
+    @Override
+    public long getActiveSyncs() {
+        return s3ds.getPendingUploads().size();
+    }
+
+    @Override
+    public boolean isFileSynced(final String filename) {
+        if (Strings.isNullOrEmpty(filename)) {
+            return false;
+        }
+        return s3ds.getRecordIfStored(filename);
+    }
+}
diff --git a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStatsMBean.java b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStatsMBean.java
new file mode 100644
index 0000000..79bd14b
--- /dev/null
+++ b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStatsMBean.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats;
+
+import org.apache.jackrabbit.core.data.DataIdentifier;
+
+public interface S3DataStoreStatsMBean {
+    String TYPE = "S3DataStoreStats";
+    long getActiveSyncs();
+    boolean isFileSynced(final String filename);
+}
diff --git a/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/TestS3DataStoreStats.java b/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/TestS3DataStoreStats.java
new file mode 100644
index 0000000..ed7d663
--- /dev/null
+++ b/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/TestS3DataStoreStats.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assume.assumeFalse;
+import static org.junit.Assume.assumeNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+
+import com.amazonaws.util.StringInputStream;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.jackrabbit.core.data.AsyncUploadCallback;
+import org.apache.jackrabbit.core.data.Backend;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3Backend;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3Constants;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.Utils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+import javax.jcr.RepositoryException;
+import javax.management.InstanceAlreadyExistsException;
+import javax.management.InstanceNotFoundException;
+import javax.management.JMX;
+import javax.management.MBeanRegistrationException;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerConnection;
+import javax.management.MBeanServerFactory;
+import javax.management.MalformedObjectNameException;
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+import java.io.*;
+import java.lang.management.ManagementFactory;
+import java.security.DigestOutputStream;
+import java.security.MessageDigest;
+import java.util.Properties;
+import java.util.Set;
+
+public class TestS3DataStoreStats {
+    @Rule
+    public TemporaryFolder folder = new TemporaryFolder(new File("target"));
+
+    private static Properties properties;
+    private static MBeanServer jmxServer;
+    private static ObjectName mBeanName;
+
+    private S3DataStoreStatsMBean mBean;
+    private File syncfile1;
+
+    @BeforeClass
+    public static void preClass() throws IOException, RepositoryException, MalformedObjectNameException {
+
+        // This will cause all tests in this file to be ignored if JMX properties
+        // are not passed into the test execution.
+        //
+        // If you want to run this unit test suite you will need to
+        // pass the following settings into the command-line.
+        // Example:
+        //   -Djava.rmi.server.hostname=localhost
+        //   -Dcom.sun.management.jmxremote.port=9999
+        //   -Dcom.sun.management.jmxremote.ssl=false
+        //   -Dcom.sun.management.jmxremote.authenticate=false
+        for (final String property : Lists.newArrayList("java.rmi.server.hostname",
+                "com.sun.management.jmxremote.port",
+                "com.sun.management.jmxremote.ssl",
+                "com.sun.management.jmxremote.authenticate")) {
+            assumeFalse(Strings.isNullOrEmpty(System.getProperty(property)));
+        }
+
+        // This will cause all tests in this file to be ignored if no JMX
+        // server could be found.
+        jmxServer = ManagementFactory.getPlatformMBeanServer();
+        if (null == jmxServer) {
+            jmxServer = MBeanServerFactory.newMBeanServer();
+        }
+        assumeNotNull(jmxServer);
+
+        // This will cause all tests in this file to be ignored if no S3
+        // configuration has been provided.
+        //
+        // If you want to run this unit test suite you will need to
+        // pass the following setting into the command-line.
+        // Example:
+        //   -Dconfig=/path/to/aws/properties
+        //
+        // Properties file uses the same format as for S3DataStore configuration.
+        assumeFalse(Strings.isNullOrEmpty(System.getProperty("config")));
+
+        properties = Utils.readConfig(System.getProperty("config"));
+
+        mBeanName = new ObjectName("org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats:type=S3DataStoreStats");
+    }
+
+    @Before
+    public void setup() throws IOException, InstanceAlreadyExistsException,
+            MBeanRegistrationException, NotCompliantMBeanException {
+        // Set up JMX connection and mbean
+        final JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://:9999/jmxrmi");
+        final JMXConnector connector = JMXConnectorFactory.connect(url, null);
+        final MBeanServerConnection connection = connector.getMBeanServerConnection();
+        mBean = JMX.newMBeanProxy(connection, mBeanName, S3DataStoreStatsMBean.class, true);
+
+        syncfile1 = new File(getClass().getResource("/syncfile1").getFile());
+    }
+
+    @After
+    public void teardown() throws InstanceNotFoundException, MBeanRegistrationException {
+        jmxServer.unregisterMBean(mBeanName);
+    }
+
+    private S3Backend getMockS3Backend() throws DataStoreException {
+        S3Backend backend = mock(S3Backend.class, Mockito.CALLS_REAL_METHODS);
+        doNothing().when(backend).writeAsync(any(DataIdentifier.class), any(File.class), any(AsyncUploadCallback.class));
+        doNothing().when(backend).write(any(DataIdentifier.class), any(File.class));
+        return backend;
+    }
+
+    private void setupTestS3DS(final S3DataStore s3ds) throws IOException, RepositoryException {
+        s3ds.setProperties(properties);
+        s3ds.setSecret((String) properties.get(S3Constants.SECRET_KEY));
+        s3ds.init(folder.newFolder().getAbsolutePath());
+    }
+
+    private S3DataStore getDefaultS3DS() throws IOException, RepositoryException {
+        final S3DataStore s3ds = new S3DataStore();
+        setupTestS3DS(s3ds);
+        return s3ds;
+    }
+
+    private S3DataStore getCustomBackendS3DS(final S3Backend backend) throws IOException, RepositoryException {
+        final S3DataStore s3ds = new CustomBackendS3DataStore(backend);
+        setupTestS3DS(s3ds);
+        return s3ds;
+    }
+
+    @Test
+    public void testGetActiveS3FileSyncMetricExists() throws RepositoryException, IOException, MalformedObjectNameException,
+            InstanceAlreadyExistsException, MBeanRegistrationException, NotCompliantMBeanException {
+        final S3DataStore s3ds = getDefaultS3DS();
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        assert(0 == mBean.getActiveSyncs());
+    }
+
+    @Test
+    public void testGetSingleActiveS3FileSyncMetric() throws IOException, RepositoryException, MalformedObjectNameException,
+            InstanceAlreadyExistsException, MBeanRegistrationException, NotCompliantMBeanException {
+        final S3Backend backend = getMockS3Backend();
+        final S3DataStore s3ds = getCustomBackendS3DS(backend);
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        DataRecord record = null;
+        try {
+            record = s3ds.addRecord(new StringInputStream("test"));
+            assert(1 == mBean.getActiveSyncs());
+        }
+        finally {
+            if (null != record) {
+                s3ds.deleteRecord(record.getIdentifier());
+            }
+        }
+        assert(0 == mBean.getActiveSyncs());
+    }
+
+    @Test
+    public void testGetMultipleActiveS3FileSyncMetric() throws IOException, RepositoryException, MalformedObjectNameException,
+            InstanceAlreadyExistsException, MBeanRegistrationException, NotCompliantMBeanException {
+        final S3Backend backend = getMockS3Backend();
+        final S3DataStore s3ds = getCustomBackendS3DS(backend);
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        final Set<DataRecord> records = Sets.newHashSet();
+        try {
+            records.add(s3ds.addRecord(new StringInputStream("test1")));
+            records.add(s3ds.addRecord(new StringInputStream("test2")));
+            records.add(s3ds.addRecord(new StringInputStream("test3")));
+
+            assert (3 == mBean.getActiveSyncs());
+        }
+        finally {
+            for (final DataRecord record : records) {
+                s3ds.deleteRecord(record.getIdentifier());
+            }
+        }
+
+        assert(0 == mBean.getActiveSyncs());
+    }
+
+    @Test
+    public void testIsFileSyncedMetricExists() throws IOException, RepositoryException, InstanceAlreadyExistsException,
+            MBeanRegistrationException, NotCompliantMBeanException {
+        final S3DataStore s3ds = getDefaultS3DS();
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        assertFalse(mBean.isFileSynced(syncfile1.getAbsolutePath()));
+    }
+
+    @Test
+    public void testIsFileSyncedNullFileReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException,
+            MBeanRegistrationException, NotCompliantMBeanException {
+        final S3DataStore s3ds = getDefaultS3DS();
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        assertFalse(mBean.isFileSynced(null));
+    }
+
+    @Test
+    public void testIsFileSyncedEmptyStringReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException,
+            MBeanRegistrationException, NotCompliantMBeanException {
+        final S3DataStore s3ds = getDefaultS3DS();
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        assertFalse(mBean.isFileSynced(""));
+    }
+
+    @Test
+    public void testIsFileSyncedInvalidFilenameReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException,
+            MBeanRegistrationException, NotCompliantMBeanException {
+        final S3DataStore s3ds = getDefaultS3DS();
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        assertFalse(mBean.isFileSynced("invalid"));
+    }
+
+    @Test
+    public void testIsFileSyncedFileNotAddedReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException,
+            MBeanRegistrationException, NotCompliantMBeanException {
+        final S3Backend backend = getMockS3Backend();
+        final S3DataStore s3ds = getCustomBackendS3DS(backend);
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        assertFalse(mBean.isFileSynced(syncfile1.getAbsolutePath()));
+    }
+
+    @Test
+    public void testIsFileSyncedSyncIncompleteReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException,
+            MBeanRegistrationException, NotCompliantMBeanException {
+        final S3Backend backend = getMockS3Backend();
+        final S3DataStore s3ds = getCustomBackendS3DS(backend);
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        FileInputStream inputStream = null;
+        DataRecord record = null;
+        try {
+            inputStream = new FileInputStream(syncfile1);
+            record = s3ds.addRecord(new FileInputStream(syncfile1));
+
+            assertFalse(mBean.isFileSynced(syncfile1.getAbsolutePath()));
+
+        }
+        finally {
+            if (null != record) {
+                s3ds.deleteRecord(record.getIdentifier());
+            }
+            if (null != inputStream) {
+                inputStream.close();
+            }
+        }
+    }
+
+    @Test
+    public void testIsFileSyncedSyncCompleteReturnsTrue() throws IOException, RepositoryException, InstanceAlreadyExistsException,
+            MBeanRegistrationException, NotCompliantMBeanException {
+        final S3DataStore s3ds = getDefaultS3DS();
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        FileInputStream inputStream = null;
+        DataRecord record = null;
+        try {
+            inputStream = new FileInputStream(syncfile1);
+
+            record = s3ds.addRecord(new FileInputStream(syncfile1));
+
+            int tries = 0;
+            while (stats.getActiveSyncs() > 0 && 50 > tries++) {
+                try {
+                    Thread.sleep(100);
+                }
+                catch (InterruptedException e) { }
+            }
+
+            assert(mBean.isFileSynced(syncfile1.getAbsolutePath()));
+        }
+        finally {
+            if (null != record) {
+                s3ds.deleteRecord(record.getIdentifier());
+            }
+            if (null != inputStream) {
+                inputStream.close();
+            }
+        }
+    }
+
+    @Test
+    public void testIsFileSyncedFileDeletedReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException,
+            MBeanRegistrationException, NotCompliantMBeanException {
+        final S3DataStore s3ds = getDefaultS3DS();
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        FileInputStream inputStream = null;
+        DataRecord record = null;
+        try {
+            inputStream = new FileInputStream(syncfile1);
+
+            record = s3ds.addRecord(new FileInputStream(syncfile1));
+
+            int tries = 0;
+            while (stats.getActiveSyncs() > 0 && 50 > tries++) {
+                try {
+                    Thread.sleep(100);
+                }
+                catch (InterruptedException e) { }
+            }
+        }
+        finally {
+            if (null != record) {
+                s3ds.deleteRecord(record.getIdentifier());
+            }
+            if (null != inputStream) {
+                inputStream.close();
+            }
+        }
+
+        assertFalse(mBean.isFileSynced(syncfile1.getAbsolutePath()));
+    }
+
+    private class CustomBackendS3DataStore extends S3DataStore {
+        private S3Backend _localBackend;
+        CustomBackendS3DataStore(final S3Backend backend) { _localBackend = backend; }
+        @Override
+        protected Backend createBackend() {
+            if(properties != null){
+                _localBackend.setProperties(properties);
+            }
+            return _localBackend;
+        }
+    }
+}
diff --git a/oak-blob-cloud/src/test/resources/syncfile1 b/oak-blob-cloud/src/test/resources/syncfile1
new file mode 100644
index 0000000..aa9d41a
--- /dev/null
+++ b/oak-blob-cloud/src/test/resources/syncfile1
@@ -0,0 +1,2 @@
+This is syncfile1.
+It is used by org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.TestS3DataStoreStats.java.
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
index 0c2035f..e14afc8 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
@@ -60,6 +60,9 @@ import org.apache.jackrabbit.oak.api.Descriptors;
 import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean;
 import org.apache.jackrabbit.oak.api.jmx.CheckpointMBean;
 import org.apache.jackrabbit.oak.api.jmx.PersistentCacheStatsMBean;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStats;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStatsMBean;
 import org.apache.jackrabbit.oak.cache.CacheStats;
 import org.apache.jackrabbit.oak.commons.PropertiesUtil;
 import org.apache.jackrabbit.oak.osgi.ObserverTracker;
@@ -71,6 +74,7 @@ import org.apache.jackrabbit.oak.plugins.blob.BlobStoreStats;
 import org.apache.jackrabbit.oak.plugins.blob.BlobTrackingStore;
 import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.BlobIdTracker;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
 import org.apache.jackrabbit.oak.plugins.document.persistentCache.CacheType;
 import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCacheStats;
@@ -787,6 +791,19 @@ public class DocumentNodeStoreService {
                     BlobGCMBean.TYPE, "Document node store blob garbage collection"));
         }
 
+        if (null != store.getBlobStore() && store.getBlobStore() instanceof DataStoreBlobStore) {
+            final DataStoreBlobStore dsbs = (DataStoreBlobStore)store.getBlobStore();
+            if (null != dsbs.getDataStore() && dsbs.getDataStore() instanceof S3DataStore) {
+                final S3DataStore s3ds = (S3DataStore)dsbs.getDataStore();
+                final S3DataStoreStats s3dsStats = new S3DataStoreStats(s3ds);
+                registrations.add(registerMBean(whiteboard,
+                        S3DataStoreStatsMBean.class,
+                        s3dsStats,
+                        S3DataStoreStatsMBean.TYPE,
+                        s3dsStats.getClass().getSimpleName()));
+            }
+        }
+
         RevisionGC revisionGC = new RevisionGC(new Runnable() {
             @Override
             public void run() {
diff --git a/oak-segment-tar/pom.xml b/oak-segment-tar/pom.xml
index bff7015..c1fa16f 100644
--- a/oak-segment-tar/pom.xml
+++ b/oak-segment-tar/pom.xml
@@ -103,6 +103,12 @@
         </dependency>
         <dependency>
             <groupId>org.apache.jackrabbit</groupId>
+            <artifactId>oak-blob-cloud</artifactId>
+            <version>${oak.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.jackrabbit</groupId>
             <artifactId>oak-core</artifactId>
             <version>${oak.version}</version>
             <scope>provided</scope>
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java
index 84c2521..aa9e956 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java
@@ -56,6 +56,9 @@ import org.apache.jackrabbit.commons.SimpleValueFactory;
 import org.apache.jackrabbit.oak.api.Descriptors;
 import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean;
 import org.apache.jackrabbit.oak.api.jmx.CheckpointMBean;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStats;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStatsMBean;
 import org.apache.jackrabbit.oak.cache.CacheStats;
 import org.apache.jackrabbit.oak.osgi.ObserverTracker;
 import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
@@ -64,6 +67,7 @@ import org.apache.jackrabbit.oak.plugins.blob.BlobGCMBean;
 import org.apache.jackrabbit.oak.plugins.blob.BlobGarbageCollector;
 import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
 import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType;
 import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo;
@@ -509,6 +513,23 @@ public class SegmentNodeStoreService extends ProxyNodeStore
                 )
         )));
 
+        // Expose statistics about S3DataStore, if one is being used
+
+        if (null != blobStore && blobStore instanceof DataStoreBlobStore) {
+            final DataStoreBlobStore dsbs = (DataStoreBlobStore)blobStore;
+            if (null != dsbs.getDataStore() && dsbs.getDataStore() instanceof S3DataStore) {
+                final S3DataStore s3ds = (S3DataStore)dsbs.getDataStore();
+                final S3DataStoreStats s3dsStats = new S3DataStoreStats(s3ds);
+                registrations.add(registerMBean(
+                        whiteboard,
+                        S3DataStoreStatsMBean.class,
+                        s3dsStats,
+                        S3DataStoreStatsMBean.TYPE,
+                        s3dsStats.getClass().getSimpleName()
+                ));
+            }
+        }
+
         // Register a factory service to expose the FileStore
 
         providerRegistration = context.getBundleContext().registerService(SegmentStoreProvider.class.getName(), this, null);
diff --git a/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java b/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
index 11abf49..7f998ee 100644
--- a/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
+++ b/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
@@ -59,6 +59,9 @@ import org.apache.jackrabbit.commons.SimpleValueFactory;
 import org.apache.jackrabbit.oak.api.Descriptors;
 import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean;
 import org.apache.jackrabbit.oak.api.jmx.CheckpointMBean;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStats;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStatsMBean;
 import org.apache.jackrabbit.oak.cache.CacheStats;
 import org.apache.jackrabbit.oak.commons.PropertiesUtil;
 import org.apache.jackrabbit.oak.osgi.ObserverTracker;
@@ -70,6 +73,7 @@ import org.apache.jackrabbit.oak.plugins.blob.BlobTrackingStore;
 import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
 import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.BlobIdTracker;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType;
 import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo;
@@ -289,6 +293,7 @@ public class SegmentNodeStoreService extends ProxyNodeStore
     private Registration stringCacheMBean;
     private Registration fsgcMonitorMBean;
     private Registration fileStoreStatsMBean;
+    private Registration s3DataStoreStatsRegistration;
     private WhiteboardExecutor executor;
     private boolean customBlobStore;
 
@@ -536,6 +541,23 @@ public class SegmentNodeStoreService extends ProxyNodeStore
                 scheduleWithFixedDelay(whiteboard, fsgcm, 1)
         );
 
+        // Expose statistics about S3DataStore, if one is being used
+
+        if (null != blobStore && blobStore instanceof DataStoreBlobStore) {
+            final DataStoreBlobStore dsbs = (DataStoreBlobStore)blobStore;
+            if (null != dsbs.getDataStore() && dsbs.getDataStore() instanceof S3DataStore) {
+                final S3DataStore s3ds = (S3DataStore)dsbs.getDataStore();
+                final S3DataStoreStats s3dsStats = new S3DataStoreStats(s3ds);
+                s3DataStoreStatsRegistration = registerMBean(
+                        whiteboard,
+                        S3DataStoreStatsMBean.class,
+                        s3dsStats,
+                        S3DataStoreStatsMBean.TYPE,
+                        s3dsStats.getClass().getSimpleName()
+                );
+            }
+        }
+
         // Register a factory service to expose the FileStore
 
         providerRegistration = context.getBundleContext().registerService(SegmentStoreProvider.class.getName(), this, null);
@@ -714,6 +736,10 @@ public class SegmentNodeStoreService extends ProxyNodeStore
             fileStoreStatsMBean.unregister();
             fileStoreStatsMBean = null;
         }
+        if (s3DataStoreStatsRegistration != null) {
+            s3DataStoreStatsRegistration.unregister();
+            s3DataStoreStatsRegistration = null;
+        }
         if (executor != null) {
             executor.stop();
             executor = null;
