I believe this patch addresses the issues identified with the previous patch. I've also uploaded it to OAK-4712.
Looking for review and feedback.

This patch introduces a new interface I've named NodeIdMapper.java; I'm not sure where it belongs. I added it so that the blob ID for a named node can be obtained within the context of a NodeStoreService class, using an anonymous class. However, most of the functionality in each case (DocumentNodeStoreService and SegmentNodeStoreService) is similar. The duplication could probably be reduced with an abstract base class, but again I'm not sure where such a class would best be placed. Ideas?

I also want to call attention to how the blob ID is used in these anonymous classes in the *NodeStoreService classes. The IDs I was getting back had trailing information: a '#' followed by some number of digits. I just split the string and discarded the last part. Is there a better way to do this?

On Sun, Aug 28, 2016 at 10:04 PM, Amit Jain <am...@ieee.org> wrote:

> Hi Matt,
>
> I have directly replied to your comments on the jira.
>
> Thanks
> Amit
>
> On Sat, Aug 27, 2016 at 4:22 AM, Matt Ryan <o...@mvryan.org> wrote:
>
> > Use this patch instead; it is updated from the latest in trunk.
> >
> > On Fri, Aug 26, 2016 at 4:41 PM, Matt Ryan <o...@mvryan.org> wrote:
> >
> >> Hi Oak Devs,
> >>
> >> I've created OAK-4712 and submitted a patch for the same. I've
> >> attached the same patch to this email.
> >>
> >> The submission adds a new MBean, S3DataStoreStats, which will allow
> >> reporting via JMX on the state of the S3DataStore. Two metrics are
> >> intended. The first reports the number of files that are in the sync
> >> state, meaning they have been added to the S3DataStore but are not yet
> >> completely copied into S3. The second allows querying the sync state
> >> of a file; a return value of true means the file provided is fully
> >> synchronized. This uses an external file name, not the internal ID.
> >>
> >> I have some questions about the implementation first:
> >>
> >> 1. For the request to query the sync state of a file, should the file
> >> name provided be a full path to a local file, or a path to a file
> >> within Oak (e.g. /content/dam/myimage.jpg)? The current implementation
> >> uses a local file path, but I have been wondering if it should be an
> >> Oak path.
> >>
> >> 2. For the request to query the sync state of a file, when converting
> >> from the externally supplied file name to an internal DataIdentifier,
> >> this implementation performs the same calculation to determine the
> >> internal ID name as is done when a file is stored. I have a number of
> >> concerns with this:
> >>
> >> - It is inefficient: the entire file has to be read and digested in
> >> order to compute the internal ID. This takes a long time for large
> >> assets.
> >>
> >> - I've essentially duplicated the logic from CachingDataStore into
> >> S3DataStore to compute the internal ID. I hate duplicating the code,
> >> but I am trying to avoid exposing internal IDs in the API, and I don't
> >> see a good way in the current implementation to avoid this without
> >> modifying the public API of CachingDataStore, exposing the internal ID
> >> via the API, or both.
> >>
> >> Any suggestions on these two issues?
> >>
> >> I'm also experiencing a problem with this patch. In my testing it
> >> appears to work fine, until I delete a file. For example, if I delete
> >> an asset via the REST API, I will see the asset deleted in CRXDE.
> >> However, the file still remains in S3. This MBean as implemented only
> >> knows how to check with S3DataStore and the corresponding backend, and
> >> these all appear to believe the file still exists. So the MBean
> >> continues to report that the file's sync state is synchronized (i.e.
> >> isFileSynced() returns true) even though the file has been removed
> >> from the JCR. Any suggestions on how to resolve this?
> >>
> >> Finally, any feedback on the patch is welcome. And if I did the
> >> process wrong please correct me (gently); this is my first submission
> >> here. Thanks
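To make the two points above concrete, here is a minimal standalone sketch (not code from the patch; `BlobIdExample`, `contentHash`, and `stripLengthSuffix` are hypothetical names chosen for illustration). It shows (a) the kind of content-digest calculation being duplicated to derive an internal identifier from file content, assuming a hex-encoded SHA-1 scheme, and (b) the '#'-suffix stripping currently done with `split`, written instead with `indexOf`/`substring` so a blob ID containing no '#' passes through unchanged:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class BlobIdExample {

    // Hypothetical sketch of deriving an identifier from content,
    // assuming a hex-encoded SHA-1 digest scheme. Note the cost the
    // email calls out: the entire content must be read and digested.
    static String contentHash(byte[] content) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-1");
            StringBuilder sb = new StringBuilder();
            for (byte b : digest.digest(content)) {
                sb.append(String.format("%02x", b & 0xff));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-1 unavailable", e);
        }
    }

    // Drop everything from the first '#' onward, keeping only the part
    // needed to look the record up. Unlike split("#")[0], this makes the
    // no-suffix case explicit.
    static String stripLengthSuffix(String blobId) {
        int hash = blobId.indexOf('#');
        return hash < 0 ? blobId : blobId.substring(0, hash);
    }

    public static void main(String[] args) {
        String id = contentHash("test".getBytes(StandardCharsets.UTF_8));
        System.out.println(id);
        System.out.println(stripLengthSuffix(id + "#4"));
    }
}
```

Whether the trailing digits can simply be discarded (or are needed elsewhere, e.g. as a length hint) is exactly the open question in the email.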
diff --git a/oak-blob-cloud/pom.xml b/oak-blob-cloud/pom.xml
index 800f716..ad3cb3c 100644
--- a/oak-blob-cloud/pom.xml
+++ b/oak-blob-cloud/pom.xml
@@ -41,7 +41,7 @@
             <artifactId>maven-bundle-plugin</artifactId>
             <configuration>
               <instructions>
-                <Export-Package>org.apache.jackrabbit.oak.blob.cloud.aws.s3</Export-Package>
+                <Export-Package>org.apache.jackrabbit.oak.blob.cloud.aws.s3,org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats</Export-Package>
                 <DynamicImport-Package>sun.io</DynamicImport-Package>
               </instructions>
             </configuration>
@@ -101,6 +101,13 @@
       <version>${jackrabbit.version}</version>
     </dependency>
 
+    <!-- Dependencies to other Oak components -->
+    <dependency>
+      <groupId>org.apache.jackrabbit</groupId>
+      <artifactId>oak-commons</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
     <!-- Amazon AWS dependency -->
     <dependency>
       <groupId>com.amazonaws</groupId>
@@ -140,6 +147,12 @@
       <artifactId>logback-classic</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <version>1.10.19</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/NodeIdMapper.java b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/NodeIdMapper.java
new file mode 100644
index 0000000..d424169
--- /dev/null
+++ b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/NodeIdMapper.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.blob.cloud.aws.s3;
+
+public interface NodeIdMapper {
+    String getBlobIdForName(final String nodeName);
+}
diff --git a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java
index fc21bf6..8305172 100644
--- a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java
+++ b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java
@@ -17,14 +17,25 @@ package org.apache.jackrabbit.oak.blob.cloud.aws.s3;
 
 import java.util.Properties;
+
 import org.apache.jackrabbit.core.data.Backend;
 import org.apache.jackrabbit.core.data.CachingDataStore;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * An Amazon S3 data store.
  */
 public class S3DataStore extends CachingDataStore {
+
+    /**
+     * Logger instance.
+     */
+    private static final Logger LOG = LoggerFactory.getLogger(S3DataStore.class);
+
     protected Properties properties;
 
     @Override
@@ -47,4 +58,19 @@ public class S3DataStore extends CachingDataStore {
     public void setProperties(Properties properties) {
         this.properties = properties;
     }
+
+    public boolean getRecordIfStored(final String name, final NodeIdMapper mapper) {
+        try {
+            final String identifier = mapper.getBlobIdForName(name);
+            if (null != identifier) {
+                return this.getBackend().exists(new DataIdentifier(identifier));
+            } else {
+                LOG.info(String.format("Unable to obtain identifier for filename %s", name));
+            }
+        }
+        catch (DataStoreException e) {
+            LOG.warn(String.format("Data Store Exception caught checking for %s in pending uploads", name), e);
+        }
+        return false;
+    }
 }
diff --git a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStats.java b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStats.java
new file mode 100644
index 0000000..03d04fb
--- /dev/null
+++ b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStats.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats;
+
+import com.google.common.base.Strings;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.NodeIdMapper;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore;
+import org.apache.jackrabbit.oak.commons.jmx.AnnotatedStandardMBean;
+
+public class S3DataStoreStats extends AnnotatedStandardMBean implements S3DataStoreStatsMBean {
+    private final S3DataStore s3ds;
+    private final NodeIdMapper mapper;
+
+    public S3DataStoreStats(final S3DataStore s3ds, final NodeIdMapper mapper) {
+        super(S3DataStoreStatsMBean.class);
+        this.s3ds = s3ds;
+        this.mapper = mapper;
+    }
+
+    @Override
+    public long getActiveSyncs() {
+        return s3ds.getPendingUploads().size();
+    }
+
+    @Override
+    public boolean isFileSynced(final String filename) {
+        if (Strings.isNullOrEmpty(filename)) {
+            return false;
+        }
+        return s3ds.getRecordIfStored(filename, mapper);
+    }
+}
diff --git a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStatsMBean.java b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStatsMBean.java
new file mode 100644
index 0000000..79bd14b
--- /dev/null
+++ b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStatsMBean.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats;
+
+import org.apache.jackrabbit.core.data.DataIdentifier;
+
+public interface S3DataStoreStatsMBean {
+    String TYPE = "S3DataStoreStats";
+
+    long getActiveSyncs();
+
+    boolean isFileSynced(final String filename);
+}
diff --git a/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/TestS3DataStoreStats.java b/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/TestS3DataStoreStats.java
new file mode 100644
index 0000000..efe05d3
--- /dev/null
+++ b/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/TestS3DataStoreStats.java
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats;
+
+import static org.apache.commons.codec.binary.Hex.encodeHexString;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assume.assumeFalse;
+import static org.junit.Assume.assumeNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+
+import com.amazonaws.util.StringInputStream;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.output.NullOutputStream;
+import org.apache.jackrabbit.core.data.AsyncUploadCallback;
+import org.apache.jackrabbit.core.data.Backend;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.*;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+import javax.jcr.RepositoryException;
+import javax.management.InstanceAlreadyExistsException;
+import javax.management.InstanceNotFoundException;
+import javax.management.JMX;
+import javax.management.MBeanRegistrationException;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerConnection;
+import javax.management.MBeanServerFactory;
+import javax.management.MalformedObjectNameException;
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+import java.io.*;
+import java.lang.management.ManagementFactory;
+import java.security.DigestOutputStream;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+public class TestS3DataStoreStats {
+    @Rule
+    public TemporaryFolder folder = new TemporaryFolder(new File("target"));
+
+    private static Properties properties;
+    private static MBeanServer jmxServer;
+    private static ObjectName mBeanName;
+    private static File syncfile1;
+    private static NodeIdMapper defaultMapper;
+
+    private S3DataStoreStatsMBean mBean;
+
+    @BeforeClass
+    public static void preClass() throws IOException, RepositoryException, MalformedObjectNameException,
+        NoSuchAlgorithmException
+    {
+        // This will cause all tests in this file to be ignored if JMX properties
+        // are not passed into the test execution.
+        //
+        // If you want to run this unit test suite you will need to
+        // pass the following settings into the command-line.
+        // Example:
+        //   -Djava.rmi.server.hostname=localhost
+        //   -Dcom.sun.management.jmxremote.port=9999
+        //   -Dcom.sun.management.jmxremote.ssl=false
+        //   -Dcom.sun.management.jmxremote.authenticate=false
+        for (final String property : Lists.newArrayList("java.rmi.server.hostname",
+            "com.sun.management.jmxremote.port",
+            "com.sun.management.jmxremote.ssl",
+            "com.sun.management.jmxremote.authenticate")) {
+            assumeFalse(Strings.isNullOrEmpty(System.getProperty(property)));
+        }
+
+        // This will cause all tests in this file to be ignored if no JMX
+        // server could be found.
+        jmxServer = ManagementFactory.getPlatformMBeanServer();
+        if (null == jmxServer) {
+            jmxServer = MBeanServerFactory.newMBeanServer();
+        }
+        assumeNotNull(jmxServer);
+
+        // This will cause all tests in this file to be ignored if no S3
+        // configuration has been provided.
+        //
+        // If you want to run this unit test suite you will need to
+        // pass the following setting into the command-line.
+        // Example:
+        //   -Dconfig=/path/to/aws/properties
+        //
+        // Properties file uses the same format as for S3DataStore configuration.
+        assumeFalse(Strings.isNullOrEmpty(System.getProperty("config")));
+
+        properties = Utils.readConfig(System.getProperty("config"));
+
+        mBeanName = new ObjectName("org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats:type=S3DataStoreStats");
+
+        syncfile1 = new File(TestS3DataStoreStats.class.getResource("/syncfile1").getFile());
+
+        final char[] HEX = "0123456789abcdef".toCharArray();
+        MessageDigest digest = MessageDigest.getInstance("SHA-1");
+        OutputStream out = new DigestOutputStream(new NullOutputStream(), digest);
+        IOUtils.copyLarge(new FileInputStream(syncfile1), out);
+        out.close();
+        byte[] digestBytes = digest.digest();
+        char[] buffer = new char[digestBytes.length * 2];
+        for (int i = 0; i < digestBytes.length; i++) {
+            buffer[2*i] = HEX[(digestBytes[i] >> 4) & 0x0f];
+            buffer[2*i+1] = HEX[digestBytes[i] & 0x0f];
+        }
+        final String syncFileBlobId = new String(buffer);
+
+        defaultMapper = new NodeIdMapper() {
+            @Override
+            public String getBlobIdForName(final String nodeName) {
+                return nodeName.equals(syncfile1.getName()) ? syncFileBlobId : null;
+            }
+        };
+    }
+
+    @Before
+    public void setup() throws IOException, InstanceAlreadyExistsException,
+        MBeanRegistrationException, NotCompliantMBeanException {
+        // Set up JMX connection and mbean
+        final JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://:9999/jmxrmi");
+        final JMXConnector connector = JMXConnectorFactory.connect(url, null);
+        final MBeanServerConnection connection = connector.getMBeanServerConnection();
+        mBean = JMX.newMBeanProxy(connection, mBeanName, S3DataStoreStatsMBean.class, true);
+    }
+
+    @After
+    public void teardown() throws InstanceNotFoundException, MBeanRegistrationException {
+        jmxServer.unregisterMBean(mBeanName);
+    }
+
+    private S3Backend getMockS3Backend() throws DataStoreException {
+        S3Backend backend = mock(S3Backend.class, Mockito.CALLS_REAL_METHODS);
+        doNothing().when(backend).writeAsync(any(DataIdentifier.class), any(File.class), any(AsyncUploadCallback.class));
+        doNothing().when(backend).write(any(DataIdentifier.class), any(File.class));
+        return backend;
+    }
+
+    private void setupTestS3DS(final S3DataStore s3ds) throws IOException, RepositoryException {
+        s3ds.setProperties(properties);
+        s3ds.setSecret((String) properties.get(S3Constants.SECRET_KEY));
+        s3ds.init(folder.newFolder().getAbsolutePath());
+    }
+
+    private S3DataStore getDefaultS3DS() throws IOException, RepositoryException {
+        final S3DataStore s3ds = new S3DataStore();
+        setupTestS3DS(s3ds);
+        return s3ds;
+    }
+
+    private S3DataStore getCustomBackendS3DS(final S3Backend backend) throws IOException, RepositoryException {
+        final S3DataStore s3ds = new CustomBackendS3DataStore(backend);
+        setupTestS3DS(s3ds);
+        return s3ds;
+    }
+
+    @Test
+    public void testGetActiveS3FileSyncMetricExists() throws RepositoryException, IOException, MalformedObjectNameException,
+        InstanceAlreadyExistsException, MBeanRegistrationException, NotCompliantMBeanException {
+        final S3DataStore s3ds = getDefaultS3DS();
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds, defaultMapper);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        assert(0 == mBean.getActiveSyncs());
+    }
+
+    @Test
+    public void testGetSingleActiveS3FileSyncMetric() throws IOException, RepositoryException, MalformedObjectNameException,
+        InstanceAlreadyExistsException, MBeanRegistrationException, NotCompliantMBeanException {
+        final S3Backend backend = getMockS3Backend();
+        final S3DataStore s3ds = getCustomBackendS3DS(backend);
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds, defaultMapper);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        DataRecord record = null;
+        try {
+            record = s3ds.addRecord(new StringInputStream("test"));
+            assert(1 == mBean.getActiveSyncs());
+        }
+        finally {
+            if (null != record) {
+                s3ds.deleteRecord(record.getIdentifier());
+            }
+        }
+        assert(0 == mBean.getActiveSyncs());
+    }
+
+    @Test
+    public void testGetMultipleActiveS3FileSyncMetric() throws IOException, RepositoryException, MalformedObjectNameException,
+        InstanceAlreadyExistsException, MBeanRegistrationException, NotCompliantMBeanException {
+        final S3Backend backend = getMockS3Backend();
+        final S3DataStore s3ds = getCustomBackendS3DS(backend);
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds, defaultMapper);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        final Set<DataRecord> records = Sets.newHashSet();
+        try {
+            records.add(s3ds.addRecord(new StringInputStream("test1")));
+            records.add(s3ds.addRecord(new StringInputStream("test2")));
+            records.add(s3ds.addRecord(new StringInputStream("test3")));
+
+            assert(3 == mBean.getActiveSyncs());
+        }
+        finally {
+            for (final DataRecord record : records) {
+                s3ds.deleteRecord(record.getIdentifier());
+            }
+        }
+
+        assert(0 == mBean.getActiveSyncs());
+    }
+
+    @Test
+    public void testIsFileSyncedMetricExists() throws IOException, RepositoryException, InstanceAlreadyExistsException,
+        MBeanRegistrationException, NotCompliantMBeanException {
+        final S3DataStore s3ds = getDefaultS3DS();
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds, defaultMapper);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        assertFalse(mBean.isFileSynced(syncfile1.getName()));
+    }
+
+    @Test
+    public void testIsFileSyncedNullFileReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException,
+        MBeanRegistrationException, NotCompliantMBeanException {
+        final S3DataStore s3ds = getDefaultS3DS();
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds, defaultMapper);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        assertFalse(mBean.isFileSynced(null));
+    }
+
+    @Test
+    public void testIsFileSyncedEmptyStringReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException,
+        MBeanRegistrationException, NotCompliantMBeanException {
+        final S3DataStore s3ds = getDefaultS3DS();
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds, defaultMapper);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        assertFalse(mBean.isFileSynced(""));
+    }
+
+    @Test
+    public void testIsFileSyncedInvalidFilenameReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException,
+        MBeanRegistrationException, NotCompliantMBeanException {
+        final S3DataStore s3ds = getDefaultS3DS();
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds, defaultMapper);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        assertFalse(mBean.isFileSynced("invalid"));
+    }
+
+    @Test
+    public void testIsFileSyncedFileNotAddedReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException,
+        MBeanRegistrationException, NotCompliantMBeanException {
+        final S3Backend backend = getMockS3Backend();
+        final S3DataStore s3ds = getCustomBackendS3DS(backend);
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds, defaultMapper);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        assertFalse(mBean.isFileSynced(syncfile1.getName()));
+    }
+
+    @Test
+    public void testIsFileSyncedSyncIncompleteReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException,
+        MBeanRegistrationException, NotCompliantMBeanException {
+        final S3Backend backend = getMockS3Backend();
+        final S3DataStore s3ds = getCustomBackendS3DS(backend);
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds, defaultMapper);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        FileInputStream inputStream = null;
+        DataRecord record = null;
+        try {
+            inputStream = new FileInputStream(syncfile1);
+            record = s3ds.addRecord(new FileInputStream(syncfile1));
+
+            assertFalse(mBean.isFileSynced(syncfile1.getName()));
+
+        }
+        finally {
+            if (null != record) {
+                s3ds.deleteRecord(record.getIdentifier());
+            }
+            if (null != inputStream) {
+                inputStream.close();
+            }
+        }
+    }
+
+    @Test
+    public void testIsFileSyncedSyncCompleteReturnsTrue() throws IOException, RepositoryException, InstanceAlreadyExistsException,
+        MBeanRegistrationException, NotCompliantMBeanException {
+        final S3DataStore s3ds = getDefaultS3DS();
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds, defaultMapper);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        FileInputStream inputStream = null;
+        DataRecord record = null;
+        try {
+            inputStream = new FileInputStream(syncfile1);
+
+            record = s3ds.addRecord(new FileInputStream(syncfile1));
+
+            int tries = 0;
+            while (stats.getActiveSyncs() > 0 && 50 > tries++) {
+                try {
+                    Thread.sleep(100);
+                }
+                catch (InterruptedException e) { }
+            }
+
+            assert(mBean.isFileSynced(syncfile1.getName()));
+        }
+        finally {
+            if (null != record) {
+                s3ds.deleteRecord(record.getIdentifier());
+            }
+            if (null != inputStream) {
+                inputStream.close();
+            }
+        }
+    }
+
+    @Test
+    public void testIsFileSyncedFileDeletedReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException,
+        MBeanRegistrationException, NotCompliantMBeanException {
+        final S3DataStore s3ds = getDefaultS3DS();
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds, defaultMapper);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        FileInputStream inputStream = null;
+        DataRecord record = null;
+        try {
+            inputStream = new FileInputStream(syncfile1);
+
+            record = s3ds.addRecord(new FileInputStream(syncfile1));
+
+            int tries = 0;
+            while (stats.getActiveSyncs() > 0 && 50 > tries++) {
+                try {
+                    Thread.sleep(100);
+                }
+                catch (InterruptedException e) { }
+            }
+        }
+        finally {
+            if (null != record) {
+                s3ds.deleteRecord(record.getIdentifier());
+            }
+            if (null != inputStream) {
+                inputStream.close();
+            }
+        }
+
+        assertFalse(mBean.isFileSynced(syncfile1.getName()));
+    }
+
+    private class CustomBackendS3DataStore extends S3DataStore {
+        private S3Backend _localBackend;
+
+        CustomBackendS3DataStore(final S3Backend backend) { _localBackend = backend; }
+
+        @Override
+        protected Backend createBackend() {
+            if (properties != null) {
+                _localBackend.setProperties(properties);
+            }
+            return _localBackend;
+        }
+    }
+}
diff --git a/oak-blob-cloud/src/test/resources/syncfile1 b/oak-blob-cloud/src/test/resources/syncfile1
new file mode 100644
index 0000000..aa9d41a
--- /dev/null
+++ b/oak-blob-cloud/src/test/resources/syncfile1
@@ -0,0 +1,2 @@
+This is syncfile1.
+It is used by org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.TestS3DataStoreStats.java.
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
index 0c2035f..31b8b8e 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
@@ -31,6 +31,7 @@ import static org.apache.jackrabbit.oak.spi.blob.osgi.SplitBlobStoreService.ONLY
 import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean;
 
 import java.io.ByteArrayInputStream;
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -44,6 +45,7 @@ import java.util.concurrent.TimeUnit;
 
 import javax.sql.DataSource;
 
+import com.google.common.collect.Lists;
 import com.mongodb.MongoClientURI;
 
 import org.apache.felix.scr.annotations.Activate;
@@ -57,9 +59,15 @@ import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.ReferencePolicy;
 import org.apache.jackrabbit.commons.SimpleValueFactory;
 import org.apache.jackrabbit.oak.api.Descriptors;
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.api.Type;
 import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean;
 import org.apache.jackrabbit.oak.api.jmx.CheckpointMBean;
 import org.apache.jackrabbit.oak.api.jmx.PersistentCacheStatsMBean;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.NodeIdMapper;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStats;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStatsMBean;
 import org.apache.jackrabbit.oak.cache.CacheStats;
 import org.apache.jackrabbit.oak.commons.PropertiesUtil;
 import org.apache.jackrabbit.oak.osgi.ObserverTracker;
@@ -67,10 +75,12 @@ import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
 import org.apache.jackrabbit.oak.plugins.blob.BlobGC;
 import org.apache.jackrabbit.oak.plugins.blob.BlobGCMBean;
 import org.apache.jackrabbit.oak.plugins.blob.BlobGarbageCollector;
+import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob;
 import org.apache.jackrabbit.oak.plugins.blob.BlobStoreStats;
 import org.apache.jackrabbit.oak.plugins.blob.BlobTrackingStore;
 import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.BlobIdTracker;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
 import org.apache.jackrabbit.oak.plugins.document.persistentCache.CacheType;
 import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCacheStats;
@@ -81,6 +91,7 @@ import org.apache.jackrabbit.oak.spi.blob.BlobStoreWrapper;
 import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
 import org.apache.jackrabbit.oak.spi.blob.stats.BlobStoreStatsMBean;
 import org.apache.jackrabbit.oak.spi.state.Clusterable;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
 import org.apache.jackrabbit.oak.spi.state.RevisionGC;
 import org.apache.jackrabbit.oak.spi.state.RevisionGCMBean;
@@ -787,6 +798,56 @@ public class DocumentNodeStoreService {
                     BlobGCMBean.TYPE, "Document node store blob garbage collection"));
         }
 
+        if (null != store.getBlobStore() && store.getBlobStore() instanceof DataStoreBlobStore) {
+            final DataStoreBlobStore dsbs = (DataStoreBlobStore) store.getBlobStore();
+            if (null != dsbs.getDataStore() && dsbs.getDataStore() instanceof S3DataStore) {
+                final S3DataStore s3ds = (S3DataStore) dsbs.getDataStore();
+                final S3DataStoreStats s3dsStats = new S3DataStoreStats(s3ds, new NodeIdMapper() {
+                    @Override
+                    public String getBlobIdForName(String nodeName) {
+                        if (null != nodeStore) {
+                            final List<String> allNodes = Lists.newArrayList(nodeName.split(File.separator));
+                            final List<String> pathNodes = allNodes.subList(0, allNodes.size() - 1);
+                            final String leafNodeName = allNodes.get(allNodes.size() - 1);
+
+                            NodeState currentNode = nodeStore.getRoot();
+                            for (final String pathNodeName : pathNodes) {
+                                if (pathNodeName.length() > 0) {
+                                    final NodeState childNode = currentNode.getChildNode(pathNodeName);
+                                    if (!childNode.exists()) {
+                                        break;
+                                    }
+                                    currentNode = childNode;
+                                }
+                            }
+                            final NodeState leafNode = currentNode.getChildNode(leafNodeName);
+                            if (leafNode.exists()) {
+                                final PropertyState state = leafNode.getProperty("jcr:data");
+                                if (null != state && state.getType() == Type.BINARY) {
+                                    final BlobStoreBlob blob = (BlobStoreBlob) state.getValue(state.getType());
+                                    if (null != blob) {
+                                        final String blobId = blob.getBlobId();
+                                        if (null != blobId) {
+                                            // Blob IDs (or at least some) have a '#' followed by
+                                            // additional characters we don't need, so let's only
+                                            // take the part we need for finding the file.
+                                            return blobId.split("#")[0];
+                                        }
+                                    }
+                                }
+                            }
+                        }
+                        return null;
+                    }
+                });
+                registrations.add(registerMBean(whiteboard,
+                        S3DataStoreStatsMBean.class,
+                        s3dsStats,
+                        S3DataStoreStatsMBean.TYPE,
+                        s3dsStats.getClass().getSimpleName()));
+            }
+        }
+
         RevisionGC revisionGC = new RevisionGC(new Runnable() {
             @Override
             public void run() {
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java
index 84c2521..c70f4cf 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java
@@ -44,6 +44,7 @@ import java.util.Hashtable;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.collect.Lists;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.ConfigurationPolicy;
@@ -54,8 +55,14 @@ import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.ReferencePolicy;
 import org.apache.jackrabbit.commons.SimpleValueFactory;
 import org.apache.jackrabbit.oak.api.Descriptors;
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.api.Type;
 import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean;
 import org.apache.jackrabbit.oak.api.jmx.CheckpointMBean;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.NodeIdMapper;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStats;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStatsMBean;
 import org.apache.jackrabbit.oak.cache.CacheStats;
 import org.apache.jackrabbit.oak.osgi.ObserverTracker;
 import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
@@ -64,6 +71,7 @@ import org.apache.jackrabbit.oak.plugins.blob.BlobGCMBean;
 import org.apache.jackrabbit.oak.plugins.blob.BlobGarbageCollector;
 import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
 import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType;
 import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo;
@@ -82,6 +90,7 @@ import org.apache.jackrabbit.oak.spi.commit.Observable;
 import org.apache.jackrabbit.oak.spi.commit.Observer;
 import org.apache.jackrabbit.oak.spi.gc.GCMonitor;
 import org.apache.jackrabbit.oak.spi.gc.GCMonitorTracker;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
 import org.apache.jackrabbit.oak.spi.state.ProxyNodeStore;
 import org.apache.jackrabbit.oak.spi.state.RevisionGC;
@@ -596,6 +605,60 @@ public class SegmentNodeStoreService extends ProxyNodeStore
             ));
         }
 
+        // Expose statistics about S3DataStore, if one is being used
+        if (null != blobStore && blobStore instanceof DataStoreBlobStore) {
+            final DataStoreBlobStore dsbs = (DataStoreBlobStore) blobStore;
+            if (null != dsbs.getDataStore() && dsbs.getDataStore() instanceof S3DataStore) {
+                final S3DataStore s3ds = (S3DataStore) dsbs.getDataStore();
+                final S3DataStoreStats s3dsStats = new S3DataStoreStats(s3ds, new NodeIdMapper() {
+                    @Override
+                    public String getBlobIdForName(String nodeName) {
+                        if (null != segmentNodeStore) {
+                            final List<String> allNodes = Lists.newArrayList(nodeName.split(File.separator));
+                            final List<String> pathNodes = allNodes.subList(0, allNodes.size() - 1);
+                            final String leafNodeName = allNodes.get(allNodes.size() - 1);
+
+                            NodeState currentNode = segmentNodeStore.getRoot();
+                            for (final String pathNodeName : pathNodes) {
+                                if
(pathNodeName.length() > 0) { + final NodeState childNode = currentNode.getChildNode(pathNodeName); + if (!childNode.exists()) { + break; + } + currentNode = childNode; + } + } + final NodeState leafNode = currentNode.getChildNode(leafNodeName); + if (leafNode.exists()) { + final PropertyState state = leafNode.getProperty("jcr:data"); + if (null != state && state.getType() == Type.BINARY) { + final SegmentBlob blob = (SegmentBlob) state.getValue(state.getType()); + if (null != blob) { + final String blobId = blob.getBlobId(); + if (null != blobId) { + // Blob Ids (or at least some) have a # followed by + // addtional characters we don't need, so let's + // only take the part we need for finding the file. + return blobId.split("#")[0]; + } + } + } + } + } + return null; + } + }); + registrations.add(registerMBean( + whiteboard, + S3DataStoreStatsMBean.class, + s3dsStats, + S3DataStoreStatsMBean.TYPE, + s3dsStats.getClass().getSimpleName() + )); + } + } + log.info("SegmentNodeStore initialized"); return true; } diff --git a/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java b/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java index 11abf49..0f9150b 100644 --- a/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java +++ b/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java @@ -43,9 +43,11 @@ import java.io.IOException; import java.util.Collections; import java.util.Dictionary; import java.util.Hashtable; +import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; +import com.google.common.collect.Lists; import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.ConfigurationPolicy; @@ -57,8 +59,14 @@ import org.apache.felix.scr.annotations.ReferenceCardinality; import 
org.apache.felix.scr.annotations.ReferencePolicy; import org.apache.jackrabbit.commons.SimpleValueFactory; import org.apache.jackrabbit.oak.api.Descriptors; +import org.apache.jackrabbit.oak.api.PropertyState; +import org.apache.jackrabbit.oak.api.Type; import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean; import org.apache.jackrabbit.oak.api.jmx.CheckpointMBean; +import org.apache.jackrabbit.oak.blob.cloud.aws.s3.NodeIdMapper; +import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore; +import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStats; +import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStatsMBean; import org.apache.jackrabbit.oak.cache.CacheStats; import org.apache.jackrabbit.oak.commons.PropertiesUtil; import org.apache.jackrabbit.oak.osgi.ObserverTracker; @@ -70,6 +78,7 @@ import org.apache.jackrabbit.oak.plugins.blob.BlobTrackingStore; import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector; import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore; import org.apache.jackrabbit.oak.plugins.blob.datastore.BlobIdTracker; +import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore; import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils; import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType; import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo; @@ -89,6 +98,7 @@ import org.apache.jackrabbit.oak.spi.commit.Observable; import org.apache.jackrabbit.oak.spi.commit.Observer; import org.apache.jackrabbit.oak.spi.gc.GCMonitor; import org.apache.jackrabbit.oak.spi.gc.GCMonitorTracker; +import org.apache.jackrabbit.oak.spi.state.NodeState; import org.apache.jackrabbit.oak.spi.state.NodeStore; import org.apache.jackrabbit.oak.spi.state.NodeStoreProvider; import org.apache.jackrabbit.oak.spi.state.ProxyNodeStore; @@ -289,6 +299,7 @@ public class SegmentNodeStoreService extends ProxyNodeStore private 
Registration stringCacheMBean; private Registration fsgcMonitorMBean; private Registration fileStoreStatsMBean; + private Registration s3DataStoreStatsRegistration; private WhiteboardExecutor executor; private boolean customBlobStore; @@ -536,6 +547,60 @@ public class SegmentNodeStoreService extends ProxyNodeStore scheduleWithFixedDelay(whiteboard, fsgcm, 1) ); + // Expose statistics about S3DataStore, if one is being used + + if (null != blobStore && blobStore instanceof DataStoreBlobStore) { + final DataStoreBlobStore dsbs = (DataStoreBlobStore)blobStore; + if (null != dsbs.getDataStore() && dsbs.getDataStore() instanceof S3DataStore) { + final S3DataStore s3ds = (S3DataStore)dsbs.getDataStore(); + final S3DataStoreStats s3dsStats = new S3DataStoreStats(s3ds, new NodeIdMapper() { + @Override + public String getBlobIdForName(String nodeName) { + if (null != segmentNodeStore) { + final List<String> allNodes = Lists.newArrayList(nodeName.split(File.separator)); + final List<String> pathNodes = allNodes.subList(0, allNodes.size()-1); + final String leafNodeName = allNodes.get(allNodes.size()-1); + + NodeState currentNode = segmentNodeStore.getRoot(); + for (final String pathNodeName : pathNodes) { + if (pathNodeName.length() > 0) { + final NodeState childNode = currentNode.getChildNode(pathNodeName); + if (!childNode.exists()) { + break; + } + currentNode = childNode; + } + } + final NodeState leafNode = currentNode.getChildNode(leafNodeName); + if (leafNode.exists()) { + final PropertyState state = leafNode.getProperty("jcr:data"); + if (null != state && state.getType() == Type.BINARY) { + final SegmentBlob blob = (SegmentBlob) state.getValue(state.getType()); + if (null != blob) { + final String blobId = blob.getBlobId(); + if (null != blobId) { + // Blob Ids (or at least some) have a # followed by + // addtional characters we don't need, so let's + // only take the part we need for finding the file. 
+ return blobId.split("#")[0]; + } + } + } + } + } + return null; + } + }); + s3DataStoreStatsRegistration = registerMBean( + whiteboard, + S3DataStoreStatsMBean.class, + s3dsStats, + S3DataStoreStatsMBean.TYPE, + s3dsStats.getClass().getSimpleName() + ); + } + } + // Register a factory service to expose the FileStore providerRegistration = context.getBundleContext().registerService(SegmentStoreProvider.class.getName(), this, null); @@ -714,6 +779,10 @@ public class SegmentNodeStoreService extends ProxyNodeStore fileStoreStatsMBean.unregister(); fileStoreStatsMBean = null; } + if (s3DataStoreStatsRegistration != null) { + s3DataStoreStatsRegistration.unregister(); + s3DataStoreStatsRegistration = null; + } if (executor != null) { executor.stop(); executor = null;