Chad has submitted this change and it was merged.

Change subject: Rewrite BlobStore support
......................................................................


Rewrite BlobStore support

Upstream is breaking everything. So we'll have to too. It's
actually much cleaner and less abstracted.

Begin tracking 1.4 in master for 0.7 release.

Change-Id: Ic159ca0c658a79f0fcc60fd58f8763cc149d271e
---
M pom.xml
M src/main/java/org/wikimedia/elasticsearch/swift/SwiftRepositoryPlugin.java
M 
src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftAccountFactory.java
M 
src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftRepository.java
M 
src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftRepositoryModule.java
M src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftService.java
M 
src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobContainer.java
M 
src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobStore.java
D 
src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftImmutableBlobContainer.java
9 files changed, 233 insertions(+), 319 deletions(-)

Approvals:
  Manybubbles: Looks good to me, approved
  jenkins-bot: Verified



diff --git a/pom.xml b/pom.xml
index 4ced71e..8a26d9d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -37,7 +37,7 @@
 
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <elasticsearch.version>1.3.2</elasticsearch.version>
+        <elasticsearch.version>1.4.0</elasticsearch.version>
     </properties>
 
     <dependencies>
diff --git 
a/src/main/java/org/wikimedia/elasticsearch/swift/SwiftRepositoryPlugin.java 
b/src/main/java/org/wikimedia/elasticsearch/swift/SwiftRepositoryPlugin.java
index cb668fb..0ad94de 100644
--- a/src/main/java/org/wikimedia/elasticsearch/swift/SwiftRepositoryPlugin.java
+++ b/src/main/java/org/wikimedia/elasticsearch/swift/SwiftRepositoryPlugin.java
@@ -15,7 +15,7 @@
  * Our base plugin stuff.
  */
 public class SwiftRepositoryPlugin extends AbstractPlugin {
-       // Elasticsearch settings
+    // Elasticsearch settings
     private final Settings settings;
 
     /**
@@ -45,9 +45,9 @@
     /**
      * Register our services, if needed.
      */
-       @Override
-       @SuppressWarnings("rawtypes")
-       public Collection<Class<? extends LifecycleComponent>> services() {
+    @Override
+    @SuppressWarnings("rawtypes")
+    public Collection<Class<? extends LifecycleComponent>> services() {
         Collection<Class<? extends LifecycleComponent>> services = 
Lists.newArrayList();
         if (settings.getAsBoolean("swift.repository.enabled", true)) {
             services.add(SwiftService.class);
@@ -55,10 +55,10 @@
         return services;
     }
 
-       /**
-        * Load our repository module into the list, if enabled
-        * @param repositoriesModule The repositories module to register 
ourselves with
-        */
+    /**
+     * Load our repository module into the list, if enabled
+     * @param repositoriesModule The repositories module to register ourselves 
with
+     */
     public void onModule(RepositoriesModule repositoriesModule) {
         if (settings.getAsBoolean("swift.repository.enabled", true)) {
             repositoriesModule.registerRepository(SwiftRepository.TYPE, 
SwiftRepositoryModule.class);
diff --git 
a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftAccountFactory.java
 
b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftAccountFactory.java
index 58cdf64..eec617b 100644
--- 
a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftAccountFactory.java
+++ 
b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftAccountFactory.java
@@ -4,17 +4,17 @@
 
 public class SwiftAccountFactory {
 
-       public static Account createAccount(SwiftService swiftService, String 
url, String username, String password, String tenantName, String authMethod) {
-               if ("KEYSTONE".equals(authMethod.toUpperCase())) {
-                       return swiftService.swiftKeyStone(url, username, 
password, tenantName);
-               }
+    public static Account createAccount(SwiftService swiftService, String url, 
String username, String password, String tenantName, String authMethod) {
+        if ("KEYSTONE".equals(authMethod.toUpperCase())) {
+            return swiftService.swiftKeyStone(url, username, password, 
tenantName);
+        }
 
-               if ("TEMPAUTH".equals(authMethod.toUpperCase())) {
-                       return swiftService.swiftTempAuth(url, username, 
password);
-               }
+        if ("TEMPAUTH".equals(authMethod.toUpperCase())) {
+            return swiftService.swiftTempAuth(url, username, password);
+        }
 
-               return swiftService.swiftBasic(url, username, password);
+        return swiftService.swiftBasic(url, username, password);
 
-       }
+    }
 
 }
diff --git 
a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftRepository.java
 
b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftRepository.java
index 8e04759..1d4ecd7 100644
--- 
a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftRepository.java
+++ 
b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftRepository.java
@@ -1,14 +1,10 @@
 package org.wikimedia.elasticsearch.swift.repositories;
 
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobStore;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.index.snapshots.IndexShardRepository;
 import org.elasticsearch.repositories.RepositoryException;
 import org.elasticsearch.repositories.RepositoryName;
@@ -21,95 +17,90 @@
  * The blob store repository. A glorified settings wrapper.
  */
 public class SwiftRepository extends BlobStoreRepository {
-       // The internal "type" for Elasticsearch
-       public final static String TYPE = "swift";
+    // The internal "type" for Elasticsearch
+    public final static String TYPE = "swift";
 
-       // Our blob store instance
-       private final SwiftBlobStore blobStore;
+    // Our blob store instance
+    private final SwiftBlobStore blobStore;
 
-       // Base path for blobs
-       private final BlobPath basePath;
+    // Base path for blobs
+    private final BlobPath basePath;
 
-       // Chunk size.
-       private final ByteSizeValue chunkSize;
+    // Chunk size.
+    private final ByteSizeValue chunkSize;
 
-       // Are we compressing our snapshots?
-       private final boolean compress;
+    // Are we compressing our snapshots?
+    private final boolean compress;
 
-       /**
-        * Constructs new BlobStoreRepository
-        * 
-        * @param name
-        *            repository name
-        * @param repositorySettings
-        *            repository settings
-        * @param indexShardRepository
-        *            an instance of IndexShardRepository
-        * @param swiftService
-        *            an instance of SwiftService
-        */
-       @Inject
-       protected SwiftRepository(RepositoryName name, RepositorySettings 
repositorySettings, IndexShardRepository indexShardRepository, SwiftService 
swiftService) {
-               super(name.getName(), repositorySettings, indexShardRepository);
+    /**
+     * Constructs new BlobStoreRepository
+     * 
+     * @param name
+     *            repository name
+     * @param repositorySettings
+     *            repository settings
+     * @param indexShardRepository
+     *            an instance of IndexShardRepository
+     * @param swiftService
+     *            an instance of SwiftService
+     */
+    @Inject
+    protected SwiftRepository(RepositoryName name, RepositorySettings 
repositorySettings, IndexShardRepository indexShardRepository, SwiftService 
swiftService) {
+        super(name.getName(), repositorySettings, indexShardRepository);
 
-               String url = repositorySettings.settings().get("swift_url");
-               if (url == null) {
-                       throw new RepositoryException(name.name(), "No url 
defined for swift repository");
-               }
+        String url = repositorySettings.settings().get("swift_url");
+        if (url == null) {
+            throw new RepositoryException(name.name(), "No url defined for 
swift repository");
+        }
 
-               String container = 
repositorySettings.settings().get("swift_container");
-               if (container == null) {
-                       throw new RepositoryException(name.name(), "No 
container defined for swift repository");
-               }
+        String container = 
repositorySettings.settings().get("swift_container");
+        if (container == null) {
+            throw new RepositoryException(name.name(), "No container defined 
for swift repository");
+        }
 
-               String username = 
repositorySettings.settings().get("swift_username", "");
-               String password = 
repositorySettings.settings().get("swift_password", "");
-               String tenantName = 
repositorySettings.settings().get("swift_tenantname", "");
-               String authMethod = 
repositorySettings.settings().get("swift_authmethod", "");
+        String username = repositorySettings.settings().get("swift_username", 
"");
+        String password = repositorySettings.settings().get("swift_password", 
"");
+        String tenantName = 
repositorySettings.settings().get("swift_tenantname", "");
+        String authMethod = 
repositorySettings.settings().get("swift_authmethod", "");
+        Account account = SwiftAccountFactory.createAccount(swiftService, url, 
username, password, tenantName, authMethod);
 
-               int concurrentStreams = 
repositorySettings.settings().getAsInt("concurrent_streams", 
componentSettings.getAsInt("concurrent_streams", 5));
-               ExecutorService concurrentStreamPool = EsExecutors.newScaling(
-                               1, concurrentStreams, 5, TimeUnit.SECONDS, 
EsExecutors.daemonThreadFactory(settings, "[swift_stream]"));
+        blobStore = new SwiftBlobStore(settings, account, container);
 
-               Account account = 
SwiftAccountFactory.createAccount(swiftService, url, username, password, 
tenantName, authMethod);
+        this.chunkSize = 
repositorySettings.settings().getAsBytesSize("chunk_size",
+                componentSettings.getAsBytesSize("chunk_size", new 
ByteSizeValue(5, ByteSizeUnit.GB)));
+        this.compress = repositorySettings.settings().getAsBoolean("compress", 
componentSettings.getAsBoolean("compress", false));
+        this.basePath = BlobPath.cleanPath();
+    }
 
-               blobStore = new SwiftBlobStore(settings, account, container, 
concurrentStreamPool);
+    /**
+     * Get the blob store
+     */
+    @Override
+    protected BlobStore blobStore() {
+        return blobStore;
+    }
 
-               this.chunkSize = 
repositorySettings.settings().getAsBytesSize("chunk_size",
-                               componentSettings.getAsBytesSize("chunk_size", 
new ByteSizeValue(5, ByteSizeUnit.GB)));
-               this.compress = 
repositorySettings.settings().getAsBoolean("compress", 
componentSettings.getAsBoolean("compress", false));
-               this.basePath = BlobPath.cleanPath();
-       }
+    /**
+     * Get the base blob path
+     */
+    @Override
+    protected BlobPath basePath() {
+        return basePath;
+    }
 
-       /**
-        * Get the blob store
-        */
-       @Override
-       protected BlobStore blobStore() {
-               return blobStore;
-       }
+    /**
+     * Get the chunk size
+     */
+    @Override
+    protected ByteSizeValue chunkSize() {
+        return chunkSize;
+    }
 
-       /**
-        * Get the base blob path
-        */
-       @Override
-       protected BlobPath basePath() {
-               return basePath;
-       }
-
-       /**
-        * Get the chunk size
-        */
-       @Override
-       protected ByteSizeValue chunkSize() {
-               return chunkSize;
-       }
-
-       /**
-        * Are we compressing our snapshots?
-        */
-       @Override
-       protected boolean isCompress() {
-               return compress;
-       }
+    /**
+     * Are we compressing our snapshots?
+     */
+    @Override
+    protected boolean isCompress() {
+        return compress;
+    }
 }
diff --git 
a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftRepositoryModule.java
 
b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftRepositoryModule.java
index dbb9e43..f382f4a 100644
--- 
a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftRepositoryModule.java
+++ 
b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftRepositoryModule.java
@@ -9,13 +9,6 @@
  * Swift repository module. Binds us to things.
  */
 public class SwiftRepositoryModule extends AbstractModule {
-       /**
-        * Constructor. Super boring.
-        */
-    public SwiftRepositoryModule() {
-        super();
-    }
-
     /**
      * Do the binding.
      */
diff --git 
a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftService.java
 
b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftService.java
index 6642344..cbcb90c 100644
--- 
a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftService.java
+++ 
b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/SwiftService.java
@@ -12,103 +12,103 @@
 import org.javaswift.joss.model.Account;
 
 public class SwiftService extends AbstractLifecycleComponent<SwiftService> {
-       // The account we'll be connecting to Swift with
-       private Account swiftUser;
+    // The account we'll be connecting to Swift with
+    private Account swiftUser;
 
-       /**
-        * Constructor
-        * 
-        * @param settings
-        *            Settings for our repository. Injected.
-        */
-       @Inject
-       public SwiftService(Settings settings) {
-               super(settings);
-       }
+    /**
+     * Constructor
+     * 
+     * @param settings
+     *            Settings for our repository. Injected.
+     */
+    @Inject
+    public SwiftService(Settings settings) {
+        super(settings);
+    }
 
-       /**
-        * Create a Swift account object and connect it to Swift
-        * 
-        * @param url
-        *            The auth url (eg: localhost:8080/auth/v1.0/)
-        * @param username
-        *            The username
-        * @param password
-        *            The password
-        */
-       public synchronized Account swiftBasic(String url, String username, 
String password) {
-               if (swiftUser != null) {
-                       return swiftUser;
-               }
+    /**
+     * Create a Swift account object and connect it to Swift
+     * 
+     * @param url
+     *            The auth url (eg: localhost:8080/auth/v1.0/)
+     * @param username
+     *            The username
+     * @param password
+     *            The password
+     */
+    public synchronized Account swiftBasic(String url, String username, String 
password) {
+        if (swiftUser != null) {
+            return swiftUser;
+        }
 
-               try {
-                       AccountConfig conf = getStandardConfig(url, username, 
password, AuthenticationMethod.BASIC);
-                       swiftUser = new AccountFactory(conf).createAccount();
-               } catch (CommandException ce) {
-                       throw new ElasticsearchIllegalArgumentException("Unable 
to authenticate to Swift Basic " + url + "/" + username + "/" + password, ce);
-               }
-               return swiftUser;
-       }
+        try {
+            AccountConfig conf = getStandardConfig(url, username, password, 
AuthenticationMethod.BASIC);
+            swiftUser = new AccountFactory(conf).createAccount();
+        } catch (CommandException ce) {
+            throw new ElasticsearchIllegalArgumentException("Unable to 
authenticate to Swift Basic " + url + "/" + username + "/" + password, ce);
+        }
+        return swiftUser;
+    }
 
-       public synchronized Account swiftKeyStone(String url, String username, 
String password, String tenantName) {
-               if (swiftUser != null) {
-                       return swiftUser;
-               }
+    public synchronized Account swiftKeyStone(String url, String username, 
String password, String tenantName) {
+        if (swiftUser != null) {
+            return swiftUser;
+        }
 
-               try {
-                       AccountConfig conf = getStandardConfig(url, username, 
password, AuthenticationMethod.KEYSTONE);
-                       conf.setTenantName(tenantName);
-                       swiftUser = new AccountFactory(conf).createAccount();
-               } catch (CommandException ce) {
-                       throw new ElasticsearchIllegalArgumentException(
-                                       "Unable to authenticate to Swift 
Keystone " + url + "/" + username + "/" + password + "/" + tenantName, ce);
-               }
-               return swiftUser;
-       }
+        try {
+            AccountConfig conf = getStandardConfig(url, username, password, 
AuthenticationMethod.KEYSTONE);
+            conf.setTenantName(tenantName);
+            swiftUser = new AccountFactory(conf).createAccount();
+        } catch (CommandException ce) {
+            throw new ElasticsearchIllegalArgumentException(
+                    "Unable to authenticate to Swift Keystone " + url + "/" + 
username + "/" + password + "/" + tenantName, ce);
+        }
+        return swiftUser;
+    }
 
-       public synchronized Account swiftTempAuth(String url, String username, 
String password) {
-               if (swiftUser != null) {
-                       return swiftUser;
-               }
+    public synchronized Account swiftTempAuth(String url, String username, 
String password) {
+        if (swiftUser != null) {
+            return swiftUser;
+        }
 
-               try {
-                       AccountConfig conf = getStandardConfig(url, username, 
password, AuthenticationMethod.TEMPAUTH);
-                       swiftUser = new AccountFactory(conf).createAccount();
-               } catch (CommandException ce) {
-                       throw new ElasticsearchIllegalArgumentException("Unable 
to authenticate to Swift Temp", ce);
-               }
-               return swiftUser;
-       }
+        try {
+            AccountConfig conf = getStandardConfig(url, username, password, 
AuthenticationMethod.TEMPAUTH);
+            swiftUser = new AccountFactory(conf).createAccount();
+        } catch (CommandException ce) {
+            throw new ElasticsearchIllegalArgumentException("Unable to 
authenticate to Swift Temp", ce);
+        }
+        return swiftUser;
+    }
 
-       private AccountConfig getStandardConfig(String url, String username, 
String password, AuthenticationMethod method) {
-               AccountConfig conf = new AccountConfig();
-               conf.setAuthUrl(url);
-               conf.setUsername(username);
-               conf.setPassword(password);
-               conf.setAuthenticationMethod(method);
-               conf.setAllowContainerCaching(false);
-               conf.setAllowCaching(false);
-               return conf;
-       }
+    private AccountConfig getStandardConfig(String url, String username, 
String password, AuthenticationMethod method) {
+        AccountConfig conf = new AccountConfig();
+        conf.setAuthUrl(url);
+        conf.setUsername(username);
+        conf.setPassword(password);
+        conf.setAuthenticationMethod(method);
+        conf.setAllowContainerCaching(false);
+        conf.setAllowCaching(false);
+        return conf;
+    }
 
-       /**
-        * Start the service. No-op here.
-        */
-       @Override
-       protected void doStart() throws ElasticsearchException {
-       }
+    /**
+     * Start the service. No-op here.
+     */
+    @Override
+    protected void doStart() throws ElasticsearchException {
+    }
 
-       /**
-        * Stop the service. No-op here.
-        */
-       @Override
-       protected void doStop() throws ElasticsearchException {
-       }
+    /**
+     * Stop the service. No-op here.
+     */
+    @Override
+    protected void doStop() throws ElasticsearchException {
+    }
 
-       /**
-        * Close the service. No-op here.
-        */
-       @Override
-       protected void doClose() throws ElasticsearchException {
-       }
+    /**
+     * Close the service. No-op here.
+     */
+    @Override
+    protected void doClose() throws ElasticsearchException {
+    }
 }
diff --git 
a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobContainer.java
 
b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobContainer.java
index b20e8c0..3171385 100644
--- 
a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobContainer.java
+++ 
b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobContainer.java
@@ -10,15 +10,19 @@
 import org.javaswift.joss.model.DirectoryOrObject;
 import org.javaswift.joss.model.StoredObject;
 
+import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
 import java.util.Collection;
 
 /**
  * Swift's implementation of the AbstractBlobContainer
  */
 public class SwiftBlobContainer extends AbstractBlobContainer {
-       // Our local swift blob store instance
+    // Our local swift blob store instance
     protected final SwiftBlobStore blobStore;
 
     // The root path for blobs. Used by buildKey to build full blob names
@@ -44,46 +48,7 @@
      */
     @Override
     public boolean blobExists(String blobName) {
-       return blobStore.swift().getObject(buildKey(blobName)).exists();
-    }
-
-    /**
-     * Read a given blob into the listener
-     * @param blobName The blob name to read
-     * @param listener The listener to report our read info back to
-     */
-    @Override
-    public void readBlob(final String blobName, final ReadBlobListener 
listener) {
-        blobStore.executor().execute(new Runnable() {
-            @Override
-            public void run() {
-                InputStream is;
-                try {
-                       // This is the interesting bit. Fetch the blob and then 
turn it
-                       // into an InputStream for reading below
-                    StoredObject object = 
blobStore.swift().getObject(buildKey(blobName));
-                    is = object.downloadObjectAsInputStream();
-                } catch (Exception e) {
-                    listener.onFailure(e);
-                    return;
-                }
-                byte[] buffer = new byte[blobStore.bufferSizeInBytes()];
-                try {
-                    int bytesRead;
-                    while ((bytesRead = is.read(buffer)) != -1) {
-                        listener.onPartial(buffer, 0, bytesRead);
-                    }
-                    listener.onCompleted();
-                } catch (Exception e) {
-                    try {
-                        is.close();
-                    } catch (IOException e1) {
-                        // ignore
-                    }
-                    listener.onFailure(e);
-                }
-            }
-        });
+        return blobStore.swift().getObject(buildKey(blobName)).exists();
     }
 
     /**
@@ -92,10 +57,10 @@
      */
     @Override
     public boolean deleteBlob(String blobName) throws IOException {
-       StoredObject object = blobStore.swift().getObject(buildKey(blobName));
-       if (object.exists()) {
-               object.delete();
-       }
+        StoredObject object = blobStore.swift().getObject(buildKey(blobName));
+        if (object.exists()) {
+            object.delete();
+        }
         return true;
     }
 
@@ -140,4 +105,44 @@
     protected String buildKey(String blobName) {
         return keyPath + blobName;
     }
+
+    /**
+     * Fetch a given blob into a BufferedInputStream
+     * @param blobName The blob name to read
+     */
+    @Override
+    public InputStream openInput(String blobName) throws IOException {
+        return new BufferedInputStream(
+            
blobStore.swift().getObject(buildKey(blobName)).downloadObjectAsInputStream(),
+                blobStore.bufferSizeInBytes());
+    }
+
+    @Override
+    public OutputStream createOutput(final String blobName) throws IOException 
{
+       // need to remove old file if already exist
+       deleteBlob(blobName);
+
+       final PipedInputStream in = new PipedInputStream();
+
+       // We'll need to store this thread and make sure it terminates when the 
output stream is closed.
+       final Thread transport = new Thread(new Runnable(){
+          public void run(){
+              blobStore.swift().getObject(buildKey(blobName)).uploadObject(in);
+          }
+       });
+       transport.start();
+
+       return new PipedOutputStream(in) {
+           @Override
+           public void close() throws IOException {
+               try {
+                   // Close output, close the thread
+                   super.close();
+                   transport.join();
+               } catch(InterruptedException e) {
+                   throw new IOException("Swift input/output shenanigans.", e);
+               }
+           }
+       };
+    }
 }
diff --git 
a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobStore.java
 
b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobStore.java
index 40dacb0..61877e6 100644
--- 
a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobStore.java
+++ 
b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobStore.java
@@ -1,8 +1,8 @@
 package org.wikimedia.elasticsearch.swift.repositories.blobstore;
 
+import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobStore;
-import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
@@ -11,15 +11,10 @@
 import org.javaswift.joss.model.Container;
 import org.javaswift.joss.model.StoredObject;
 
-import java.util.concurrent.Executor;
-
 /**
  * Our blob store
  */
 public class SwiftBlobStore extends AbstractComponent implements BlobStore {
-       // Executor for our operations. Sorta like a dedicated Thread pool.
-    private final Executor executor;
-
     // How much to buffer our blobs by
     private final int bufferSizeInBytes;
 
@@ -31,11 +26,9 @@
      * @param settings Settings for our repository. Only care about buffer 
size.
      * @param auth
      * @param container
-     * @param executor
      */
-    public SwiftBlobStore(Settings settings, Account auth, String container, 
Executor executor) {
+    public SwiftBlobStore(Settings settings, Account auth, String container) {
         super(settings);
-        this.executor = executor;
         this.bufferSizeInBytes = (int)settings.getAsBytesSize("buffer_size", 
new ByteSizeValue(100, ByteSizeUnit.KB)).bytes();
 
         swift = auth.getContainer(container);
@@ -53,13 +46,6 @@
     }
 
     /**
-     * Get the executor
-     */
-    public Executor executor() {
-        return executor;
-    }
-
-    /**
      * Get our buffer size
      */
     public int bufferSizeInBytes() {
@@ -71,8 +57,8 @@
      * @param path The blob path to search
      */
     @Override
-    public ImmutableBlobContainer immutableBlobContainer(BlobPath path) {
-        return new SwiftImmutableBlobContainer(path, this);
+    public BlobContainer blobContainer(BlobPath path) {
+        return new SwiftBlobContainer(path, this);
     }
 
     /**
@@ -81,7 +67,7 @@
      */
     @Override
     public void delete(BlobPath path) {
-       String keyPath = path.buildAsString("/");
+        String keyPath = path.buildAsString("/");
         if (!keyPath.isEmpty()) {
             keyPath = keyPath + "/";
         }
diff --git 
a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftImmutableBlobContainer.java
 
b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftImmutableBlobContainer.java
deleted file mode 100644
index 8ea65a4..0000000
--- 
a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftImmutableBlobContainer.java
+++ /dev/null
@@ -1,61 +0,0 @@
-package org.wikimedia.elasticsearch.swift.repositories.blobstore;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.elasticsearch.common.blobstore.BlobPath;
-import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
-import org.elasticsearch.common.blobstore.support.BlobStores;
-import org.javaswift.joss.model.StoredObject;
-
-/**
- * The final implementation of Swift blob storage. Implements the write 
functionality
- */
-public class SwiftImmutableBlobContainer extends SwiftBlobContainer implements 
ImmutableBlobContainer {
-       /**
-        * Constructor. Just call the parent.
-     * @param path The BlobPath to find blobs in
-     * @param blobStore The blob store to use for operations
-        */
-    protected SwiftImmutableBlobContainer(BlobPath path, SwiftBlobStore 
blobStore) {
-        super(path, blobStore);
-    }
-
-    /**
-     * Write a blob!
-     * @param blobName The name of the blob we're writing
-     * @param is The data we're writing
-     * @param sizeInBytes How big the data is. We don't care with Swift.
-     * @param listener The listener for write results (failure, etc)
-     */
-    @Override
-    public void writeBlob(final String blobName, final InputStream is, final 
long sizeInBytes, final WriterListener listener) {
-        blobStore.executor().execute(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                       // need to remove old file if already exist
-                       StoredObject object = 
blobStore.swift().getObject(buildKey(blobName));
-                       if (object.exists()) {
-                               object.delete();
-                       }
-                       object.uploadObject(is);
-                    listener.onCompleted();
-                } catch (Exception e) {
-                    listener.onFailure(e);
-                }
-            }
-        });
-    }
-
-    /**
-     * Write a blob! Basically boilerplate from other implementations. 
-     * @param blobName The name of the blob we're writing
-     * @param is The data we're writing
-     * @param sizeInBytes How big the data is. We don't care with Swift.
-     */
-    @Override
-    public void writeBlob(String blobName, InputStream is, long sizeInBytes) 
throws IOException {
-        BlobStores.syncWriteBlob(this, blobName, is, sizeInBytes);
-    }
-}

-- 
To view, visit https://gerrit.wikimedia.org/r/158279
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Ic159ca0c658a79f0fcc60fd58f8763cc149d271e
Gerrit-PatchSet: 11
Gerrit-Project: search/repository-swift
Gerrit-Branch: master
Gerrit-Owner: Chad <[email protected]>
Gerrit-Reviewer: BearND <[email protected]>
Gerrit-Reviewer: Chad <[email protected]>
Gerrit-Reviewer: Manybubbles <[email protected]>
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to