Use this patch instead; updated patch from latest in trunk.
On Fri, Aug 26, 2016 at 4:41 PM, Matt Ryan <[email protected]> wrote:
> Hi Oak Devs,
>
> I've created OAK-4712 and submitted a patch for the same. I've attached
> the same patch to this email.
>
> The submission is to add a new MBean, S3DataStoreStats, which will allow
> reporting via JMX about the state of the S3DataStore. Two metrics are
> intended. The first is to report the number of files that are in the sync
> state, meaning they have been added to the S3DataStore but are not yet
> completely copied into S3. The second is to allow to query the sync state
> of a file - a return value of true would mean that the file provided is
> fully synchronized. This uses an external file name, not the internal ID.
>
> I have some questions about the implementation first:
> 1. For the request to query the sync state of a file, should the file name
> provided be a full path to a local file or a path to a file within OAK
> (e.g. /content/dam/myimage.jpg)? Current implementation uses a local file
> path but I have been wondering if it should be an OAK path.
> 2. For the request to query the sync state of a file, when converting from
> the externally-supplied file name to an internal DataIdentifier, this
> implementation is performing the same calculation to determine the internal
> ID name as is done when a file is stored. I have a number of concerns with
> this:
> - It is inefficient - the entire file has to be read and digested in
> order to compute the internal ID. This takes a long time for large assets.
> - I've essentially duplicated the logic from CachingDataStore into
> S3DataStore to compute the internal ID. I hate duplicating the code, but I
> am trying to avoid exposing internal IDs in API, and I am not seeing a good
> way in the current implementation to avoid this without either modifying
> public API to CachingDataStore, or exposing the internal ID via API, or
> both.
>
> Any suggestions on these two issues?
>
>
> I'm also experiencing a problem with this patch. In my testing it appears
> to work fine, until I delete a file. For example, if I delete an asset via
> the REST API, I will see the asset deleted in CRXDE. However, the file
> still remains in S3. This MBean as implemented only knows how to check
> with S3DataStore and the corresponding backend, and these all appear to
> believe the file still exists. So the MBean continues to report that the
> file's sync state is synchronized (i.e. isFileSynced() returns true) even
> though the file has been removed from the JCR. Any suggestions on how to
> resolve this?
>
>
> Finally, any feedback on the patch is welcome. And if I did the process
> wrong please correct me (gently) - first time submission here. Thanks
>
>
diff --git a/oak-blob-cloud/pom.xml b/oak-blob-cloud/pom.xml
index 800f716..ad3cb3c 100644
--- a/oak-blob-cloud/pom.xml
+++ b/oak-blob-cloud/pom.xml
@@ -41,7 +41,7 @@
<artifactId>maven-bundle-plugin</artifactId>
<configuration>
<instructions>
-
<Export-Package>org.apache.jackrabbit.oak.blob.cloud.aws.s3</Export-Package>
+
<Export-Package>org.apache.jackrabbit.oak.blob.cloud.aws.s3,org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats</Export-Package>
<DynamicImport-Package>sun.io</DynamicImport-Package>
</instructions>
</configuration>
@@ -101,6 +101,13 @@
<version>${jackrabbit.version}</version>
</dependency>
+ <!-- Dependencies to other Oak components -->
+ <dependency>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>oak-commons</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<!-- Amazon AWS dependency -->
<dependency>
<groupId>com.amazonaws</groupId>
@@ -140,6 +147,12 @@
<artifactId>logback-classic</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>1.10.19</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java
b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java
index fc21bf6..be2ed54 100644
---
a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java
+++
b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java
@@ -16,17 +16,64 @@
*/
package org.apache.jackrabbit.oak.blob.cloud.aws.s3;
+import java.io.FileInputStream;
+import java.io.OutputStream;
+import java.security.DigestOutputStream;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.output.NullOutputStream;
import org.apache.jackrabbit.core.data.Backend;
import org.apache.jackrabbit.core.data.CachingDataStore;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* An Amazon S3 data store.
*/
public class S3DataStore extends CachingDataStore {
+
+ /**
+ * Logger instance.
+ */
+ private static final Logger LOG =
LoggerFactory.getLogger(S3DataStore.class);
+
protected Properties properties;
+ private final LoadingCache<String, DataIdentifier> identifierCache =
+ CacheBuilder.newBuilder()
+ .maximumSize(8192).expireAfterAccess(5, TimeUnit.MINUTES)
+ .build(new CacheLoader<String, DataIdentifier>() {
+ @Override
+ public DataIdentifier load(final String filename) throws
Exception {
+ try {
+ MessageDigest digest =
MessageDigest.getInstance("SHA-1");
+ OutputStream output = new DigestOutputStream(new
NullOutputStream(), digest);
+ try {
+ IOUtils.copyLarge(new FileInputStream(filename),
output);
+ }
+ finally {
+ output.close();
+ }
+ return new
DataIdentifier(encodeHexString(digest.digest()));
+ }
+ catch (NoSuchAlgorithmException e) {
+ LOG.warn("Unable to load digest algorithm \"SHA-1\"");
+ }
+ return null;
+ }
+ });
+
@Override
protected Backend createBackend() {
S3Backend backend = new S3Backend();
@@ -47,4 +94,22 @@ public class S3DataStore extends CachingDataStore {
public void setProperties(Properties properties) {
this.properties = properties;
}
+
+ public boolean getRecordIfStored(final String filename) {
+ try {
+ final DataIdentifier identifier = identifierCache.get(filename);
+ if (null != identifier) {
+ return this.getBackend().exists(identifier);
+ } else {
+ LOG.warn(String.format("Unable to obtain identifier for
filename %s", filename));
+ }
+ }
+ catch (ExecutionException e) {
+ LOG.warn(String.format("Exception caught checking for %s in
pending uploads", filename), e);
+ }
+ catch (DataStoreException e) {
+ LOG.warn(String.format("Data Store Exception caught checking for
%s in pending uploads", filename), e);
+ }
+ return false;
+ }
}
diff --git
a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStats.java
b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStats.java
new file mode 100644
index 0000000..5356607
--- /dev/null
+++
b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStats.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats;
+
+import com.google.common.base.Strings;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore;
+import org.apache.jackrabbit.oak.commons.jmx.AnnotatedStandardMBean;
+
+public class S3DataStoreStats extends AnnotatedStandardMBean implements
S3DataStoreStatsMBean {
+ private final S3DataStore s3ds;
+
+ public S3DataStoreStats(final S3DataStore s3ds) {
+ super(S3DataStoreStatsMBean.class);
+ this.s3ds = s3ds;
+ }
+
+ @Override
+ public long getActiveSyncs() {
+ return s3ds.getPendingUploads().size();
+ }
+
+ @Override
+ public boolean isFileSynced(final String filename) {
+ if (Strings.isNullOrEmpty(filename)) {
+ return false;
+ }
+ return s3ds.getRecordIfStored(filename);
+ }
+}
diff --git
a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStatsMBean.java
b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStatsMBean.java
new file mode 100644
index 0000000..79bd14b
--- /dev/null
+++
b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStatsMBean.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats;
+
+import org.apache.jackrabbit.core.data.DataIdentifier;
+
+public interface S3DataStoreStatsMBean {
+ String TYPE = "S3DataStoreStats";
+ long getActiveSyncs();
+ boolean isFileSynced(final String filename);
+}
diff --git
a/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/TestS3DataStoreStats.java
b/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/TestS3DataStoreStats.java
new file mode 100644
index 0000000..ed7d663
--- /dev/null
+++
b/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/TestS3DataStoreStats.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assume.assumeFalse;
+import static org.junit.Assume.assumeNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+
+import com.amazonaws.util.StringInputStream;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.jackrabbit.core.data.AsyncUploadCallback;
+import org.apache.jackrabbit.core.data.Backend;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3Backend;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3Constants;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.Utils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+import javax.jcr.RepositoryException;
+import javax.management.InstanceAlreadyExistsException;
+import javax.management.InstanceNotFoundException;
+import javax.management.JMX;
+import javax.management.MBeanRegistrationException;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerConnection;
+import javax.management.MBeanServerFactory;
+import javax.management.MalformedObjectNameException;
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+import java.io.*;
+import java.lang.management.ManagementFactory;
+import java.security.DigestOutputStream;
+import java.security.MessageDigest;
+import java.util.Properties;
+import java.util.Set;
+
+public class TestS3DataStoreStats {
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder(new File("target"));
+
+ private static Properties properties;
+ private static MBeanServer jmxServer;
+ private static ObjectName mBeanName;
+
+ private S3DataStoreStatsMBean mBean;
+ private File syncfile1;
+
+ @BeforeClass
+ public static void preClass() throws IOException, RepositoryException,
MalformedObjectNameException {
+
+ // This will cause all tests in this file to be ignored if JMX
properties
+ // are not passed into the test execution.
+ //
+ // If you want to run this unit test suite you will need to
+ // pass the following settings into the command-line.
+ // Example:
+ // -Djava.rmi.server.hostname=localhost
+ // -Dcom.sun.management.jmxremote.port=9999
+ // -Dcom.sun.management.jmxremote.ssl=false
+ // -Dcom.sun.management.jmxremote.authenticate=false
+ for (final String property :
Lists.newArrayList("java.rmi.server.hostname",
+ "com.sun.management.jmxremote.port",
+ "com.sun.management.jmxremote.ssl",
+ "com.sun.management.jmxremote.authenticate")) {
+ assumeFalse(Strings.isNullOrEmpty(System.getProperty(property)));
+ }
+
+ // This will cause all tests in this file to be ignored if no JMX
+ // server could be found.
+ jmxServer = ManagementFactory.getPlatformMBeanServer();
+ if (null == jmxServer) {
+ jmxServer = MBeanServerFactory.newMBeanServer();
+ }
+ assumeNotNull(jmxServer);
+
+ // This will cause all tests in this file to be ignored if no S3
+ // configuration has been provided.
+ //
+ // If you want to run this unit test suite you will need to
+ // pass the following setting into the command-line.
+ // Example:
+ // -Dconfig=/path/to/aws/properties
+ //
+ // Properties file uses the same format as for S3DataStore
configuration.
+ assumeFalse(Strings.isNullOrEmpty(System.getProperty("config")));
+
+ properties = Utils.readConfig(System.getProperty("config"));
+
+ mBeanName = new
ObjectName("org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats:type=S3DataStoreStats");
+ }
+
+ @Before
+ public void setup() throws IOException, InstanceAlreadyExistsException,
+ MBeanRegistrationException, NotCompliantMBeanException {
+ // Set up JMX connection and mbean
+ final JMXServiceURL url = new
JMXServiceURL("service:jmx:rmi:///jndi/rmi://:9999/jmxrmi");
+ final JMXConnector connector = JMXConnectorFactory.connect(url, null);
+ final MBeanServerConnection connection =
connector.getMBeanServerConnection();
+ mBean = JMX.newMBeanProxy(connection, mBeanName,
S3DataStoreStatsMBean.class, true);
+
+ syncfile1 = new File(getClass().getResource("/syncfile1").getFile());
+ }
+
+ @After
+ public void teardown() throws InstanceNotFoundException,
MBeanRegistrationException {
+ jmxServer.unregisterMBean(mBeanName);
+ }
+
+ private S3Backend getMockS3Backend() throws DataStoreException {
+ S3Backend backend = mock(S3Backend.class, Mockito.CALLS_REAL_METHODS);
+ doNothing().when(backend).writeAsync(any(DataIdentifier.class),
any(File.class), any(AsyncUploadCallback.class));
+ doNothing().when(backend).write(any(DataIdentifier.class),
any(File.class));
+ return backend;
+ }
+
+ private void setupTestS3DS(final S3DataStore s3ds) throws IOException,
RepositoryException {
+ s3ds.setProperties(properties);
+ s3ds.setSecret((String) properties.get(S3Constants.SECRET_KEY));
+ s3ds.init(folder.newFolder().getAbsolutePath());
+ }
+
+ private S3DataStore getDefaultS3DS() throws IOException,
RepositoryException {
+ final S3DataStore s3ds = new S3DataStore();
+ setupTestS3DS(s3ds);
+ return s3ds;
+ }
+
+ private S3DataStore getCustomBackendS3DS(final S3Backend backend) throws
IOException, RepositoryException {
+ final S3DataStore s3ds = new CustomBackendS3DataStore(backend);
+ setupTestS3DS(s3ds);
+ return s3ds;
+ }
+
+ @Test
+ public void testGetActiveS3FileSyncMetricExists() throws
RepositoryException, IOException, MalformedObjectNameException,
+ InstanceAlreadyExistsException, MBeanRegistrationException,
NotCompliantMBeanException {
+ final S3DataStore s3ds = getDefaultS3DS();
+ final S3DataStoreStats stats = new S3DataStoreStats(s3ds);
+ jmxServer.registerMBean(stats, mBeanName);
+
+ assert(0 == mBean.getActiveSyncs());
+ }
+
+ @Test
+ public void testGetSingleActiveS3FileSyncMetric() throws IOException,
RepositoryException, MalformedObjectNameException,
+ InstanceAlreadyExistsException, MBeanRegistrationException,
NotCompliantMBeanException {
+ final S3Backend backend = getMockS3Backend();
+ final S3DataStore s3ds = getCustomBackendS3DS(backend);
+ final S3DataStoreStats stats = new S3DataStoreStats(s3ds);
+ jmxServer.registerMBean(stats, mBeanName);
+
+ DataRecord record = null;
+ try {
+ record = s3ds.addRecord(new StringInputStream("test"));
+ assert(1 == mBean.getActiveSyncs());
+ }
+ finally {
+ if (null != record) {
+ s3ds.deleteRecord(record.getIdentifier());
+ }
+ }
+ assert(0 == mBean.getActiveSyncs());
+ }
+
+ @Test
+ public void testGetMultilpleActiveS3FileSyncMetric() throws IOException,
RepositoryException, MalformedObjectNameException,
+ InstanceAlreadyExistsException, MBeanRegistrationException,
NotCompliantMBeanException {
+ final S3Backend backend = getMockS3Backend();
+ final S3DataStore s3ds = getCustomBackendS3DS(backend);
+ final S3DataStoreStats stats = new S3DataStoreStats(s3ds);
+ jmxServer.registerMBean(stats, mBeanName);
+
+ final Set<DataRecord> records = Sets.newHashSet();
+ try {
+ records.add(s3ds.addRecord(new StringInputStream("test1")));
+ records.add(s3ds.addRecord(new StringInputStream("test2")));
+ records.add(s3ds.addRecord(new StringInputStream("test3")));
+
+ assert (3 == mBean.getActiveSyncs());
+ }
+ finally {
+ for (final DataRecord record : records) {
+ s3ds.deleteRecord(record.getIdentifier());
+ }
+ }
+
+ assert(0 == mBean.getActiveSyncs());
+ }
+
+ @Test
+ public void testIsFileSyncedMetricExists() throws IOException,
RepositoryException, InstanceAlreadyExistsException,
+ MBeanRegistrationException, NotCompliantMBeanException {
+ final S3DataStore s3ds = getDefaultS3DS();
+ final S3DataStoreStats stats = new S3DataStoreStats(s3ds);
+ jmxServer.registerMBean(stats, mBeanName);
+
+ assertFalse(mBean.isFileSynced(syncfile1.getAbsolutePath()));
+ }
+
+ @Test
+ public void testIsFileSyncedNullFileReturnsFalse() throws IOException,
RepositoryException, InstanceAlreadyExistsException,
+ MBeanRegistrationException, NotCompliantMBeanException {
+ final S3DataStore s3ds = getDefaultS3DS();
+ final S3DataStoreStats stats = new S3DataStoreStats(s3ds);
+ jmxServer.registerMBean(stats, mBeanName);
+
+ assertFalse(mBean.isFileSynced(null));
+ }
+
+ @Test
+ public void testIsFileSyncedEmptyStringReturnsFalse() throws IOException,
RepositoryException, InstanceAlreadyExistsException,
+ MBeanRegistrationException, NotCompliantMBeanException {
+ final S3DataStore s3ds = getDefaultS3DS();
+ final S3DataStoreStats stats = new S3DataStoreStats(s3ds);
+ jmxServer.registerMBean(stats, mBeanName);
+
+ assertFalse(mBean.isFileSynced(""));
+ }
+
+ @Test
+ public void testIsFileSyncedInvalidFilenameReturnsFalse() throws
IOException, RepositoryException, InstanceAlreadyExistsException,
+ MBeanRegistrationException, NotCompliantMBeanException {
+ final S3DataStore s3ds = getDefaultS3DS();
+ final S3DataStoreStats stats = new S3DataStoreStats(s3ds);
+ jmxServer.registerMBean(stats, mBeanName);
+
+ assertFalse(mBean.isFileSynced("invalid"));
+ }
+
+ @Test
+ public void testIsFileSyncedFileNotAddedReturnsFalse() throws
IOException, RepositoryException, InstanceAlreadyExistsException,
+ MBeanRegistrationException, NotCompliantMBeanException {
+ final S3Backend backend = getMockS3Backend();
+ final S3DataStore s3ds = getCustomBackendS3DS(backend);
+ final S3DataStoreStats stats = new S3DataStoreStats(s3ds);
+ jmxServer.registerMBean(stats, mBeanName);
+
+ assertFalse(mBean.isFileSynced(syncfile1.getAbsolutePath()));
+ }
+
+ @Test
+ public void testIsFileSyncedSyncIncompleteReturnsFalse() throws
IOException, RepositoryException, InstanceAlreadyExistsException,
+ MBeanRegistrationException, NotCompliantMBeanException {
+ final S3Backend backend = getMockS3Backend();
+ final S3DataStore s3ds = getCustomBackendS3DS(backend);
+ final S3DataStoreStats stats = new S3DataStoreStats(s3ds);
+ jmxServer.registerMBean(stats, mBeanName);
+
+ FileInputStream inputStream = null;
+ DataRecord record = null;
+ try {
+ inputStream = new FileInputStream(syncfile1);
+ record = s3ds.addRecord(new FileInputStream(syncfile1));
+
+ assertFalse(mBean.isFileSynced(syncfile1.getAbsolutePath()));
+
+ }
+ finally {
+ if (null != record) {
+ s3ds.deleteRecord(record.getIdentifier());
+ }
+ if (null != inputStream) {
+ inputStream.close();
+ }
+ }
+ }
+
+ @Test
+ public void testIsFileSyncedSyncCompleteReturnsTrue() throws IOException,
RepositoryException, InstanceAlreadyExistsException,
+ MBeanRegistrationException, NotCompliantMBeanException {
+ final S3DataStore s3ds = getDefaultS3DS();
+ final S3DataStoreStats stats = new S3DataStoreStats(s3ds);
+ jmxServer.registerMBean(stats, mBeanName);
+
+ FileInputStream inputStream = null;
+ DataRecord record = null;
+ try {
+ inputStream = new FileInputStream(syncfile1);
+
+ record = s3ds.addRecord(new FileInputStream(syncfile1));
+
+ int tries = 0;
+ while (stats.getActiveSyncs() > 0 && 50 > tries++) {
+ try {
+ Thread.sleep(100);
+ }
+ catch (InterruptedException e) { }
+ }
+
+ assert(mBean.isFileSynced(syncfile1.getAbsolutePath()));
+ }
+ finally {
+ if (null != record) {
+ s3ds.deleteRecord(record.getIdentifier());
+ }
+ if (null != inputStream) {
+ inputStream.close();
+ }
+ }
+ }
+
+ @Test
+ public void testIsFileSyncedFileDeletedReturnsFalse() throws IOException,
RepositoryException, InstanceAlreadyExistsException,
+ MBeanRegistrationException, NotCompliantMBeanException {
+ final S3DataStore s3ds = getDefaultS3DS();
+ final S3DataStoreStats stats = new S3DataStoreStats(s3ds);
+ jmxServer.registerMBean(stats, mBeanName);
+
+ FileInputStream inputStream = null;
+ DataRecord record = null;
+ try {
+ inputStream = new FileInputStream(syncfile1);
+
+ record = s3ds.addRecord(new FileInputStream(syncfile1));
+
+ int tries = 0;
+ while (stats.getActiveSyncs() > 0 && 50 > tries++) {
+ try {
+ Thread.sleep(100);
+ }
+ catch (InterruptedException e) { }
+ }
+ }
+ finally {
+ if (null != record) {
+ s3ds.deleteRecord(record.getIdentifier());
+ }
+ if (null != inputStream) {
+ inputStream.close();
+ }
+ }
+
+ assertFalse(mBean.isFileSynced(syncfile1.getAbsolutePath()));
+ }
+
+ private class CustomBackendS3DataStore extends S3DataStore {
+ private S3Backend _localBackend;
+ CustomBackendS3DataStore(final S3Backend backend) { _localBackend =
backend; }
+ @Override
+ protected Backend createBackend() {
+ if(properties != null){
+ _localBackend.setProperties(properties);
+ }
+ return _localBackend;
+ }
+ }
+}
diff --git a/oak-blob-cloud/src/test/resources/syncfile1
b/oak-blob-cloud/src/test/resources/syncfile1
new file mode 100644
index 0000000..aa9d41a
--- /dev/null
+++ b/oak-blob-cloud/src/test/resources/syncfile1
@@ -0,0 +1,2 @@
+This is syncfile1.
+It is used by
org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.TestS3DataStoreStats.java.
diff --git
a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
index 0c2035f..e14afc8 100644
---
a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
+++
b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
@@ -60,6 +60,9 @@ import org.apache.jackrabbit.oak.api.Descriptors;
import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean;
import org.apache.jackrabbit.oak.api.jmx.CheckpointMBean;
import org.apache.jackrabbit.oak.api.jmx.PersistentCacheStatsMBean;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStats;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStatsMBean;
import org.apache.jackrabbit.oak.cache.CacheStats;
import org.apache.jackrabbit.oak.commons.PropertiesUtil;
import org.apache.jackrabbit.oak.osgi.ObserverTracker;
@@ -71,6 +74,7 @@ import org.apache.jackrabbit.oak.plugins.blob.BlobStoreStats;
import org.apache.jackrabbit.oak.plugins.blob.BlobTrackingStore;
import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
import org.apache.jackrabbit.oak.plugins.blob.datastore.BlobIdTracker;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
import org.apache.jackrabbit.oak.plugins.document.persistentCache.CacheType;
import
org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCacheStats;
@@ -787,6 +791,19 @@ public class DocumentNodeStoreService {
BlobGCMBean.TYPE, "Document node store blob garbage
collection"));
}
+ if (null != store.getBlobStore() && store.getBlobStore() instanceof
DataStoreBlobStore) {
+ final DataStoreBlobStore dsbs =
(DataStoreBlobStore)store.getBlobStore();
+ if (null != dsbs.getDataStore() && dsbs.getDataStore() instanceof
S3DataStore) {
+ final S3DataStore s3ds = (S3DataStore)dsbs.getDataStore();
+ final S3DataStoreStats s3dsStats = new S3DataStoreStats(s3ds);
+ registrations.add(registerMBean(whiteboard,
+ S3DataStoreStatsMBean.class,
+ s3dsStats,
+ S3DataStoreStatsMBean.TYPE,
+ s3dsStats.getClass().getSimpleName()));
+ }
+ }
+
RevisionGC revisionGC = new RevisionGC(new Runnable() {
@Override
public void run() {
diff --git a/oak-segment-tar/pom.xml b/oak-segment-tar/pom.xml
index 617462e..acd2f08 100644
--- a/oak-segment-tar/pom.xml
+++ b/oak-segment-tar/pom.xml
@@ -177,6 +177,12 @@
</dependency>
<dependency>
<groupId>org.apache.jackrabbit</groupId>
+ <artifactId>oak-blob-cloud</artifactId>
+ <version>${oak.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.jackrabbit</groupId>
<artifactId>oak-core</artifactId>
<version>${oak.version}</version>
<scope>provided</scope>
diff --git
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java
index 84c2521..aa9e956 100644
---
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java
+++
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java
@@ -56,6 +56,9 @@ import org.apache.jackrabbit.commons.SimpleValueFactory;
import org.apache.jackrabbit.oak.api.Descriptors;
import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean;
import org.apache.jackrabbit.oak.api.jmx.CheckpointMBean;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStats;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStatsMBean;
import org.apache.jackrabbit.oak.cache.CacheStats;
import org.apache.jackrabbit.oak.osgi.ObserverTracker;
import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
@@ -64,6 +67,7 @@ import org.apache.jackrabbit.oak.plugins.blob.BlobGCMBean;
import org.apache.jackrabbit.oak.plugins.blob.BlobGarbageCollector;
import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
import
org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType;
import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo;
@@ -509,6 +513,23 @@ public class SegmentNodeStoreService extends ProxyNodeStore
)
)));
+ // Expose statistics about S3DataStore, if one is being used
+
+ if (null != blobStore && blobStore instanceof DataStoreBlobStore) {
+ final DataStoreBlobStore dsbs = (DataStoreBlobStore)blobStore;
+ if (null != dsbs.getDataStore() && dsbs.getDataStore() instanceof
S3DataStore) {
+ final S3DataStore s3ds = (S3DataStore)dsbs.getDataStore();
+ final S3DataStoreStats s3dsStats = new S3DataStoreStats(s3ds);
+ registrations.add(registerMBean(
+ whiteboard,
+ S3DataStoreStatsMBean.class,
+ s3dsStats,
+ S3DataStoreStatsMBean.TYPE,
+ s3dsStats.getClass().getSimpleName()
+ ));
+ }
+ }
+
// Register a factory service to expose the FileStore
providerRegistration =
context.getBundleContext().registerService(SegmentStoreProvider.class.getName(),
this, null);
diff --git
a/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
b/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
index 11abf49..7f998ee 100644
---
a/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
+++
b/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
@@ -59,6 +59,9 @@ import org.apache.jackrabbit.commons.SimpleValueFactory;
import org.apache.jackrabbit.oak.api.Descriptors;
import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean;
import org.apache.jackrabbit.oak.api.jmx.CheckpointMBean;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStats;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStatsMBean;
import org.apache.jackrabbit.oak.cache.CacheStats;
import org.apache.jackrabbit.oak.commons.PropertiesUtil;
import org.apache.jackrabbit.oak.osgi.ObserverTracker;
@@ -70,6 +73,7 @@ import
org.apache.jackrabbit.oak.plugins.blob.BlobTrackingStore;
import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
import org.apache.jackrabbit.oak.plugins.blob.datastore.BlobIdTracker;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
import
org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType;
import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo;
@@ -289,6 +293,7 @@ public class SegmentNodeStoreService extends ProxyNodeStore
private Registration stringCacheMBean;
private Registration fsgcMonitorMBean;
private Registration fileStoreStatsMBean;
+ private Registration s3DataStoreStatsRegistration;
private WhiteboardExecutor executor;
private boolean customBlobStore;
@@ -536,6 +541,23 @@ public class SegmentNodeStoreService extends ProxyNodeStore
scheduleWithFixedDelay(whiteboard, fsgcm, 1)
);
+ // Expose statistics about S3DataStore, if one is being used
+
+ if (null != blobStore && blobStore instanceof DataStoreBlobStore) {
+ final DataStoreBlobStore dsbs = (DataStoreBlobStore)blobStore;
+ if (null != dsbs.getDataStore() && dsbs.getDataStore() instanceof
S3DataStore) {
+ final S3DataStore s3ds = (S3DataStore)dsbs.getDataStore();
+ final S3DataStoreStats s3dsStats = new S3DataStoreStats(s3ds);
+ s3DataStoreStatsRegistration = registerMBean(
+ whiteboard,
+ S3DataStoreStatsMBean.class,
+ s3dsStats,
+ S3DataStoreStatsMBean.TYPE,
+ s3dsStats.getClass().getSimpleName()
+ );
+ }
+ }
+
// Register a factory service to expose the FileStore
providerRegistration =
context.getBundleContext().registerService(SegmentStoreProvider.class.getName(),
this, null);
@@ -714,6 +736,10 @@ public class SegmentNodeStoreService extends ProxyNodeStore
fileStoreStatsMBean.unregister();
fileStoreStatsMBean = null;
}
+ if (s3DataStoreStatsRegistration != null) {
+ s3DataStoreStatsRegistration.unregister();
+ s3DataStoreStatsRegistration = null;
+ }
if (executor != null) {
executor.stop();
executor = null;