Please use this patch instead; it has been updated against the latest trunk.

On Fri, Aug 26, 2016 at 4:41 PM, Matt Ryan <o...@mvryan.org> wrote:

> Hi Oak Devs,
>
> I've created OAK-4712 and submitted a patch for the same.  I've attached
> the same patch to this email.
>
> The submission is to add a new MBean, S3DataStoreStats, which will allow
> reporting via JMX about the state of the S3DataStore.  Two metrics are
> intended.  The first is to report the number of files that are in the sync
> state, meaning they have been added to the S3DataStore but are not yet
> completely copied into S3.  The second allows querying the sync state
> of a file - a return value of true means that the file provided is
> fully synchronized.  This uses an external file name, not the internal ID.
>
> I have some questions about the implementation first:
> 1. For the request to query the sync state of a file, should the file name
> provided be a full path to a local file or a path to a file within Oak
> (e.g. /content/dam/myimage.jpg)?  The current implementation uses a local
> file path but I have been wondering if it should be an Oak path.
> 2. For the request to query the sync state of a file, when converting from
> the externally-supplied file name to an internal DataIdentifier, this
> implementation is performing the same calculation to determine the internal
> ID name as is done when a file is stored.  I have a number of concerns with
> this:
>    - It is inefficient - the entire file has to be read and digested in
> order to compute the internal ID.  This takes a long time for large assets.
>    - I've essentially duplicated the logic from CachingDataStore into
> S3DataStore to compute the internal ID.  I hate duplicating the code, but I
> am trying to avoid exposing internal IDs in API, and I am not seeing a good
> way in the current implementation to avoid this without either modifying
> public API to CachingDataStore, or exposing the internal ID via API, or
> both.
>
> Any suggestions on these two issues?
>
>
> I'm also experiencing a problem with this patch.  In my testing it appears
> to work fine, until I delete a file.  For example, if I delete an asset via
> the REST API, I will see the asset deleted in CRXDE.  However, the file
> still remains in S3.  This MBean as implemented only knows how to check
> with S3DataStore and the corresponding backend, and these all appear to
> believe the file still exists.  So the MBean continues to report that the
> file's sync state is synchronized (i.e. isFileSynced() returns true) even
> though the file has been removed from the JCR.  Any suggestions on how to
> resolve this?
>
>
> Finally, any feedback on the patch is welcome.  And if I did the process
> wrong please correct me (gently) - first time submission here.  Thanks
>
>
diff --git a/oak-blob-cloud/pom.xml b/oak-blob-cloud/pom.xml
index 800f716..ad3cb3c 100644
--- a/oak-blob-cloud/pom.xml
+++ b/oak-blob-cloud/pom.xml
@@ -41,7 +41,7 @@
                 <artifactId>maven-bundle-plugin</artifactId>
                 <configuration>
                     <instructions>
-                        
<Export-Package>org.apache.jackrabbit.oak.blob.cloud.aws.s3</Export-Package>
+                        
<Export-Package>org.apache.jackrabbit.oak.blob.cloud.aws.s3,org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats</Export-Package>
                         <DynamicImport-Package>sun.io</DynamicImport-Package>
                     </instructions>
                 </configuration>
@@ -101,6 +101,13 @@
             <version>${jackrabbit.version}</version>
         </dependency>
 
+        <!-- Dependencies to other Oak components -->
+        <dependency>
+            <groupId>org.apache.jackrabbit</groupId>
+            <artifactId>oak-commons</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
         <!-- Amazon AWS dependency -->
         <dependency>
             <groupId>com.amazonaws</groupId>
@@ -140,6 +147,12 @@
             <artifactId>logback-classic</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>1.10.19</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 </project>
diff --git 
a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java
 
b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java
index fc21bf6..be2ed54 100644
--- 
a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java
+++ 
b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java
@@ -16,17 +16,64 @@
  */
 package org.apache.jackrabbit.oak.blob.cloud.aws.s3;
 
+import java.io.FileInputStream;
+import java.io.OutputStream;
+import java.security.DigestOutputStream;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.output.NullOutputStream;
 import org.apache.jackrabbit.core.data.Backend;
 import org.apache.jackrabbit.core.data.CachingDataStore;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
  * An Amazon S3 data store.
  */
 public class S3DataStore extends CachingDataStore {
+
+    /**
+     * Logger instance.
+     */
+    private static final Logger LOG = 
LoggerFactory.getLogger(S3DataStore.class);
+
     protected Properties properties;
 
+    /** Maps a local file path to a DataIdentifier built from the file's SHA-1; a miss digests the whole file. */
+    private final LoadingCache<String, DataIdentifier> identifierCache =
+            CacheBuilder.newBuilder()
+                    .maximumSize(8192).expireAfterAccess(5, TimeUnit.MINUTES)
+            .build(new CacheLoader<String, DataIdentifier>() {
+                @Override
+                public DataIdentifier load(final String filename) throws Exception {
+                    // Let failures propagate (the cache wraps them in ExecutionException):
+                    // a CacheLoader must never return null.
+                    final MessageDigest digest = MessageDigest.getInstance("SHA-1");
+                    final FileInputStream input = new FileInputStream(filename);
+                    final OutputStream output = new DigestOutputStream(new NullOutputStream(), digest);
+                    try {
+                        IOUtils.copyLarge(input, output);
+                    }
+                    finally {
+                        // copyLarge does not close its arguments, so release the file handle here.
+                        IOUtils.closeQuietly(input);
+                        output.close();
+                    }
+                    return new DataIdentifier(encodeHexString(digest.digest()));
+                }
+            });
+
     @Override
     protected Backend createBackend() {
         S3Backend backend = new S3Backend();
@@ -47,4 +94,22 @@ public class S3DataStore extends CachingDataStore {
     public void setProperties(Properties properties) {
         this.properties = properties;
     }
+
+    /** Reports whether the backend holds a record for the identifier that identifierCache derives from the given local file; false on any lookup failure. */
+    public boolean getRecordIfStored(final String filename) {
+        try {
+            final DataIdentifier identifier = identifierCache.get(filename);
+            return this.getBackend().exists(identifier);
+        }
+        catch (ExecutionException e) {
+            LOG.warn(String.format("Exception caught checking for %s in pending uploads", filename), e);
+        }
+        catch (CacheLoader.InvalidCacheLoadException e) { // loader produced null; unchecked, so not an ExecutionException
+            LOG.warn(String.format("Unable to obtain identifier for filename %s", filename));
+        }
+        catch (DataStoreException e) {
+            LOG.warn(String.format("Data Store Exception caught checking for %s in pending uploads", filename), e);
+        }
+        return false;
+    }
 }
diff --git 
a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStats.java
 
b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStats.java
new file mode 100644
index 0000000..5356607
--- /dev/null
+++ 
b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStats.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats;
+
+import com.google.common.base.Strings;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore;
+import org.apache.jackrabbit.oak.commons.jmx.AnnotatedStandardMBean;
+
+/** MBean implementation that answers sync-state queries by delegating to an {@link S3DataStore}. */
+public class S3DataStoreStats extends AnnotatedStandardMBean implements S3DataStoreStatsMBean {
+    private final S3DataStore dataStore;
+
+    public S3DataStoreStats(final S3DataStore s3ds) {
+        super(S3DataStoreStatsMBean.class);
+        dataStore = s3ds;
+    }
+
+    /** @return the size of the data store's pending-upload collection */
+    @Override
+    public long getActiveSyncs() {
+        return dataStore.getPendingUploads().size();
+    }
+
+    /** @return {@code false} for a null or empty name, otherwise the data store's answer for that file */
+    @Override
+    public boolean isFileSynced(final String filename) {
+        return !Strings.isNullOrEmpty(filename) && dataStore.getRecordIfStored(filename);
+    }
+}
diff --git 
a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStatsMBean.java
 
b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStatsMBean.java
new file mode 100644
index 0000000..79bd14b
--- /dev/null
+++ 
b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStatsMBean.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats;
+
+import org.apache.jackrabbit.core.data.DataIdentifier;
+
+public interface S3DataStoreStatsMBean { // management interface implemented by S3DataStoreStats
+    String TYPE = "S3DataStoreStats"; // type name handed to registerMBean when the bean is registered
+    long getActiveSyncs(); // in S3DataStoreStats: number of uploads the data store still lists as pending
+    boolean isFileSynced(final String filename); // in S3DataStoreStats: false for null/empty names, else whether the data store reports the file as stored
+}
diff --git 
a/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/TestS3DataStoreStats.java
 
b/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/TestS3DataStoreStats.java
new file mode 100644
index 0000000..ed7d663
--- /dev/null
+++ 
b/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/TestS3DataStoreStats.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assume.assumeFalse;
+import static org.junit.Assume.assumeNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+
+import com.amazonaws.util.StringInputStream;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.jackrabbit.core.data.AsyncUploadCallback;
+import org.apache.jackrabbit.core.data.Backend;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3Backend;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3Constants;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.Utils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+import javax.jcr.RepositoryException;
+import javax.management.InstanceAlreadyExistsException;
+import javax.management.InstanceNotFoundException;
+import javax.management.JMX;
+import javax.management.MBeanRegistrationException;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerConnection;
+import javax.management.MBeanServerFactory;
+import javax.management.MalformedObjectNameException;
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+import java.io.*;
+import java.lang.management.ManagementFactory;
+import java.security.DigestOutputStream;
+import java.security.MessageDigest;
+import java.util.Properties;
+import java.util.Set;
+
+/** Integration tests for the S3DataStoreStats MBean; all are skipped unless the JMX and S3 settings checked in preClass are supplied. */
+public class TestS3DataStoreStats {
+    @Rule
+    public TemporaryFolder folder = new TemporaryFolder(new File("target"));
+
+    private static Properties properties;
+    private static MBeanServer jmxServer;
+    private static ObjectName mBeanName;
+
+    private S3DataStoreStatsMBean mBean;
+    private JMXConnector connector;
+    private File syncfile1;
+
+    @BeforeClass
+    public static void preClass() throws IOException, RepositoryException, MalformedObjectNameException {
+
+        // This will cause all tests in this file to be ignored if JMX properties
+        // are not passed into the test execution.
+        //
+        // If you want to run this unit test suite you will need to
+        // pass the following settings into the command-line.
+        // Example:
+        //   -Djava.rmi.server.hostname=localhost
+        //   -Dcom.sun.management.jmxremote.port=9999
+        //   -Dcom.sun.management.jmxremote.ssl=false
+        //   -Dcom.sun.management.jmxremote.authenticate=false
+        for (final String property : Lists.newArrayList("java.rmi.server.hostname",
+                "com.sun.management.jmxremote.port",
+                "com.sun.management.jmxremote.ssl",
+                "com.sun.management.jmxremote.authenticate")) {
+            assumeFalse(Strings.isNullOrEmpty(System.getProperty(property)));
+        }
+
+        // This will cause all tests in this file to be ignored if no JMX
+        // server could be found.
+        jmxServer = ManagementFactory.getPlatformMBeanServer();
+        if (null == jmxServer) {
+            jmxServer = MBeanServerFactory.newMBeanServer();
+        }
+        assumeNotNull(jmxServer);
+
+        // This will cause all tests in this file to be ignored if no S3
+        // configuration has been provided.
+        //
+        // If you want to run this unit test suite you will need to
+        // pass the following setting into the command-line.
+        // Example:
+        //   -Dconfig=/path/to/aws/properties
+        //
+        // Properties file uses the same format as for S3DataStore configuration.
+        assumeFalse(Strings.isNullOrEmpty(System.getProperty("config")));
+
+        properties = Utils.readConfig(System.getProperty("config"));
+
+        mBeanName = new ObjectName("org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats:type=S3DataStoreStats");
+    }
+
+    @Before
+    public void setup() throws Exception {
+        // Set up JMX connection and mbean
+        // Connect on the port required by preClass instead of assuming 9999.
+        final JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://:" + System.getProperty("com.sun.management.jmxremote.port") + "/jmxrmi");
+        connector = JMXConnectorFactory.connect(url, null);
+        final MBeanServerConnection connection = connector.getMBeanServerConnection();
+        mBean = JMX.newMBeanProxy(connection, mBeanName, S3DataStoreStatsMBean.class, true);
+
+        syncfile1 = new File(getClass().getResource("/syncfile1").getFile());
+    }
+
+    @After
+    public void teardown() throws Exception {
+        // A test can fail before it registers the bean, so only unregister what is there.
+        if (jmxServer.isRegistered(mBeanName)) {
+            jmxServer.unregisterMBean(mBeanName);
+        }
+        if (null != connector) {
+            connector.close();
+        }
+    }
+
+    // Real S3Backend except that write and writeAsync are stubbed to do nothing.
+    private S3Backend getMockS3Backend() throws DataStoreException {
+        S3Backend backend = mock(S3Backend.class, Mockito.CALLS_REAL_METHODS);
+        doNothing().when(backend).writeAsync(any(DataIdentifier.class), any(File.class), any(AsyncUploadCallback.class));
+        doNothing().when(backend).write(any(DataIdentifier.class), any(File.class));
+        return backend;
+    }
+
+    private void setupTestS3DS(final S3DataStore s3ds) throws IOException, RepositoryException {
+        s3ds.setProperties(properties);
+        s3ds.setSecret((String) properties.get(S3Constants.SECRET_KEY));
+        s3ds.init(folder.newFolder().getAbsolutePath());
+    }
+
+    private S3DataStore getDefaultS3DS() throws IOException, RepositoryException {
+        final S3DataStore s3ds = new S3DataStore();
+        setupTestS3DS(s3ds);
+        return s3ds;
+    }
+
+    private S3DataStore getCustomBackendS3DS(final S3Backend backend) throws IOException, RepositoryException {
+        final S3DataStore s3ds = new CustomBackendS3DataStore(backend);
+        setupTestS3DS(s3ds);
+        return s3ds;
+    }
+
+    @Test
+    public void testGetActiveS3FileSyncMetricExists() throws Exception {
+        final S3DataStore s3ds = getDefaultS3DS();
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        org.junit.Assert.assertEquals(0, mBean.getActiveSyncs());
+    }
+
+    @Test
+    public void testGetSingleActiveS3FileSyncMetric() throws Exception {
+        final S3Backend backend = getMockS3Backend();
+        final S3DataStore s3ds = getCustomBackendS3DS(backend);
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        DataRecord record = null;
+        try {
+            record = s3ds.addRecord(new StringInputStream("test"));
+            org.junit.Assert.assertEquals(1, mBean.getActiveSyncs());
+        }
+        finally {
+            if (null != record) {
+                s3ds.deleteRecord(record.getIdentifier());
+            }
+        }
+        org.junit.Assert.assertEquals(0, mBean.getActiveSyncs());
+    }
+
+    @Test
+    public void testGetMultilpleActiveS3FileSyncMetric() throws Exception {
+        final S3Backend backend = getMockS3Backend();
+        final S3DataStore s3ds = getCustomBackendS3DS(backend);
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        final Set<DataRecord> records = Sets.newHashSet();
+        try {
+            records.add(s3ds.addRecord(new StringInputStream("test1")));
+            records.add(s3ds.addRecord(new StringInputStream("test2")));
+            records.add(s3ds.addRecord(new StringInputStream("test3")));
+
+            org.junit.Assert.assertEquals(3, mBean.getActiveSyncs());
+        }
+        finally {
+            for (final DataRecord record : records) {
+                s3ds.deleteRecord(record.getIdentifier());
+            }
+        }
+
+        org.junit.Assert.assertEquals(0, mBean.getActiveSyncs());
+    }
+
+    @Test
+    public void testIsFileSyncedMetricExists() throws Exception {
+        final S3DataStore s3ds = getDefaultS3DS();
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        assertFalse(mBean.isFileSynced(syncfile1.getAbsolutePath()));
+    }
+
+    @Test
+    public void testIsFileSyncedNullFileReturnsFalse() throws Exception {
+        final S3DataStore s3ds = getDefaultS3DS();
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        assertFalse(mBean.isFileSynced(null));
+    }
+
+    @Test
+    public void testIsFileSyncedEmptyStringReturnsFalse() throws Exception {
+        final S3DataStore s3ds = getDefaultS3DS();
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        assertFalse(mBean.isFileSynced(""));
+    }
+
+    @Test
+    public void testIsFileSyncedInvalidFilenameReturnsFalse() throws Exception {
+        final S3DataStore s3ds = getDefaultS3DS();
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        assertFalse(mBean.isFileSynced("invalid"));
+    }
+
+    @Test
+    public void testIsFileSyncedFileNotAddedReturnsFalse() throws Exception {
+        final S3Backend backend = getMockS3Backend();
+        final S3DataStore s3ds = getCustomBackendS3DS(backend);
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        assertFalse(mBean.isFileSynced(syncfile1.getAbsolutePath()));
+    }
+
+    @Test
+    public void testIsFileSyncedSyncIncompleteReturnsFalse() throws Exception {
+        final S3Backend backend = getMockS3Backend();
+        final S3DataStore s3ds = getCustomBackendS3DS(backend);
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        FileInputStream inputStream = null;
+        DataRecord record = null;
+        try {
+            inputStream = new FileInputStream(syncfile1);
+            record = s3ds.addRecord(inputStream);
+
+            assertFalse(mBean.isFileSynced(syncfile1.getAbsolutePath()));
+
+        }
+        finally {
+            if (null != record) {
+                s3ds.deleteRecord(record.getIdentifier());
+            }
+            if (null != inputStream) {
+                inputStream.close();
+            }
+        }
+    }
+
+    @Test
+    public void testIsFileSyncedSyncCompleteReturnsTrue() throws Exception {
+        final S3DataStore s3ds = getDefaultS3DS();
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        FileInputStream inputStream = null;
+        DataRecord record = null;
+        try {
+            inputStream = new FileInputStream(syncfile1);
+
+            record = s3ds.addRecord(inputStream);
+
+            // Poll (at most 50 sleeps of 100 ms) until no syncs are reported as active.
+            int tries = 0;
+            while (stats.getActiveSyncs() > 0 && 50 > tries++) {
+                try {
+                    Thread.sleep(100);
+                }
+                catch (InterruptedException e) { Thread.currentThread().interrupt(); break; }
+            }
+
+            org.junit.Assert.assertTrue(mBean.isFileSynced(syncfile1.getAbsolutePath()));
+        }
+        finally {
+            if (null != record) {
+                s3ds.deleteRecord(record.getIdentifier());
+            }
+            if (null != inputStream) {
+                inputStream.close();
+            }
+        }
+    }
+
+    @Test
+    public void testIsFileSyncedFileDeletedReturnsFalse() throws Exception {
+        final S3DataStore s3ds = getDefaultS3DS();
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        FileInputStream inputStream = null;
+        DataRecord record = null;
+        try {
+            inputStream = new FileInputStream(syncfile1);
+
+            record = s3ds.addRecord(inputStream);
+
+            // Poll (at most 50 sleeps of 100 ms) until no syncs are reported as active.
+            int tries = 0;
+            while (stats.getActiveSyncs() > 0 && 50 > tries++) {
+                try {
+                    Thread.sleep(100);
+                }
+                catch (InterruptedException e) { Thread.currentThread().interrupt(); break; }
+            }
+        }
+        finally {
+            if (null != record) {
+                s3ds.deleteRecord(record.getIdentifier());
+            }
+            if (null != inputStream) {
+                inputStream.close();
+            }
+        }
+
+        assertFalse(mBean.isFileSynced(syncfile1.getAbsolutePath()));
+    }
+
+    private static class CustomBackendS3DataStore extends S3DataStore {
+        private final S3Backend _localBackend;
+        CustomBackendS3DataStore(final S3Backend backend) { _localBackend = backend; }
+        @Override
+        protected Backend createBackend() {
+            if(properties != null){
+                _localBackend.setProperties(properties);
+            }
+            return _localBackend;
+        }
+    }
+}
diff --git a/oak-blob-cloud/src/test/resources/syncfile1 
b/oak-blob-cloud/src/test/resources/syncfile1
new file mode 100644
index 0000000..aa9d41a
--- /dev/null
+++ b/oak-blob-cloud/src/test/resources/syncfile1
@@ -0,0 +1,2 @@
+This is syncfile1.
+It is used by 
org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.TestS3DataStoreStats.java.
diff --git 
a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
 
b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
index 0c2035f..e14afc8 100644
--- 
a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
+++ 
b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
@@ -60,6 +60,9 @@ import org.apache.jackrabbit.oak.api.Descriptors;
 import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean;
 import org.apache.jackrabbit.oak.api.jmx.CheckpointMBean;
 import org.apache.jackrabbit.oak.api.jmx.PersistentCacheStatsMBean;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStats;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStatsMBean;
 import org.apache.jackrabbit.oak.cache.CacheStats;
 import org.apache.jackrabbit.oak.commons.PropertiesUtil;
 import org.apache.jackrabbit.oak.osgi.ObserverTracker;
@@ -71,6 +74,7 @@ import org.apache.jackrabbit.oak.plugins.blob.BlobStoreStats;
 import org.apache.jackrabbit.oak.plugins.blob.BlobTrackingStore;
 import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.BlobIdTracker;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
 import org.apache.jackrabbit.oak.plugins.document.persistentCache.CacheType;
 import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCacheStats;
@@ -787,6 +791,19 @@ public class DocumentNodeStoreService {
                     BlobGCMBean.TYPE, "Document node store blob garbage 
collection"));
         }
 
+        if (null != store.getBlobStore() && store.getBlobStore() instanceof 
DataStoreBlobStore) {
+            final DataStoreBlobStore dsbs = 
(DataStoreBlobStore)store.getBlobStore();
+            if (null != dsbs.getDataStore() && dsbs.getDataStore() instanceof 
S3DataStore) {
+                final S3DataStore s3ds = (S3DataStore)dsbs.getDataStore();
+                final S3DataStoreStats s3dsStats = new S3DataStoreStats(s3ds);
+                registrations.add(registerMBean(whiteboard,
+                        S3DataStoreStatsMBean.class,
+                        s3dsStats,
+                        S3DataStoreStatsMBean.TYPE,
+                        s3dsStats.getClass().getSimpleName()));
+            }
+        }
+
         RevisionGC revisionGC = new RevisionGC(new Runnable() {
             @Override
             public void run() {
diff --git a/oak-segment-tar/pom.xml b/oak-segment-tar/pom.xml
index 617462e..acd2f08 100644
--- a/oak-segment-tar/pom.xml
+++ b/oak-segment-tar/pom.xml
@@ -177,6 +177,12 @@
         </dependency>
         <dependency>
             <groupId>org.apache.jackrabbit</groupId>
+            <artifactId>oak-blob-cloud</artifactId>
+            <version>${oak.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.jackrabbit</groupId>
             <artifactId>oak-core</artifactId>
             <version>${oak.version}</version>
             <scope>provided</scope>
diff --git 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java
 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java
index 84c2521..aa9e956 100644
--- 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java
+++ 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java
@@ -56,6 +56,9 @@ import org.apache.jackrabbit.commons.SimpleValueFactory;
 import org.apache.jackrabbit.oak.api.Descriptors;
 import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean;
 import org.apache.jackrabbit.oak.api.jmx.CheckpointMBean;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStats;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStatsMBean;
 import org.apache.jackrabbit.oak.cache.CacheStats;
 import org.apache.jackrabbit.oak.osgi.ObserverTracker;
 import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
@@ -64,6 +67,7 @@ import org.apache.jackrabbit.oak.plugins.blob.BlobGCMBean;
 import org.apache.jackrabbit.oak.plugins.blob.BlobGarbageCollector;
 import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
 import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType;
 import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo;
@@ -509,6 +513,23 @@ public class SegmentNodeStoreService extends ProxyNodeStore
                 )
         )));
 
+        // Expose statistics about S3DataStore, if one is being used
+
+        if (null != blobStore && blobStore instanceof DataStoreBlobStore) {
+            final DataStoreBlobStore dsbs = (DataStoreBlobStore)blobStore;
+            if (null != dsbs.getDataStore() && dsbs.getDataStore() instanceof S3DataStore) {
+                final S3DataStore s3ds = (S3DataStore)dsbs.getDataStore();
+                final S3DataStoreStats s3dsStats = new S3DataStoreStats(s3ds);
+                registrations.add(registerMBean(
+                        whiteboard,
+                        S3DataStoreStatsMBean.class,
+                        s3dsStats,
+                        S3DataStoreStatsMBean.TYPE,
+                        s3dsStats.getClass().getSimpleName()
+                ));
+            }
+        }
+
         // Register a factory service to expose the FileStore
 
         providerRegistration = context.getBundleContext().registerService(SegmentStoreProvider.class.getName(), this, null);
diff --git a/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java b/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
index 11abf49..7f998ee 100644
--- a/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
+++ b/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
@@ -59,6 +59,9 @@ import org.apache.jackrabbit.commons.SimpleValueFactory;
 import org.apache.jackrabbit.oak.api.Descriptors;
 import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean;
 import org.apache.jackrabbit.oak.api.jmx.CheckpointMBean;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStats;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStatsMBean;
 import org.apache.jackrabbit.oak.cache.CacheStats;
 import org.apache.jackrabbit.oak.commons.PropertiesUtil;
 import org.apache.jackrabbit.oak.osgi.ObserverTracker;
@@ -70,6 +73,7 @@ import org.apache.jackrabbit.oak.plugins.blob.BlobTrackingStore;
 import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
 import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.BlobIdTracker;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType;
 import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo;
@@ -289,6 +293,7 @@ public class SegmentNodeStoreService extends ProxyNodeStore
     private Registration stringCacheMBean;
     private Registration fsgcMonitorMBean;
     private Registration fileStoreStatsMBean;
+    private Registration s3DataStoreStatsRegistration;
     private WhiteboardExecutor executor;
     private boolean customBlobStore;
 
@@ -536,6 +541,23 @@ public class SegmentNodeStoreService extends ProxyNodeStore
                 scheduleWithFixedDelay(whiteboard, fsgcm, 1)
         );
 
+        // Expose statistics about S3DataStore, if one is being used
+
+        if (null != blobStore && blobStore instanceof DataStoreBlobStore) {
+            final DataStoreBlobStore dsbs = (DataStoreBlobStore)blobStore;
+            if (null != dsbs.getDataStore() && dsbs.getDataStore() instanceof S3DataStore) {
+                final S3DataStore s3ds = (S3DataStore)dsbs.getDataStore();
+                final S3DataStoreStats s3dsStats = new S3DataStoreStats(s3ds);
+                s3DataStoreStatsRegistration = registerMBean(
+                        whiteboard,
+                        S3DataStoreStatsMBean.class,
+                        s3dsStats,
+                        S3DataStoreStatsMBean.TYPE,
+                        s3dsStats.getClass().getSimpleName()
+                );
+            }
+        }
+
         // Register a factory service to expose the FileStore
 
         providerRegistration = context.getBundleContext().registerService(SegmentStoreProvider.class.getName(), this, null);
@@ -714,6 +736,10 @@ public class SegmentNodeStoreService extends ProxyNodeStore
             fileStoreStatsMBean.unregister();
             fileStoreStatsMBean = null;
         }
+        if (s3DataStoreStatsRegistration != null) {
+            s3DataStoreStatsRegistration.unregister();
+            s3DataStoreStatsRegistration = null;
+        }
         if (executor != null) {
             executor.stop();
             executor = null;

Reply via email to