Chad has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/158279

Change subject: WIP: Rewrite BlobStore support
......................................................................

WIP: Rewrite BlobStore support

Upstream Elasticsearch is making breaking changes to its BlobStore API, so this plugin has to change too.

Change-Id: Ic159ca0c658a79f0fcc60fd58f8763cc149d271e
---
M pom.xml
M src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftRepository.java
M src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftRepositoryModule.java
D src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/AbstractSwiftBlobContainer.java
A src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobContainer.java
M src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobStore.java
D src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftImmutableBlobContainer.java
7 files changed, 155 insertions(+), 248 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/search/repository-swift refs/changes/79/158279/1

diff --git a/pom.xml b/pom.xml
index 4ced71e..e547bd0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -37,7 +37,7 @@
 
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <elasticsearch.version>1.3.2</elasticsearch.version>
+        <elasticsearch.version>2.0.0-SNAPSHOT</elasticsearch.version>
     </properties>
 
     <dependencies>
@@ -49,7 +49,7 @@
         <dependency>
             <groupId>org.javaswift</groupId>
             <artifactId>joss</artifactId>
-            <version>0.9.8</version>
+            <version>0.9.9-SNAPSHOT</version>
         </dependency>
         <!-- testing only -->
         <dependency>
@@ -113,11 +113,6 @@
                     <source>1.7</source>
                     <target>1.7</target>
                 </configuration>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-clean-plugin</artifactId>
-                <version>2.4.1</version>
             </plugin>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
diff --git a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftRepository.java b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftRepository.java
index 8e04759..691a3b7 100644
--- a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftRepository.java
+++ b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftRepository.java
@@ -1,14 +1,10 @@
 package org.wikimedia.elasticsearch.swift.repositories;
 
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobStore;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.index.snapshots.IndexShardRepository;
 import org.elasticsearch.repositories.RepositoryException;
 import org.elasticsearch.repositories.RepositoryName;
@@ -66,14 +62,9 @@
                String password = 
repositorySettings.settings().get("swift_password", "");
                String tenantName = 
repositorySettings.settings().get("swift_tenantname", "");
                String authMethod = 
repositorySettings.settings().get("swift_authmethod", "");
-
-               int concurrentStreams = 
repositorySettings.settings().getAsInt("concurrent_streams", 
componentSettings.getAsInt("concurrent_streams", 5));
-               ExecutorService concurrentStreamPool = EsExecutors.newScaling(
-                               1, concurrentStreams, 5, TimeUnit.SECONDS, 
EsExecutors.daemonThreadFactory(settings, "[swift_stream]"));
-
                Account account = 
SwiftAccountFactory.createAccount(swiftService, url, username, password, 
tenantName, authMethod);
 
-               blobStore = new SwiftBlobStore(settings, account, container, 
concurrentStreamPool);
+               blobStore = new SwiftBlobStore(settings, account, container);
 
                this.chunkSize = 
repositorySettings.settings().getAsBytesSize("chunk_size",
                                componentSettings.getAsBytesSize("chunk_size", 
new ByteSizeValue(5, ByteSizeUnit.GB)));
diff --git a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftRepositoryModule.java b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftRepositoryModule.java
index dbb9e43..f382f4a 100644
--- a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftRepositoryModule.java
+++ b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftRepositoryModule.java
@@ -9,13 +9,6 @@
  * Swift repository module. Binds us to things.
  */
 public class SwiftRepositoryModule extends AbstractModule {
-       /**
-        * Constructor. Super boring.
-        */
-    public SwiftRepositoryModule() {
-        super();
-    }
-
     /**
      * Do the binding.
      */
diff --git a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/AbstractSwiftBlobContainer.java b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/AbstractSwiftBlobContainer.java
deleted file mode 100644
index 5db6376..0000000
--- a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/AbstractSwiftBlobContainer.java
+++ /dev/null
@@ -1,143 +0,0 @@
-package org.wikimedia.elasticsearch.swift.repositories.blobstore;
-
-import org.elasticsearch.common.Nullable;
-import org.elasticsearch.common.blobstore.BlobMetaData;
-import org.elasticsearch.common.blobstore.BlobPath;
-import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
-import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
-import org.elasticsearch.common.collect.ImmutableMap;
-import org.javaswift.joss.model.Directory;
-import org.javaswift.joss.model.DirectoryOrObject;
-import org.javaswift.joss.model.StoredObject;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Collection;
-
-/**
- * Swift's implementation of the AbstractBlobContainer
- */
-public class AbstractSwiftBlobContainer extends AbstractBlobContainer {
-       // Our local swift blob store instance
-    protected final SwiftBlobStore blobStore;
-
-    // The root path for blobs. Used by buildKey to build full blob names
-    protected final String keyPath;
-
-    /**
-     * Constructor
-     * @param path The BlobPath to find blobs in
-     * @param blobStore The blob store to use for operations
-     */
-    protected AbstractSwiftBlobContainer(BlobPath path, SwiftBlobStore 
blobStore) {
-        super(path);
-        this.blobStore = blobStore;
-        String keyPath = path.buildAsString("/");
-        if (!keyPath.isEmpty()) {
-            keyPath = keyPath + "/";
-        }
-        this.keyPath = keyPath;
-    }
-
-    /**
-     * Does a blob exist? Self-explanatory.
-     */
-    @Override
-    public boolean blobExists(String blobName) {
-       return blobStore.swift().getObject(buildKey(blobName)).exists();
-    }
-
-    /**
-     * Read a given blob into the listener
-     * @param blobName The blob name to read
-     * @param listener The listener to report our read info back to
-     */
-    @Override
-    public void readBlob(final String blobName, final ReadBlobListener 
listener) {
-        blobStore.executor().execute(new Runnable() {
-            @Override
-            public void run() {
-                InputStream is;
-                try {
-                       // This is the interesting bit. Fetch the blob and then 
turn it
-                       // into an InputStream for reading below
-                    StoredObject object = 
blobStore.swift().getObject(buildKey(blobName));
-                    is = object.downloadObjectAsInputStream();
-                } catch (Exception e) {
-                    listener.onFailure(e);
-                    return;
-                }
-                byte[] buffer = new byte[blobStore.bufferSizeInBytes()];
-                try {
-                    int bytesRead;
-                    while ((bytesRead = is.read(buffer)) != -1) {
-                        listener.onPartial(buffer, 0, bytesRead);
-                    }
-                    listener.onCompleted();
-                } catch (Exception e) {
-                    try {
-                        is.close();
-                    } catch (IOException e1) {
-                        // ignore
-                    }
-                    listener.onFailure(e);
-                }
-            }
-        });
-    }
-
-    /**
-     * Delete a blob. Straightforward.
-     * @param blobName A blob to delete
-     */
-    @Override
-    public boolean deleteBlob(String blobName) throws IOException {
-       StoredObject object = blobStore.swift().getObject(buildKey(blobName));
-       if (object.exists()) {
-               object.delete();
-       }
-        return true;
-    }
-
-    /**
-     * Get the blobs matching a given prefix
-     * @param blobNamePrefix The prefix to look for blobs with
-     */
-    @Override
-    public ImmutableMap<String, BlobMetaData> listBlobsByPrefix(@Nullable 
String blobNamePrefix) throws IOException {
-        ImmutableMap.Builder<String, BlobMetaData> blobsBuilder = 
ImmutableMap.builder();
-
-        Collection<DirectoryOrObject> files;
-        if (blobNamePrefix != null) {
-            files = blobStore.swift().listDirectory(new 
Directory(buildKey(blobNamePrefix), '/'));
-        } else {
-            files = blobStore.swift().listDirectory(new Directory(keyPath, 
'/'));
-        }
-        if (files != null && !files.isEmpty()) {
-            for (DirectoryOrObject object : files) {
-                if (object.isObject()) {
-                    String name = object.getName().substring(keyPath.length());
-                    blobsBuilder.put(name, new PlainBlobMetaData(name, 
object.getAsObject().getContentLength()));
-                }
-            }
-        }
-
-        return blobsBuilder.build();
-    }
-
-    /**
-     * Get all the blobs
-     */
-    @Override
-    public ImmutableMap<String, BlobMetaData> listBlobs() throws IOException {
-        return listBlobsByPrefix(null);
-    }
-
-    /**
-     * Build a key for a blob name, based on the keyPath
-     * @param blobName The blob name to build a key for
-     */
-    protected String buildKey(String blobName) {
-        return keyPath + blobName;
-    }
-}
diff --git a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobContainer.java b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobContainer.java
new file mode 100644
index 0000000..b4f57dd
--- /dev/null
+++ b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobContainer.java
@@ -0,0 +1,146 @@
+package org.wikimedia.elasticsearch.swift.repositories.blobstore;
+
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.blobstore.BlobMetaData;
+import org.elasticsearch.common.blobstore.BlobPath;
+import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
+import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
+import org.elasticsearch.common.collect.ImmutableMap;
+import org.javaswift.joss.model.Directory;
+import org.javaswift.joss.model.DirectoryOrObject;
+import org.javaswift.joss.model.StoredObject;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collection;
+
+/**
+ * Swift's implementation of the AbstractBlobContainer
+ */
+public class SwiftBlobContainer extends AbstractBlobContainer {
+       // Our local swift blob store instance
+       protected final SwiftBlobStore blobStore;
+
+       // The root path for blobs. Used by buildKey to build full blob names
+       protected final String keyPath;
+
+       /**
+        * Constructor
+        * 
+        * @param path
+        *            The BlobPath to find blobs in
+        * @param blobStore
+        *            The blob store to use for operations
+        */
+       protected SwiftBlobContainer(BlobPath path, SwiftBlobStore blobStore) {
+               super(path);
+               this.blobStore = blobStore;
+               String keyPath = path.buildAsString("/");
+               if (!keyPath.isEmpty()) {
+                       keyPath = keyPath + "/";
+               }
+               this.keyPath = keyPath;
+       }
+
+       /**
+        * Does a blob exist? Self-explanatory.
+        */
+       @Override
+       public boolean blobExists(String blobName) {
+               return blobStore.swift().getObject(buildKey(blobName)).exists();
+       }
+
+       /**
+        * Delete a blob. Straightforward.
+        * 
+        * @param blobName
+        *            A blob to delete
+        */
+       @Override
+       public boolean deleteBlob(String blobName) throws IOException {
+               StoredObject object = 
blobStore.swift().getObject(buildKey(blobName));
+               if (object.exists()) {
+                       object.delete();
+               }
+               return true;
+       }
+
+       /**
+        * Get the blobs matching a given prefix
+        * 
+        * @param blobNamePrefix
+        *            The prefix to look for blobs with
+        */
+       @Override
+       public ImmutableMap<String, BlobMetaData> listBlobsByPrefix(
+                       @Nullable String blobNamePrefix) throws IOException {
+               ImmutableMap.Builder<String, BlobMetaData> blobsBuilder = 
ImmutableMap
+                               .builder();
+
+               Collection<DirectoryOrObject> files;
+               if (blobNamePrefix != null) {
+                       files = blobStore.swift().listDirectory(
+                                       new Directory(buildKey(blobNamePrefix), 
'/'));
+               } else {
+                       files = blobStore.swift()
+                                       .listDirectory(new Directory(keyPath, 
'/'));
+               }
+               if (files != null && !files.isEmpty()) {
+                       for (DirectoryOrObject object : files) {
+                               if (object.isObject()) {
+                                       String name = 
object.getName().substring(keyPath.length());
+                                       blobsBuilder.put(name, new 
PlainBlobMetaData(name, object
+                                                       
.getAsObject().getContentLength()));
+                               }
+                       }
+               }
+
+               return blobsBuilder.build();
+       }
+
+       /**
+        * Get all the blobs
+        */
+       @Override
+       public ImmutableMap<String, BlobMetaData> listBlobs() throws 
IOException {
+               return listBlobsByPrefix(null);
+       }
+
+       /**
+        * Build a key for a blob name, based on the keyPath
+        * 
+        * @param blobName
+        *            The blob name to build a key for
+        */
+       protected String buildKey(String blobName) {
+               return keyPath + blobName;
+       }
+
+       /**
+        * Fetch a given blob into a BufferedInputStream
+        * 
+        * @param blobName
+        *            The blob name to read
+        */
+       @Override
+       public InputStream openInput(String blobName) throws IOException {
+               return new BufferedInputStream(
+                       
blobStore.swift().getObject(buildKey(blobName)).downloadObjectAsInputStream(),
+                               blobStore.bufferSizeInBytes());
+       }
+
+       @Override
+       public OutputStream createOutput(String blobName) throws IOException {
+               final StoredObject object = 
blobStore.swift().getObject(buildKey(blobName));
+        return new BufferedOutputStream(new FilterOutputStream(null) {
+            @Override
+            public void close() throws IOException {
+                super.close();
+            }
+        }, blobStore.bufferSizeInBytes());
+       }
+}
diff --git a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobStore.java b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobStore.java
index 40dacb0..1eb4be5 100644
--- a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobStore.java
+++ b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobStore.java
@@ -1,8 +1,8 @@
 package org.wikimedia.elasticsearch.swift.repositories.blobstore;
 
+import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobStore;
-import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
@@ -11,15 +11,10 @@
 import org.javaswift.joss.model.Container;
 import org.javaswift.joss.model.StoredObject;
 
-import java.util.concurrent.Executor;
-
 /**
  * Our blob store
  */
 public class SwiftBlobStore extends AbstractComponent implements BlobStore {
-       // Executor for our operations. Sorta like a dedicated Thread pool.
-    private final Executor executor;
-
     // How much to buffer our blobs by
     private final int bufferSizeInBytes;
 
@@ -31,11 +26,9 @@
      * @param settings Settings for our repository. Only care about buffer 
size.
      * @param auth
      * @param container
-     * @param executor
      */
-    public SwiftBlobStore(Settings settings, Account auth, String container, 
Executor executor) {
+    public SwiftBlobStore(Settings settings, Account auth, String container) {
         super(settings);
-        this.executor = executor;
         this.bufferSizeInBytes = (int)settings.getAsBytesSize("buffer_size", 
new ByteSizeValue(100, ByteSizeUnit.KB)).bytes();
 
         swift = auth.getContainer(container);
@@ -53,13 +46,6 @@
     }
 
     /**
-     * Get the executor
-     */
-    public Executor executor() {
-        return executor;
-    }
-
-    /**
      * Get our buffer size
      */
     public int bufferSizeInBytes() {
@@ -70,10 +56,10 @@
      * Factory for getting blob containers for a path
      * @param path The blob path to search
      */
-    @Override
-    public ImmutableBlobContainer immutableBlobContainer(BlobPath path) {
-        return new SwiftImmutableBlobContainer(path, this);
-    }
+       @Override
+       public BlobContainer blobContainer(BlobPath path) {
+               return new SwiftBlobContainer(path, this);
+       }
 
     /**
      * Delete an arbitrary BlobPath from our store.
diff --git a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftImmutableBlobContainer.java b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftImmutableBlobContainer.java
deleted file mode 100644
index 6f69f6b..0000000
--- a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftImmutableBlobContainer.java
+++ /dev/null
@@ -1,61 +0,0 @@
-package org.wikimedia.elasticsearch.swift.repositories.blobstore;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.elasticsearch.common.blobstore.BlobPath;
-import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
-import org.elasticsearch.common.blobstore.support.BlobStores;
-import org.javaswift.joss.model.StoredObject;
-
-/**
- * The final implementation of Swift blob storage. Implements the write 
functionality
- */
-public class SwiftImmutableBlobContainer extends AbstractSwiftBlobContainer 
implements ImmutableBlobContainer {
-       /**
-        * Constructor. Just call the parent.
-     * @param path The BlobPath to find blobs in
-     * @param blobStore The blob store to use for operations
-        */
-    protected SwiftImmutableBlobContainer(BlobPath path, SwiftBlobStore 
blobStore) {
-        super(path, blobStore);
-    }
-
-    /**
-     * Write a blob!
-     * @param blobName The name of the blob we're writing
-     * @param is The data we're writing
-     * @param sizeInBytes How big the data is. We don't care with Swift.
-     * @param listener The listener for write results (failure, etc)
-     */
-    @Override
-    public void writeBlob(final String blobName, final InputStream is, final 
long sizeInBytes, final WriterListener listener) {
-        blobStore.executor().execute(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                       // need to remove old file if already exist
-                       StoredObject object = 
blobStore.swift().getObject(buildKey(blobName));
-                       if (object.exists()) {
-                               object.delete();
-                       }
-                       object.uploadObject(is);
-                    listener.onCompleted();
-                } catch (Exception e) {
-                    listener.onFailure(e);
-                }
-            }
-        });
-    }
-
-    /**
-     * Write a blob! Basically boilerplate from other implementations. 
-     * @param blobName The name of the blob we're writing
-     * @param is The data we're writing
-     * @param sizeInBytes How big the data is. We don't care with Swift.
-     */
-    @Override
-    public void writeBlob(String blobName, InputStream is, long sizeInBytes) 
throws IOException {
-        BlobStores.syncWriteBlob(this, blobName, is, sizeInBytes);
-    }
-}

-- 
To view, visit https://gerrit.wikimedia.org/r/158279
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ic159ca0c658a79f0fcc60fd58f8763cc149d271e
Gerrit-PatchSet: 1
Gerrit-Project: search/repository-swift
Gerrit-Branch: master
Gerrit-Owner: Chad <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to