Hi Oak Devs, I've created OAK-4712 and submitted a patch for the same. I've attached the same patch to this email.
The submission is to add a new MBean, S3DataStoreStats, which will allow reporting via JMX about the state of the S3DataStore. Two metrics are intended. The first is to report the number of files that are in the sync state, meaning they have been added to the S3DataStore but are not yet completely copied into S3. The second is to allow to query the sync state of a file - a return value of true would mean that the file provided is fully synchronized. This uses an external file name, not the internal ID. I have some questions about the implementation first: 1. For the request to query the sync state of a file, should the file name provided be a full path to a local file or a path to a file within OAK (e.g. /content/dam/myimage.jpg)? Current implementation uses a local file path but I have been wondering if it should be an OAK path. 2. For the request to query the sync state of a file, when converting from the externally-supplied file name to an internal DataIdentifier, this implementation is performing the same calculation to determine the internal ID name as is done when a file is stored. I have a number of concerns with this: - It is inefficient - the entire file has to be read and digested in order to compute the internal ID. This takes a long time for large assets. - I've essentially duplicated the logic from CachingDataStore into S3DataStore to compute the internal ID. I hate duplicating the code, but I am trying to avoid exposing internal IDs in API, and I am not seeing a good way in the current implementation to avoid this without either modifying public API to CachingDataStore, or exposing the internal ID via API, or both. Any suggestions on these two issues? I'm also experiencing a problem with this patch. In my testing it appears to work fine, until I delete a file. For example, if I delete an asset via the REST API, I will see the asset deleted in CRXDE. However, the file still remains in S3. This MBean as implemented only knows how to check with S3DataStore and the corresponding backend, and these all appear to believe the file still exists. So the MBean continues to report that the file's sync state is synchronized (i.e. isFileSynced() returns true) even though the file has been removed from the JCR. Any suggestions on how to resolve this? Finally, any feedback on the patch is welcome. And if I did the process wrong please correct me (gently) - first time submission here. Thanks
diff --git a/oak-blob-cloud/pom.xml b/oak-blob-cloud/pom.xml index 800f716..ad3cb3c 100644 --- a/oak-blob-cloud/pom.xml +++ b/oak-blob-cloud/pom.xml @@ -41,7 +41,7 @@ <artifactId>maven-bundle-plugin</artifactId> <configuration> <instructions> - <Export-Package>org.apache.jackrabbit.oak.blob.cloud.aws.s3</Export-Package> + <Export-Package>org.apache.jackrabbit.oak.blob.cloud.aws.s3,org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats</Export-Package> <DynamicImport-Package>sun.io</DynamicImport-Package> </instructions> </configuration> @@ -101,6 +101,13 @@ <version>${jackrabbit.version}</version> </dependency> + <!-- Dependencies to other Oak components --> + <dependency> + <groupId>org.apache.jackrabbit</groupId> + <artifactId>oak-commons</artifactId> + <version>${project.version}</version> + </dependency> + <!-- Amazon AWS dependency --> <dependency> <groupId>com.amazonaws</groupId> @@ -140,6 +147,12 @@ <artifactId>logback-classic</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <version>1.10.19</version> + <scope>test</scope> + </dependency> </dependencies> </project> diff --git a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java index fc21bf6..be2ed54 100644 --- a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java +++ b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java @@ -16,17 +16,64 @@ */ package org.apache.jackrabbit.oak.blob.cloud.aws.s3; +import java.io.FileInputStream; +import java.io.OutputStream; +import java.security.DigestOutputStream; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import org.apache.commons.io.IOUtils; +import org.apache.commons.io.output.NullOutputStream; import org.apache.jackrabbit.core.data.Backend; import org.apache.jackrabbit.core.data.CachingDataStore; +import org.apache.jackrabbit.core.data.DataIdentifier; +import org.apache.jackrabbit.core.data.DataStoreException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * An Amazon S3 data store. */ public class S3DataStore extends CachingDataStore { + + /** + * Logger instance. + */ + private static final Logger LOG = LoggerFactory.getLogger(S3DataStore.class); + protected Properties properties; + private final LoadingCache<String, DataIdentifier> identifierCache = + CacheBuilder.newBuilder() + .maximumSize(8192).expireAfterAccess(5, TimeUnit.MINUTES) + .build(new CacheLoader<String, DataIdentifier>() { + @Override + public DataIdentifier load(final String filename) throws Exception { + try { + MessageDigest digest = MessageDigest.getInstance("SHA-1"); + OutputStream output = new DigestOutputStream(new NullOutputStream(), digest); + try { + IOUtils.copyLarge(new FileInputStream(filename), output); + } + finally { + output.close(); + } + return new DataIdentifier(encodeHexString(digest.digest())); + } + catch (NoSuchAlgorithmException e) { + LOG.warn("Unable to load digest algorithm \"SHA-1\""); + } + return null; + } + }); + @Override protected Backend createBackend() { S3Backend backend = new S3Backend(); @@ -47,4 +94,22 @@ public class S3DataStore extends CachingDataStore { public void setProperties(Properties properties) { this.properties = properties; } + + public boolean getRecordIfStored(final String filename) { + try { + final DataIdentifier identifier = identifierCache.get(filename); + if (null != identifier) { + return this.getBackend().exists(identifier); + } else { + LOG.warn(String.format("Unable to obtain identifier for filename %s", filename)); + } + } + catch (ExecutionException e) { + LOG.warn(String.format("Exception caught checking for %s in pending uploads", filename), e); + } + catch (DataStoreException e) { + LOG.warn(String.format("Data Store Exception caught checking for %s in pending uploads", filename), e); + } + return false; + } } diff --git a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStats.java b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStats.java new file mode 100644 index 0000000..5356607 --- /dev/null +++ b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStats.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats; + +import com.google.common.base.Strings; +import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore; +import org.apache.jackrabbit.oak.commons.jmx.AnnotatedStandardMBean; + +public class S3DataStoreStats extends AnnotatedStandardMBean implements S3DataStoreStatsMBean { + private final S3DataStore s3ds; + + public S3DataStoreStats(final S3DataStore s3ds) { + super(S3DataStoreStatsMBean.class); + this.s3ds = s3ds; + } + + @Override + public long getActiveSyncs() { + return s3ds.getPendingUploads().size(); + } + + @Override + public boolean isFileSynced(final String filename) { + if (Strings.isNullOrEmpty(filename)) { + return false; + } + return s3ds.getRecordIfStored(filename); + } +} diff --git a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStatsMBean.java b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStatsMBean.java new file mode 100644 index 0000000..79bd14b --- /dev/null +++ b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStatsMBean.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats; + +import org.apache.jackrabbit.core.data.DataIdentifier; + +public interface S3DataStoreStatsMBean { + String TYPE = "S3DataStoreStats"; + long getActiveSyncs(); + boolean isFileSynced(final String filename); +} diff --git a/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/TestS3DataStoreStats.java b/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/TestS3DataStoreStats.java new file mode 100644 index 0000000..ed7d663 --- /dev/null +++ b/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/TestS3DataStoreStats.java @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assume.assumeFalse; +import static org.junit.Assume.assumeNotNull; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; + +import com.amazonaws.util.StringInputStream; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.apache.jackrabbit.core.data.AsyncUploadCallback; +import org.apache.jackrabbit.core.data.Backend; +import org.apache.jackrabbit.core.data.DataIdentifier; +import org.apache.jackrabbit.core.data.DataRecord; +import org.apache.jackrabbit.core.data.DataStoreException; +import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3Backend; +import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3Constants; +import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore; +import org.apache.jackrabbit.oak.blob.cloud.aws.s3.Utils; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; + +import javax.jcr.RepositoryException; +import javax.management.InstanceAlreadyExistsException; +import javax.management.InstanceNotFoundException; +import javax.management.JMX; +import javax.management.MBeanRegistrationException; +import javax.management.MBeanServer; +import javax.management.MBeanServerConnection; +import javax.management.MBeanServerFactory; +import javax.management.MalformedObjectNameException; +import javax.management.NotCompliantMBeanException; +import javax.management.ObjectName; +import javax.management.remote.JMXConnector; +import javax.management.remote.JMXConnectorFactory; +import javax.management.remote.JMXServiceURL; +import java.io.*; +import java.lang.management.ManagementFactory; +import java.security.DigestOutputStream; +import java.security.MessageDigest; +import java.util.Properties; +import java.util.Set; + +public class TestS3DataStoreStats { + @Rule + public TemporaryFolder folder = new TemporaryFolder(new File("target")); + + private static Properties properties; + private static MBeanServer jmxServer; + private static ObjectName mBeanName; + + private S3DataStoreStatsMBean mBean; + private File syncfile1; + + @BeforeClass + public static void preClass() throws IOException, RepositoryException, MalformedObjectNameException { + + // This will cause all tests in this file to be ignored if JMX properties + // are not passed into the test execution. + // + // If you want to run this unit test suite you will need to + // pass the following settings into the command-line. + // Example: + // -Djava.rmi.server.hostname=localhost + // -Dcom.sun.management.jmxremote.port=9999 + // -Dcom.sun.management.jmxremote.ssl=false + // -Dcom.sun.management.jmxremote.authenticate=false + for (final String property : Lists.newArrayList("java.rmi.server.hostname", + "com.sun.management.jmxremote.port", + "com.sun.management.jmxremote.ssl", + "com.sun.management.jmxremote.authenticate")) { + assumeFalse(Strings.isNullOrEmpty(System.getProperty(property))); + } + + // This will cause all tests in this file to be ignored if no JMX + // server could be found. + jmxServer = ManagementFactory.getPlatformMBeanServer(); + if (null == jmxServer) { + jmxServer = MBeanServerFactory.newMBeanServer(); + } + assumeNotNull(jmxServer); + + // This will cause all tests in this file to be ignored if no S3 + // configuration has been provided. + // + // If you want to run this unit test suite you will need to + // pass the following setting into the command-line. + // Example: + // -Dconfig=/path/to/aws/properties + // + // Properties file uses the same format as for S3DataStore configuration. + assumeFalse(Strings.isNullOrEmpty(System.getProperty("config"))); + + properties = Utils.readConfig(System.getProperty("config")); + + mBeanName = new ObjectName("org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats:type=S3DataStoreStats"); + } + + @Before + public void setup() throws IOException, InstanceAlreadyExistsException, + MBeanRegistrationException, NotCompliantMBeanException { + // Set up JMX connection and mbean + final JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://:9999/jmxrmi"); + final JMXConnector connector = JMXConnectorFactory.connect(url, null); + final MBeanServerConnection connection = connector.getMBeanServerConnection(); + mBean = JMX.newMBeanProxy(connection, mBeanName, S3DataStoreStatsMBean.class, true); + + syncfile1 = new File(getClass().getResource("/syncfile1").getFile()); + } + + @After + public void teardown() throws InstanceNotFoundException, MBeanRegistrationException { + jmxServer.unregisterMBean(mBeanName); + } + + private S3Backend getMockS3Backend() throws DataStoreException { + S3Backend backend = mock(S3Backend.class, Mockito.CALLS_REAL_METHODS); + doNothing().when(backend).writeAsync(any(DataIdentifier.class), any(File.class), any(AsyncUploadCallback.class)); + doNothing().when(backend).write(any(DataIdentifier.class), any(File.class)); + return backend; + } + + private void setupTestS3DS(final S3DataStore s3ds) throws IOException, RepositoryException { + s3ds.setProperties(properties); + s3ds.setSecret((String) properties.get(S3Constants.SECRET_KEY)); + s3ds.init(folder.newFolder().getAbsolutePath()); + } + + private S3DataStore getDefaultS3DS() throws IOException, RepositoryException { + final S3DataStore s3ds = new S3DataStore(); + setupTestS3DS(s3ds); + return s3ds; + } + + private S3DataStore getCustomBackendS3DS(final S3Backend backend) throws IOException, RepositoryException { + final S3DataStore s3ds = new CustomBackendS3DataStore(backend); + setupTestS3DS(s3ds); + return s3ds; + } + + @Test + public void testGetActiveS3FileSyncMetricExists() throws RepositoryException, IOException, MalformedObjectNameException, + InstanceAlreadyExistsException, MBeanRegistrationException, NotCompliantMBeanException { + final S3DataStore s3ds = getDefaultS3DS(); + final S3DataStoreStats stats = new S3DataStoreStats(s3ds); + jmxServer.registerMBean(stats, mBeanName); + + assert(0 == mBean.getActiveSyncs()); + } + + @Test + public void testGetSingleActiveS3FileSyncMetric() throws IOException, RepositoryException, MalformedObjectNameException, + InstanceAlreadyExistsException, MBeanRegistrationException, NotCompliantMBeanException { + final S3Backend backend = getMockS3Backend(); + final S3DataStore s3ds = getCustomBackendS3DS(backend); + final S3DataStoreStats stats = new S3DataStoreStats(s3ds); + jmxServer.registerMBean(stats, mBeanName); + + DataRecord record = null; + try { + record = s3ds.addRecord(new StringInputStream("test")); + assert(1 == mBean.getActiveSyncs()); + } + finally { + if (null != record) { + s3ds.deleteRecord(record.getIdentifier()); + } + } + assert(0 == mBean.getActiveSyncs()); + } + + @Test + public void testGetMultilpleActiveS3FileSyncMetric() throws IOException, RepositoryException, MalformedObjectNameException, + InstanceAlreadyExistsException, MBeanRegistrationException, NotCompliantMBeanException { + final S3Backend backend = getMockS3Backend(); + final S3DataStore s3ds = getCustomBackendS3DS(backend); + final S3DataStoreStats stats = new S3DataStoreStats(s3ds); + jmxServer.registerMBean(stats, mBeanName); + + final Set<DataRecord> records = Sets.newHashSet(); + try { + records.add(s3ds.addRecord(new StringInputStream("test1"))); + records.add(s3ds.addRecord(new StringInputStream("test2"))); + records.add(s3ds.addRecord(new StringInputStream("test3"))); + + assert (3 == mBean.getActiveSyncs()); + } + finally { + for (final DataRecord record : records) { + s3ds.deleteRecord(record.getIdentifier()); + } + } + + assert(0 == mBean.getActiveSyncs()); + } + + @Test + public void testIsFileSyncedMetricExists() throws IOException, RepositoryException, InstanceAlreadyExistsException, + MBeanRegistrationException, NotCompliantMBeanException { + final S3DataStore s3ds = getDefaultS3DS(); + final S3DataStoreStats stats = new S3DataStoreStats(s3ds); + jmxServer.registerMBean(stats, mBeanName); + + assertFalse(mBean.isFileSynced(syncfile1.getAbsolutePath())); + } + + @Test + public void testIsFileSyncedNullFileReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException, + MBeanRegistrationException, NotCompliantMBeanException { + final S3DataStore s3ds = getDefaultS3DS(); + final S3DataStoreStats stats = new S3DataStoreStats(s3ds); + jmxServer.registerMBean(stats, mBeanName); + + assertFalse(mBean.isFileSynced(null)); + } + + @Test + public void testIsFileSyncedEmptyStringReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException, + MBeanRegistrationException, NotCompliantMBeanException { + final S3DataStore s3ds = getDefaultS3DS(); + final S3DataStoreStats stats = new S3DataStoreStats(s3ds); + jmxServer.registerMBean(stats, mBeanName); + + assertFalse(mBean.isFileSynced("")); + } + + @Test + public void testIsFileSyncedInvalidFilenameReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException, + MBeanRegistrationException, NotCompliantMBeanException { + final S3DataStore s3ds = getDefaultS3DS(); + final S3DataStoreStats stats = new S3DataStoreStats(s3ds); + jmxServer.registerMBean(stats, mBeanName); + + assertFalse(mBean.isFileSynced("invalid")); + } + + @Test + public void testIsFileSyncedFileNotAddedReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException, + MBeanRegistrationException, NotCompliantMBeanException { + final S3Backend backend = getMockS3Backend(); + final S3DataStore s3ds = getCustomBackendS3DS(backend); + final S3DataStoreStats stats = new S3DataStoreStats(s3ds); + jmxServer.registerMBean(stats, mBeanName); + + assertFalse(mBean.isFileSynced(syncfile1.getAbsolutePath())); + } + + @Test + public void testIsFileSyncedSyncIncompleteReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException, + MBeanRegistrationException, NotCompliantMBeanException { + final S3Backend backend = getMockS3Backend(); + final S3DataStore s3ds = getCustomBackendS3DS(backend); + final S3DataStoreStats stats = new S3DataStoreStats(s3ds); + jmxServer.registerMBean(stats, mBeanName); + + FileInputStream inputStream = null; + DataRecord record = null; + try { + inputStream = new FileInputStream(syncfile1); + record = s3ds.addRecord(new FileInputStream(syncfile1)); + + assertFalse(mBean.isFileSynced(syncfile1.getAbsolutePath())); + + } + finally { + if (null != record) { + s3ds.deleteRecord(record.getIdentifier()); + } + if (null != inputStream) { + inputStream.close(); + } + } + } + + @Test + public void testIsFileSyncedSyncCompleteReturnsTrue() throws IOException, RepositoryException, InstanceAlreadyExistsException, + MBeanRegistrationException, NotCompliantMBeanException { + final S3DataStore s3ds = getDefaultS3DS(); + final S3DataStoreStats stats = new S3DataStoreStats(s3ds); + jmxServer.registerMBean(stats, mBeanName); + + FileInputStream inputStream = null; + DataRecord record = null; + try { + inputStream = new FileInputStream(syncfile1); + + record = s3ds.addRecord(new FileInputStream(syncfile1)); + + int tries = 0; + while (stats.getActiveSyncs() > 0 && 50 > tries++) { + try { + Thread.sleep(100); + } + catch (InterruptedException e) { } + } + + assert(mBean.isFileSynced(syncfile1.getAbsolutePath())); + } + finally { + if (null != record) { + s3ds.deleteRecord(record.getIdentifier()); + } + if (null != inputStream) { + inputStream.close(); + } + } + } + + @Test + public void testIsFileSyncedFileDeletedReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException, + MBeanRegistrationException, NotCompliantMBeanException { + final S3DataStore s3ds = getDefaultS3DS(); + final S3DataStoreStats stats = new S3DataStoreStats(s3ds); + jmxServer.registerMBean(stats, mBeanName); + + FileInputStream inputStream = null; + DataRecord record = null; + try { + inputStream = new FileInputStream(syncfile1); + + record = s3ds.addRecord(new FileInputStream(syncfile1)); + + int tries = 0; + while (stats.getActiveSyncs() > 0 && 50 > tries++) { + try { + Thread.sleep(100); + } + catch (InterruptedException e) { } + } + } + finally { + if (null != record) { + s3ds.deleteRecord(record.getIdentifier()); + } + if (null != inputStream) { + inputStream.close(); + } + } + + assertFalse(mBean.isFileSynced(syncfile1.getAbsolutePath())); + } + + private class CustomBackendS3DataStore extends S3DataStore { + private S3Backend _localBackend; + CustomBackendS3DataStore(final S3Backend backend) { _localBackend = backend; } + @Override + protected Backend createBackend() { + if(properties != null){ + _localBackend.setProperties(properties); + } + return _localBackend; + } + } +} diff --git a/oak-blob-cloud/src/test/resources/syncfile1 b/oak-blob-cloud/src/test/resources/syncfile1 new file mode 100644 index 0000000..aa9d41a --- /dev/null +++ b/oak-blob-cloud/src/test/resources/syncfile1 @@ -0,0 +1,2 @@ +This is syncfile1. +It is used by org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.TestS3DataStoreStats.java. diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java index 0c2035f..e14afc8 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java @@ -60,6 +60,9 @@ import org.apache.jackrabbit.oak.api.Descriptors; import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean; import org.apache.jackrabbit.oak.api.jmx.CheckpointMBean; import org.apache.jackrabbit.oak.api.jmx.PersistentCacheStatsMBean; +import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore; +import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStats; +import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStatsMBean; import org.apache.jackrabbit.oak.cache.CacheStats; import org.apache.jackrabbit.oak.commons.PropertiesUtil; import org.apache.jackrabbit.oak.osgi.ObserverTracker; @@ -71,6 +74,7 @@ import org.apache.jackrabbit.oak.plugins.blob.BlobStoreStats; import org.apache.jackrabbit.oak.plugins.blob.BlobTrackingStore; import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore; import org.apache.jackrabbit.oak.plugins.blob.datastore.BlobIdTracker; +import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore; import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils; import org.apache.jackrabbit.oak.plugins.document.persistentCache.CacheType; import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCacheStats; @@ -787,6 +791,19 @@ public class DocumentNodeStoreService { BlobGCMBean.TYPE, "Document node store blob garbage collection")); } + if (null != store.getBlobStore() && store.getBlobStore() instanceof DataStoreBlobStore) { + final DataStoreBlobStore dsbs = (DataStoreBlobStore)store.getBlobStore(); + if (null != dsbs.getDataStore() && dsbs.getDataStore() instanceof S3DataStore) { + final S3DataStore s3ds = (S3DataStore)dsbs.getDataStore(); + final S3DataStoreStats s3dsStats = new S3DataStoreStats(s3ds); + registrations.add(registerMBean(whiteboard, + S3DataStoreStatsMBean.class, + s3dsStats, + S3DataStoreStatsMBean.TYPE, + s3dsStats.getClass().getSimpleName())); + } + } + RevisionGC revisionGC = new RevisionGC(new Runnable() { @Override public void run() { diff --git a/oak-segment-tar/pom.xml b/oak-segment-tar/pom.xml index bff7015..c1fa16f 100644 --- a/oak-segment-tar/pom.xml +++ b/oak-segment-tar/pom.xml @@ -103,6 +103,12 @@ </dependency> <dependency> <groupId>org.apache.jackrabbit</groupId> + <artifactId>oak-blob-cloud</artifactId> + <version>${oak.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.jackrabbit</groupId> <artifactId>oak-core</artifactId> <version>${oak.version}</version> <scope>provided</scope> diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java index 84c2521..aa9e956 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java @@ -56,6 +56,9 @@ import org.apache.jackrabbit.commons.SimpleValueFactory; import org.apache.jackrabbit.oak.api.Descriptors; import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean; import org.apache.jackrabbit.oak.api.jmx.CheckpointMBean; +import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore; +import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStats; +import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStatsMBean; import org.apache.jackrabbit.oak.cache.CacheStats; import org.apache.jackrabbit.oak.osgi.ObserverTracker; import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard; @@ -64,6 +67,7 @@ import org.apache.jackrabbit.oak.plugins.blob.BlobGCMBean; import org.apache.jackrabbit.oak.plugins.blob.BlobGarbageCollector; import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector; import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore; +import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore; import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils; import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType; import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo; @@ -509,6 +513,23 @@ public class SegmentNodeStoreService extends ProxyNodeStore ) ))); + // Expose statistics about S3DataStore, if one is being used + + if (null != blobStore && blobStore instanceof DataStoreBlobStore) { + final DataStoreBlobStore dsbs = (DataStoreBlobStore)blobStore; + if (null != dsbs.getDataStore() && dsbs.getDataStore() instanceof S3DataStore) { + final S3DataStore s3ds = (S3DataStore)dsbs.getDataStore(); + final S3DataStoreStats s3dsStats = new S3DataStoreStats(s3ds); + registrations.add(registerMBean( + whiteboard, + S3DataStoreStatsMBean.class, + s3dsStats, + S3DataStoreStatsMBean.TYPE, + s3dsStats.getClass().getSimpleName() + )); + } + } + // Register a factory service to expose the FileStore providerRegistration = context.getBundleContext().registerService(SegmentStoreProvider.class.getName(), this, null); diff --git a/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java b/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java index 11abf49..7f998ee 100644 --- a/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java +++ b/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java @@ -59,6 +59,9 @@ import org.apache.jackrabbit.commons.SimpleValueFactory; import org.apache.jackrabbit.oak.api.Descriptors; import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean; import org.apache.jackrabbit.oak.api.jmx.CheckpointMBean; +import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore; +import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStats; +import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStatsMBean; import org.apache.jackrabbit.oak.cache.CacheStats; import org.apache.jackrabbit.oak.commons.PropertiesUtil; import org.apache.jackrabbit.oak.osgi.ObserverTracker; @@ -70,6 +73,7 @@ import org.apache.jackrabbit.oak.plugins.blob.BlobTrackingStore; import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector; import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore; import org.apache.jackrabbit.oak.plugins.blob.datastore.BlobIdTracker; +import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore; import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils; import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType; import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo; @@ -289,6 +293,7 @@ public class SegmentNodeStoreService extends ProxyNodeStore private Registration stringCacheMBean; private Registration fsgcMonitorMBean; private Registration fileStoreStatsMBean; + private Registration s3DataStoreStatsRegistration; private WhiteboardExecutor executor; private boolean customBlobStore; @@ -536,6 +541,23 @@ public class SegmentNodeStoreService extends ProxyNodeStore scheduleWithFixedDelay(whiteboard, fsgcm, 1) ); + // Expose statistics about S3DataStore, if one is being used + + if (null != blobStore && blobStore instanceof DataStoreBlobStore) { + final DataStoreBlobStore dsbs = (DataStoreBlobStore)blobStore; + if (null != dsbs.getDataStore() && dsbs.getDataStore() instanceof S3DataStore) { + final S3DataStore s3ds = (S3DataStore)dsbs.getDataStore(); + final S3DataStoreStats s3dsStats = new S3DataStoreStats(s3ds); + s3DataStoreStatsRegistration = registerMBean( + whiteboard, + S3DataStoreStatsMBean.class, + s3dsStats, + S3DataStoreStatsMBean.TYPE, + s3dsStats.getClass().getSimpleName() + ); + } + } + // Register a factory service to expose the FileStore providerRegistration = context.getBundleContext().registerService(SegmentStoreProvider.class.getName(), this, null); @@ -714,6 +736,10 @@ public class SegmentNodeStoreService extends ProxyNodeStore fileStoreStatsMBean.unregister(); fileStoreStatsMBean = null; } + if (s3DataStoreStatsRegistration != null) { + s3DataStoreStatsRegistration.unregister(); + s3DataStoreStatsRegistration = null; + } if (executor != null) { executor.stop(); executor = null;