Use this patch instead; it has been updated against the latest trunk. On Fri, Aug 26, 2016 at 4:41 PM, Matt Ryan <o...@mvryan.org> wrote:
> Hi Oak Devs, > > I've created OAK-4712 and submitted a patch for the same. I've attached > the same patch to this email. > > The submission is to add a new MBean, S3DataStoreStats, which will allow > reporting via JMX about the state of the S3DataStore. Two metrics are > intended. The first is to report the number of files that are in the sync > state, meaning they have been added to the S3DataStore but are not yet > completely copied into S3. The second is to allow querying the sync state > of a file - a return value of true would mean that the file provided is > fully synchronized. This uses an external file name, not the internal ID. > > I have some questions about the implementation first: > 1. For the request to query the sync state of a file, should the file name > provided be a full path to a local file or a path to a file within OAK > (e.g. /content/dam/myimage.jpg)? The current implementation uses a local file > path but I have been wondering if it should be an OAK path. > 2. For the request to query the sync state of a file, when converting from > the externally-supplied file name to an internal DataIdentifier, this > implementation is performing the same calculation to determine the internal > ID name as is done when a file is stored. I have a number of concerns with > this: > - It is inefficient - the entire file has to be read and digested in > order to compute the internal ID. This takes a long time for large assets. > - I've essentially duplicated the logic from CachingDataStore into > S3DataStore to compute the internal ID. I hate duplicating the code, but I > am trying to avoid exposing internal IDs in the API, and I am not seeing a good > way in the current implementation to avoid this without either modifying > the public API of CachingDataStore, or exposing the internal ID via the API, or > both. > > Any suggestions on these two issues? > > > I'm also experiencing a problem with this patch. In my testing it appears > to work fine, until I delete a file. 
For example, if I delete an asset via > the REST API, I will see the asset deleted in CRXDE. However, the file > still remains in S3. This MBean as implemented only knows how to check > with S3DataStore and the corresponding backend, and these all appear to > believe the file still exists. So the MBean continues to report that the > file's sync state is synchronized (i.e. isFileSynced() returns true) even > though the file has been removed from the JCR. Any suggestions on how to > resolve this? > > > Finally, any feedback on the patch is welcome. And if I got the process > wrong, please correct me (gently) - first-time submission here. Thanks. > >
diff --git a/oak-blob-cloud/pom.xml b/oak-blob-cloud/pom.xml index 800f716..ad3cb3c 100644 --- a/oak-blob-cloud/pom.xml +++ b/oak-blob-cloud/pom.xml @@ -41,7 +41,7 @@ <artifactId>maven-bundle-plugin</artifactId> <configuration> <instructions> - <Export-Package>org.apache.jackrabbit.oak.blob.cloud.aws.s3</Export-Package> + <Export-Package>org.apache.jackrabbit.oak.blob.cloud.aws.s3,org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats</Export-Package> <DynamicImport-Package>sun.io</DynamicImport-Package> </instructions> </configuration> @@ -101,6 +101,13 @@ <version>${jackrabbit.version}</version> </dependency> + <!-- Dependencies to other Oak components --> + <dependency> + <groupId>org.apache.jackrabbit</groupId> + <artifactId>oak-commons</artifactId> + <version>${project.version}</version> + </dependency> + <!-- Amazon AWS dependency --> <dependency> <groupId>com.amazonaws</groupId> @@ -140,6 +147,12 @@ <artifactId>logback-classic</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <version>1.10.19</version> + <scope>test</scope> + </dependency> </dependencies> </project> diff --git a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java index fc21bf6..be2ed54 100644 --- a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java +++ b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java @@ -16,17 +16,64 @@ */ package org.apache.jackrabbit.oak.blob.cloud.aws.s3; +import java.io.FileInputStream; +import java.io.OutputStream; +import java.security.DigestOutputStream; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import 
com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import org.apache.commons.io.IOUtils; +import org.apache.commons.io.output.NullOutputStream; import org.apache.jackrabbit.core.data.Backend; import org.apache.jackrabbit.core.data.CachingDataStore; +import org.apache.jackrabbit.core.data.DataIdentifier; +import org.apache.jackrabbit.core.data.DataStoreException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * An Amazon S3 data store. */ public class S3DataStore extends CachingDataStore { + + /** + * Logger instance. + */ + private static final Logger LOG = LoggerFactory.getLogger(S3DataStore.class); + protected Properties properties; + private final LoadingCache<String, DataIdentifier> identifierCache = + CacheBuilder.newBuilder() + .maximumSize(8192).expireAfterAccess(5, TimeUnit.MINUTES) + .build(new CacheLoader<String, DataIdentifier>() { + @Override + public DataIdentifier load(final String filename) throws Exception { + try { + MessageDigest digest = MessageDigest.getInstance("SHA-1"); + OutputStream output = new DigestOutputStream(new NullOutputStream(), digest); + try { + IOUtils.copyLarge(new FileInputStream(filename), output); + } + finally { + output.close(); + } + return new DataIdentifier(encodeHexString(digest.digest())); + } + catch (NoSuchAlgorithmException e) { + LOG.warn("Unable to load digest algorithm \"SHA-1\""); + } + return null; + } + }); + @Override protected Backend createBackend() { S3Backend backend = new S3Backend(); @@ -47,4 +94,22 @@ public class S3DataStore extends CachingDataStore { public void setProperties(Properties properties) { this.properties = properties; } + + public boolean getRecordIfStored(final String filename) { + try { + final DataIdentifier identifier = identifierCache.get(filename); + if (null != identifier) { + return this.getBackend().exists(identifier); + } else { + LOG.warn(String.format("Unable to obtain identifier for 
filename %s", filename)); + } + } + catch (ExecutionException e) { + LOG.warn(String.format("Exception caught checking for %s in pending uploads", filename), e); + } + catch (DataStoreException e) { + LOG.warn(String.format("Data Store Exception caught checking for %s in pending uploads", filename), e); + } + return false; + } } diff --git a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStats.java b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStats.java new file mode 100644 index 0000000..5356607 --- /dev/null +++ b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStats.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats; + +import com.google.common.base.Strings; +import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore; +import org.apache.jackrabbit.oak.commons.jmx.AnnotatedStandardMBean; + +public class S3DataStoreStats extends AnnotatedStandardMBean implements S3DataStoreStatsMBean { + private final S3DataStore s3ds; + + public S3DataStoreStats(final S3DataStore s3ds) { + super(S3DataStoreStatsMBean.class); + this.s3ds = s3ds; + } + + @Override + public long getActiveSyncs() { + return s3ds.getPendingUploads().size(); + } + + @Override + public boolean isFileSynced(final String filename) { + if (Strings.isNullOrEmpty(filename)) { + return false; + } + return s3ds.getRecordIfStored(filename); + } +} diff --git a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStatsMBean.java b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStatsMBean.java new file mode 100644 index 0000000..79bd14b --- /dev/null +++ b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStatsMBean.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats; + +import org.apache.jackrabbit.core.data.DataIdentifier; + +public interface S3DataStoreStatsMBean { + String TYPE = "S3DataStoreStats"; + long getActiveSyncs(); + boolean isFileSynced(final String filename); +} diff --git a/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/TestS3DataStoreStats.java b/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/TestS3DataStoreStats.java new file mode 100644 index 0000000..ed7d663 --- /dev/null +++ b/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/TestS3DataStoreStats.java @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assume.assumeFalse; +import static org.junit.Assume.assumeNotNull; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; + +import com.amazonaws.util.StringInputStream; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.apache.jackrabbit.core.data.AsyncUploadCallback; +import org.apache.jackrabbit.core.data.Backend; +import org.apache.jackrabbit.core.data.DataIdentifier; +import org.apache.jackrabbit.core.data.DataRecord; +import org.apache.jackrabbit.core.data.DataStoreException; +import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3Backend; +import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3Constants; +import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore; +import org.apache.jackrabbit.oak.blob.cloud.aws.s3.Utils; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; + +import javax.jcr.RepositoryException; +import javax.management.InstanceAlreadyExistsException; +import javax.management.InstanceNotFoundException; +import javax.management.JMX; +import javax.management.MBeanRegistrationException; +import javax.management.MBeanServer; +import javax.management.MBeanServerConnection; +import javax.management.MBeanServerFactory; +import javax.management.MalformedObjectNameException; +import javax.management.NotCompliantMBeanException; +import javax.management.ObjectName; +import javax.management.remote.JMXConnector; +import javax.management.remote.JMXConnectorFactory; +import javax.management.remote.JMXServiceURL; +import 
java.io.*; +import java.lang.management.ManagementFactory; +import java.security.DigestOutputStream; +import java.security.MessageDigest; +import java.util.Properties; +import java.util.Set; + +public class TestS3DataStoreStats { + @Rule + public TemporaryFolder folder = new TemporaryFolder(new File("target")); + + private static Properties properties; + private static MBeanServer jmxServer; + private static ObjectName mBeanName; + + private S3DataStoreStatsMBean mBean; + private File syncfile1; + + @BeforeClass + public static void preClass() throws IOException, RepositoryException, MalformedObjectNameException { + + // This will cause all tests in this file to be ignored if JMX properties + // are not passed into the test execution. + // + // If you want to run this unit test suite you will need to + // pass the following settings into the command-line. + // Example: + // -Djava.rmi.server.hostname=localhost + // -Dcom.sun.management.jmxremote.port=9999 + // -Dcom.sun.management.jmxremote.ssl=false + // -Dcom.sun.management.jmxremote.authenticate=false + for (final String property : Lists.newArrayList("java.rmi.server.hostname", + "com.sun.management.jmxremote.port", + "com.sun.management.jmxremote.ssl", + "com.sun.management.jmxremote.authenticate")) { + assumeFalse(Strings.isNullOrEmpty(System.getProperty(property))); + } + + // This will cause all tests in this file to be ignored if no JMX + // server could be found. + jmxServer = ManagementFactory.getPlatformMBeanServer(); + if (null == jmxServer) { + jmxServer = MBeanServerFactory.newMBeanServer(); + } + assumeNotNull(jmxServer); + + // This will cause all tests in this file to be ignored if no S3 + // configuration has been provided. + // + // If you want to run this unit test suite you will need to + // pass the following setting into the command-line. + // Example: + // -Dconfig=/path/to/aws/properties + // + // Properties file uses the same format as for S3DataStore configuration. 
+ assumeFalse(Strings.isNullOrEmpty(System.getProperty("config"))); + + properties = Utils.readConfig(System.getProperty("config")); + + mBeanName = new ObjectName("org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats:type=S3DataStoreStats"); + } + + @Before + public void setup() throws IOException, InstanceAlreadyExistsException, + MBeanRegistrationException, NotCompliantMBeanException { + // Set up JMX connection and mbean + final JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://:9999/jmxrmi"); + final JMXConnector connector = JMXConnectorFactory.connect(url, null); + final MBeanServerConnection connection = connector.getMBeanServerConnection(); + mBean = JMX.newMBeanProxy(connection, mBeanName, S3DataStoreStatsMBean.class, true); + + syncfile1 = new File(getClass().getResource("/syncfile1").getFile()); + } + + @After + public void teardown() throws InstanceNotFoundException, MBeanRegistrationException { + jmxServer.unregisterMBean(mBeanName); + } + + private S3Backend getMockS3Backend() throws DataStoreException { + S3Backend backend = mock(S3Backend.class, Mockito.CALLS_REAL_METHODS); + doNothing().when(backend).writeAsync(any(DataIdentifier.class), any(File.class), any(AsyncUploadCallback.class)); + doNothing().when(backend).write(any(DataIdentifier.class), any(File.class)); + return backend; + } + + private void setupTestS3DS(final S3DataStore s3ds) throws IOException, RepositoryException { + s3ds.setProperties(properties); + s3ds.setSecret((String) properties.get(S3Constants.SECRET_KEY)); + s3ds.init(folder.newFolder().getAbsolutePath()); + } + + private S3DataStore getDefaultS3DS() throws IOException, RepositoryException { + final S3DataStore s3ds = new S3DataStore(); + setupTestS3DS(s3ds); + return s3ds; + } + + private S3DataStore getCustomBackendS3DS(final S3Backend backend) throws IOException, RepositoryException { + final S3DataStore s3ds = new CustomBackendS3DataStore(backend); + setupTestS3DS(s3ds); + return s3ds; + } + + @Test + 
public void testGetActiveS3FileSyncMetricExists() throws RepositoryException, IOException, MalformedObjectNameException, + InstanceAlreadyExistsException, MBeanRegistrationException, NotCompliantMBeanException { + final S3DataStore s3ds = getDefaultS3DS(); + final S3DataStoreStats stats = new S3DataStoreStats(s3ds); + jmxServer.registerMBean(stats, mBeanName); + + assert(0 == mBean.getActiveSyncs()); + } + + @Test + public void testGetSingleActiveS3FileSyncMetric() throws IOException, RepositoryException, MalformedObjectNameException, + InstanceAlreadyExistsException, MBeanRegistrationException, NotCompliantMBeanException { + final S3Backend backend = getMockS3Backend(); + final S3DataStore s3ds = getCustomBackendS3DS(backend); + final S3DataStoreStats stats = new S3DataStoreStats(s3ds); + jmxServer.registerMBean(stats, mBeanName); + + DataRecord record = null; + try { + record = s3ds.addRecord(new StringInputStream("test")); + assert(1 == mBean.getActiveSyncs()); + } + finally { + if (null != record) { + s3ds.deleteRecord(record.getIdentifier()); + } + } + assert(0 == mBean.getActiveSyncs()); + } + + @Test + public void testGetMultilpleActiveS3FileSyncMetric() throws IOException, RepositoryException, MalformedObjectNameException, + InstanceAlreadyExistsException, MBeanRegistrationException, NotCompliantMBeanException { + final S3Backend backend = getMockS3Backend(); + final S3DataStore s3ds = getCustomBackendS3DS(backend); + final S3DataStoreStats stats = new S3DataStoreStats(s3ds); + jmxServer.registerMBean(stats, mBeanName); + + final Set<DataRecord> records = Sets.newHashSet(); + try { + records.add(s3ds.addRecord(new StringInputStream("test1"))); + records.add(s3ds.addRecord(new StringInputStream("test2"))); + records.add(s3ds.addRecord(new StringInputStream("test3"))); + + assert (3 == mBean.getActiveSyncs()); + } + finally { + for (final DataRecord record : records) { + s3ds.deleteRecord(record.getIdentifier()); + } + } + + assert(0 == 
mBean.getActiveSyncs()); + } + + @Test + public void testIsFileSyncedMetricExists() throws IOException, RepositoryException, InstanceAlreadyExistsException, + MBeanRegistrationException, NotCompliantMBeanException { + final S3DataStore s3ds = getDefaultS3DS(); + final S3DataStoreStats stats = new S3DataStoreStats(s3ds); + jmxServer.registerMBean(stats, mBeanName); + + assertFalse(mBean.isFileSynced(syncfile1.getAbsolutePath())); + } + + @Test + public void testIsFileSyncedNullFileReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException, + MBeanRegistrationException, NotCompliantMBeanException { + final S3DataStore s3ds = getDefaultS3DS(); + final S3DataStoreStats stats = new S3DataStoreStats(s3ds); + jmxServer.registerMBean(stats, mBeanName); + + assertFalse(mBean.isFileSynced(null)); + } + + @Test + public void testIsFileSyncedEmptyStringReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException, + MBeanRegistrationException, NotCompliantMBeanException { + final S3DataStore s3ds = getDefaultS3DS(); + final S3DataStoreStats stats = new S3DataStoreStats(s3ds); + jmxServer.registerMBean(stats, mBeanName); + + assertFalse(mBean.isFileSynced("")); + } + + @Test + public void testIsFileSyncedInvalidFilenameReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException, + MBeanRegistrationException, NotCompliantMBeanException { + final S3DataStore s3ds = getDefaultS3DS(); + final S3DataStoreStats stats = new S3DataStoreStats(s3ds); + jmxServer.registerMBean(stats, mBeanName); + + assertFalse(mBean.isFileSynced("invalid")); + } + + @Test + public void testIsFileSyncedFileNotAddedReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException, + MBeanRegistrationException, NotCompliantMBeanException { + final S3Backend backend = getMockS3Backend(); + final S3DataStore s3ds = getCustomBackendS3DS(backend); + final S3DataStoreStats stats = new S3DataStoreStats(s3ds); + 
jmxServer.registerMBean(stats, mBeanName); + + assertFalse(mBean.isFileSynced(syncfile1.getAbsolutePath())); + } + + @Test + public void testIsFileSyncedSyncIncompleteReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException, + MBeanRegistrationException, NotCompliantMBeanException { + final S3Backend backend = getMockS3Backend(); + final S3DataStore s3ds = getCustomBackendS3DS(backend); + final S3DataStoreStats stats = new S3DataStoreStats(s3ds); + jmxServer.registerMBean(stats, mBeanName); + + FileInputStream inputStream = null; + DataRecord record = null; + try { + inputStream = new FileInputStream(syncfile1); + record = s3ds.addRecord(new FileInputStream(syncfile1)); + + assertFalse(mBean.isFileSynced(syncfile1.getAbsolutePath())); + + } + finally { + if (null != record) { + s3ds.deleteRecord(record.getIdentifier()); + } + if (null != inputStream) { + inputStream.close(); + } + } + } + + @Test + public void testIsFileSyncedSyncCompleteReturnsTrue() throws IOException, RepositoryException, InstanceAlreadyExistsException, + MBeanRegistrationException, NotCompliantMBeanException { + final S3DataStore s3ds = getDefaultS3DS(); + final S3DataStoreStats stats = new S3DataStoreStats(s3ds); + jmxServer.registerMBean(stats, mBeanName); + + FileInputStream inputStream = null; + DataRecord record = null; + try { + inputStream = new FileInputStream(syncfile1); + + record = s3ds.addRecord(new FileInputStream(syncfile1)); + + int tries = 0; + while (stats.getActiveSyncs() > 0 && 50 > tries++) { + try { + Thread.sleep(100); + } + catch (InterruptedException e) { } + } + + assert(mBean.isFileSynced(syncfile1.getAbsolutePath())); + } + finally { + if (null != record) { + s3ds.deleteRecord(record.getIdentifier()); + } + if (null != inputStream) { + inputStream.close(); + } + } + } + + @Test + public void testIsFileSyncedFileDeletedReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException, + MBeanRegistrationException, 
NotCompliantMBeanException { + final S3DataStore s3ds = getDefaultS3DS(); + final S3DataStoreStats stats = new S3DataStoreStats(s3ds); + jmxServer.registerMBean(stats, mBeanName); + + FileInputStream inputStream = null; + DataRecord record = null; + try { + inputStream = new FileInputStream(syncfile1); + + record = s3ds.addRecord(new FileInputStream(syncfile1)); + + int tries = 0; + while (stats.getActiveSyncs() > 0 && 50 > tries++) { + try { + Thread.sleep(100); + } + catch (InterruptedException e) { } + } + } + finally { + if (null != record) { + s3ds.deleteRecord(record.getIdentifier()); + } + if (null != inputStream) { + inputStream.close(); + } + } + + assertFalse(mBean.isFileSynced(syncfile1.getAbsolutePath())); + } + + private class CustomBackendS3DataStore extends S3DataStore { + private S3Backend _localBackend; + CustomBackendS3DataStore(final S3Backend backend) { _localBackend = backend; } + @Override + protected Backend createBackend() { + if(properties != null){ + _localBackend.setProperties(properties); + } + return _localBackend; + } + } +} diff --git a/oak-blob-cloud/src/test/resources/syncfile1 b/oak-blob-cloud/src/test/resources/syncfile1 new file mode 100644 index 0000000..aa9d41a --- /dev/null +++ b/oak-blob-cloud/src/test/resources/syncfile1 @@ -0,0 +1,2 @@ +This is syncfile1. +It is used by org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.TestS3DataStoreStats.java. 
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java index 0c2035f..e14afc8 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java @@ -60,6 +60,9 @@ import org.apache.jackrabbit.oak.api.Descriptors; import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean; import org.apache.jackrabbit.oak.api.jmx.CheckpointMBean; import org.apache.jackrabbit.oak.api.jmx.PersistentCacheStatsMBean; +import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore; +import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStats; +import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStatsMBean; import org.apache.jackrabbit.oak.cache.CacheStats; import org.apache.jackrabbit.oak.commons.PropertiesUtil; import org.apache.jackrabbit.oak.osgi.ObserverTracker; @@ -71,6 +74,7 @@ import org.apache.jackrabbit.oak.plugins.blob.BlobStoreStats; import org.apache.jackrabbit.oak.plugins.blob.BlobTrackingStore; import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore; import org.apache.jackrabbit.oak.plugins.blob.datastore.BlobIdTracker; +import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore; import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils; import org.apache.jackrabbit.oak.plugins.document.persistentCache.CacheType; import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCacheStats; @@ -787,6 +791,19 @@ public class DocumentNodeStoreService { BlobGCMBean.TYPE, "Document node store blob garbage collection")); } + if (null != store.getBlobStore() && store.getBlobStore() instanceof DataStoreBlobStore) { + final DataStoreBlobStore dsbs = (DataStoreBlobStore)store.getBlobStore(); + if (null != 
dsbs.getDataStore() && dsbs.getDataStore() instanceof S3DataStore) { + final S3DataStore s3ds = (S3DataStore)dsbs.getDataStore(); + final S3DataStoreStats s3dsStats = new S3DataStoreStats(s3ds); + registrations.add(registerMBean(whiteboard, + S3DataStoreStatsMBean.class, + s3dsStats, + S3DataStoreStatsMBean.TYPE, + s3dsStats.getClass().getSimpleName())); + } + } + RevisionGC revisionGC = new RevisionGC(new Runnable() { @Override public void run() { diff --git a/oak-segment-tar/pom.xml b/oak-segment-tar/pom.xml index 617462e..acd2f08 100644 --- a/oak-segment-tar/pom.xml +++ b/oak-segment-tar/pom.xml @@ -177,6 +177,12 @@ </dependency> <dependency> <groupId>org.apache.jackrabbit</groupId> + <artifactId>oak-blob-cloud</artifactId> + <version>${oak.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.jackrabbit</groupId> <artifactId>oak-core</artifactId> <version>${oak.version}</version> <scope>provided</scope> diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java index 84c2521..aa9e956 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java @@ -56,6 +56,9 @@ import org.apache.jackrabbit.commons.SimpleValueFactory; import org.apache.jackrabbit.oak.api.Descriptors; import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean; import org.apache.jackrabbit.oak.api.jmx.CheckpointMBean; +import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore; +import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStats; +import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStatsMBean; import org.apache.jackrabbit.oak.cache.CacheStats; import org.apache.jackrabbit.oak.osgi.ObserverTracker; import 
org.apache.jackrabbit.oak.osgi.OsgiWhiteboard; @@ -64,6 +67,7 @@ import org.apache.jackrabbit.oak.plugins.blob.BlobGCMBean; import org.apache.jackrabbit.oak.plugins.blob.BlobGarbageCollector; import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector; import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore; +import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore; import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils; import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType; import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo; @@ -509,6 +513,23 @@ public class SegmentNodeStoreService extends ProxyNodeStore ) ))); + // Expose statistics about S3DataStore, if one is being used + + if (null != blobStore && blobStore instanceof DataStoreBlobStore) { + final DataStoreBlobStore dsbs = (DataStoreBlobStore)blobStore; + if (null != dsbs.getDataStore() && dsbs.getDataStore() instanceof S3DataStore) { + final S3DataStore s3ds = (S3DataStore)dsbs.getDataStore(); + final S3DataStoreStats s3dsStats = new S3DataStoreStats(s3ds); + registrations.add(registerMBean( + whiteboard, + S3DataStoreStatsMBean.class, + s3dsStats, + S3DataStoreStatsMBean.TYPE, + s3dsStats.getClass().getSimpleName() + )); + } + } + // Register a factory service to expose the FileStore providerRegistration = context.getBundleContext().registerService(SegmentStoreProvider.class.getName(), this, null); diff --git a/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java b/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java index 11abf49..7f998ee 100644 --- a/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java +++ b/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java @@ -59,6 +59,9 @@ import 
org.apache.jackrabbit.commons.SimpleValueFactory; import org.apache.jackrabbit.oak.api.Descriptors; import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean; import org.apache.jackrabbit.oak.api.jmx.CheckpointMBean; +import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore; +import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStats; +import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStatsMBean; import org.apache.jackrabbit.oak.cache.CacheStats; import org.apache.jackrabbit.oak.commons.PropertiesUtil; import org.apache.jackrabbit.oak.osgi.ObserverTracker; @@ -70,6 +73,7 @@ import org.apache.jackrabbit.oak.plugins.blob.BlobTrackingStore; import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector; import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore; import org.apache.jackrabbit.oak.plugins.blob.datastore.BlobIdTracker; +import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore; import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils; import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType; import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo; @@ -289,6 +293,7 @@ public class SegmentNodeStoreService extends ProxyNodeStore private Registration stringCacheMBean; private Registration fsgcMonitorMBean; private Registration fileStoreStatsMBean; + private Registration s3DataStoreStatsRegistration; private WhiteboardExecutor executor; private boolean customBlobStore; @@ -536,6 +541,23 @@ public class SegmentNodeStoreService extends ProxyNodeStore scheduleWithFixedDelay(whiteboard, fsgcm, 1) ); + // Expose statistics about S3DataStore, if one is being used + + if (null != blobStore && blobStore instanceof DataStoreBlobStore) { + final DataStoreBlobStore dsbs = (DataStoreBlobStore)blobStore; + if (null != dsbs.getDataStore() && dsbs.getDataStore() instanceof S3DataStore) { + final S3DataStore s3ds = 
(S3DataStore)dsbs.getDataStore(); + final S3DataStoreStats s3dsStats = new S3DataStoreStats(s3ds); + s3DataStoreStatsRegistration = registerMBean( + whiteboard, + S3DataStoreStatsMBean.class, + s3dsStats, + S3DataStoreStatsMBean.TYPE, + s3dsStats.getClass().getSimpleName() + ); + } + } + // Register a factory service to expose the FileStore providerRegistration = context.getBundleContext().registerService(SegmentStoreProvider.class.getName(), this, null); @@ -714,6 +736,10 @@ public class SegmentNodeStoreService extends ProxyNodeStore fileStoreStatsMBean.unregister(); fileStoreStatsMBean = null; } + if (s3DataStoreStatsRegistration != null) { + s3DataStoreStatsRegistration.unregister(); + s3DataStoreStatsRegistration = null; + } if (executor != null) { executor.stop(); executor = null;