Author: tmaret
Date: Fri Feb  3 14:59:56 2017
New Revision: 1781568

URL: http://svn.apache.org/viewvc?rev=1781568&view=rev
Log:
SLING-6503 - Concurrency issue can prevent repository packages to be cleaned up

* Cleanup the ResourceDistributionPackage packages asynchronously

Added:
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/ResourceDistributionPackageCleanup.java
Modified:
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtils.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/ResourceDistributionPackage.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/ResourceDistributionPackageBuilder.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/SharedDistributionPackage.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/DistributionPackageBuilderFactory.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/vlt/VaultDistributionPackageBuilderFactory.java

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtils.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtils.java?rev=1781568&r1=1781567&r2=1781568&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtils.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtils.java Fri Feb  3 14:59:56 2017
@@ -316,7 +316,13 @@ public class DistributionPackageUtils {
         }
     }
 
-    public static boolean release(Resource resource, @Nonnull String[] 
holderNames) throws RepositoryException {
+    public static boolean disposable(@Nonnull Resource resource) throws 
RepositoryException {
+        Node parent = resource.adaptTo(Node.class);
+        Node refs = parent.getNode("refs");
+        return !refs.hasNodes() && refs.hasProperty("released");
+    }
+
+    public static void release(Resource resource, @Nonnull String[] 
holderNames) throws RepositoryException {
         if (holderNames.length == 0) {
             throw new IllegalArgumentException("holder name cannot be null or 
empty");
         }
@@ -332,12 +338,9 @@ public class DistributionPackageUtils {
             }
         }
 
-        if (!refs.hasNodes()) {
-            refs.remove();
-            return true;
+        if (!refs.hasProperty("released")) {
+            refs.setProperty("released", true);
         }
-
-        return false;
     }
 
     public static void acquire(File file, @Nonnull String[] holderNames) 
throws IOException {

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/ResourceDistributionPackage.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/ResourceDistributionPackage.java?rev=1781568&r1=1781567&r2=1781568&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/ResourceDistributionPackage.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/ResourceDistributionPackage.java Fri Feb  3 14:59:56 2017
@@ -82,12 +82,7 @@ public class ResourceDistributionPackage
 
     @Override
     public void delete() {
-        try {
-            resourceResolver.delete(resource);
-            resourceResolver.commit();
-        } catch (PersistenceException e) {
-            throw new RuntimeException(e);
-        }
+        delete(true);
     }
 
     @Override
@@ -108,12 +103,7 @@ public class ResourceDistributionPackage
     @Override
     public void release(@Nonnull String... holderNames) {
         try {
-            boolean doDelete = DistributionPackageUtils.release(resource, 
holderNames);
-
-            if (doDelete) {
-                delete();
-            }
-
+            DistributionPackageUtils.release(resource, holderNames);
             if (resourceResolver.hasChanges()) {
                 resourceResolver.commit();
             }
@@ -123,4 +113,24 @@ public class ResourceDistributionPackage
             log.error("cannot release package", e);
         }
     }
+
+    public boolean disposable() {
+        try {
+            return DistributionPackageUtils.disposable(resource);
+        } catch (RepositoryException e) {
+            log.error("cannot check if package is disposable", e);
+        }
+        return false;
+    }
+
+    void delete(boolean save) {
+        try {
+            resourceResolver.delete(resource);
+            if (save) {
+                resourceResolver.commit();
+            }
+        } catch (PersistenceException e) {
+            throw new RuntimeException(e);
+        }
+    }
 }
\ No newline at end of file

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/ResourceDistributionPackageBuilder.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/ResourceDistributionPackageBuilder.java?rev=1781568&r1=1781567&r2=1781568&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/ResourceDistributionPackageBuilder.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/ResourceDistributionPackageBuilder.java Fri Feb  3 14:59:56 2017
@@ -28,6 +28,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.security.DigestOutputStream;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -235,4 +236,42 @@ public class ResourceDistributionPackage
 
         return resource;
     }
+
+    @Nonnull
+    public Iterator<ResourceDistributionPackage> getPackages(@Nonnull 
ResourceResolver resourceResolver)
+            throws DistributionException {
+        try {
+            Resource packagesRoot = 
DistributionPackageUtils.getPackagesRoot(resourceResolver, packagesPath);
+            return new ResourceDistributionPackageIterator(packagesRoot, 
resourceResolver, getType());
+        } catch (PersistenceException e) {
+            throw new DistributionException("Failed to get the package list", 
e);
+        }
+    }
+
+    private static final class ResourceDistributionPackageIterator implements 
Iterator<ResourceDistributionPackage> {
+
+        final Iterator<Resource> packages;
+
+        final ResourceResolver resourceResolver;
+
+        final String type;
+
+        private ResourceDistributionPackageIterator(@Nonnull Resource 
packagesRoot, @Nonnull ResourceResolver resourceResolver,
+                                            @Nonnull String type) {
+            this.packages = packagesRoot.listChildren();
+            this.resourceResolver = resourceResolver;
+            this.type = type;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return packages.hasNext();
+        }
+
+        @Override
+        public ResourceDistributionPackage next() {
+            Resource packageResource = packages.next();
+            return new ResourceDistributionPackage(packageResource, type, 
resourceResolver, null, null);
+        }
+    }
 }

Added: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/ResourceDistributionPackageCleanup.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/ResourceDistributionPackageCleanup.java?rev=1781568&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/ResourceDistributionPackageCleanup.java (added)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/ResourceDistributionPackageCleanup.java Fri Feb  3 14:59:56 2017
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.packaging.impl;
+
+
+import java.util.Iterator;
+
+import javax.annotation.Nonnull;
+
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.distribution.common.DistributionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This runnable removes unreferenced {@link ResourceDistributionPackage} 
packages.
+ * It is meant to be run periodically. See SLING-6503.
+ */
+public class ResourceDistributionPackageCleanup implements Runnable {
+
+    /**
+     * The default logger
+     */
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final ResourceDistributionPackageBuilder packageBuilder;
+
+    private final ResourceResolverFactory resolverFactory;
+
+    public ResourceDistributionPackageCleanup(@Nonnull ResourceResolverFactory 
resolverFactory,
+                                              @Nonnull 
ResourceDistributionPackageBuilder packageBuilder) {
+        this.resolverFactory = resolverFactory;
+        this.packageBuilder = packageBuilder;
+    }
+
+    public void run () {
+        log.debug("Cleaning up {} packages", packageBuilder.getType());
+        ResourceResolver serviceResolver = null;
+        try {
+            int deleted = 0, total = 0;
+            serviceResolver = resolverFactory.getServiceResourceResolver(null);
+            for (Iterator<ResourceDistributionPackage> pkgs = 
packageBuilder.getPackages(serviceResolver) ; pkgs.hasNext() ; total++) {
+                ResourceDistributionPackage pkg = pkgs.next();
+                if (pkg.disposable()) {
+                    log.trace("Delete package {}", pkg.getId());
+                    deleted++;
+                    pkg.delete(false);
+                }
+            }
+            serviceResolver.commit();
+            log.debug("Cleaned up {}/{} {} packages",
+                    new Object[]{deleted, total, packageBuilder.getType()});
+        } catch (LoginException e) {
+            log.error("Failed to get distribution service resolver: {}", 
e.getMessage());
+        } catch (DistributionException e) {
+            log.error("Failed to get the list of packages", e);
+        } catch (PersistenceException e) {
+            log.error("Failed to delete disposable packages", e);
+        } finally {
+            if (serviceResolver != null && serviceResolver.isLive()) {
+                serviceResolver.close();
+            }
+        }
+    }
+}

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/SharedDistributionPackage.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/SharedDistributionPackage.java?rev=1781568&r1=1781567&r2=1781568&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/SharedDistributionPackage.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/SharedDistributionPackage.java Fri Feb  3 14:59:56 2017
@@ -37,7 +37,7 @@ public interface SharedDistributionPacka
 
     /**
      * release a reference to this package and decrease the reference count.
-     * when no more references are hold the package {@code 
DistributionPackage#delete} method is called.
+     * when no more references are hold the package {@code 
DistributionPackage#delete} method will be invoked.
      */
     void release(@Nonnull String... holderNames);
 

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/DistributionPackageBuilderFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/DistributionPackageBuilderFactory.java?rev=1781568&r1=1781567&r2=1781568&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/DistributionPackageBuilderFactory.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/DistributionPackageBuilderFactory.java Fri Feb  3 14:59:56 2017
@@ -19,6 +19,8 @@
 package org.apache.sling.distribution.serialization.impl;
 
 import java.io.InputStream;
+import java.util.Dictionary;
+import java.util.Hashtable;
 import java.util.Map;
 
 import javax.annotation.CheckForNull;
@@ -34,7 +36,9 @@ import org.apache.felix.scr.annotations.
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.Service;
 import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.commons.osgi.PropertiesUtil;
+import org.apache.sling.commons.scheduler.Scheduler;
 import org.apache.sling.distribution.DistributionRequest;
 import org.apache.sling.distribution.common.DistributionException;
 import 
org.apache.sling.distribution.component.impl.DistributionComponentConstants;
@@ -45,9 +49,11 @@ import org.apache.sling.distribution.pac
 import org.apache.sling.distribution.packaging.DistributionPackageInfo;
 import 
org.apache.sling.distribution.packaging.impl.FileDistributionPackageBuilder;
 import 
org.apache.sling.distribution.packaging.impl.ResourceDistributionPackageBuilder;
+import 
org.apache.sling.distribution.packaging.impl.ResourceDistributionPackageCleanup;
 import 
org.apache.sling.distribution.serialization.DistributionContentSerializer;
 import 
org.apache.sling.distribution.util.impl.FileBackedMemoryOutputStream.MemoryUnit;
 import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
 
 /**
  * A factory for package builders
@@ -87,6 +93,8 @@ public class DistributionPackageBuilderF
     @Reference(name = "format")
     private DistributionContentSerializer contentSerializer;
 
+    @Reference
+    private ResourceResolverFactory resolverFactory;
 
     /**
      * Temp file folder
@@ -155,6 +163,16 @@ public class DistributionPackageBuilderF
     )
     private static final String MONITORING_QUEUE_SIZE = "monitoringQueueSize";
 
+    private static final long DEFAULT_PACKAGE_CLEANUP_DELAY = 60L;
+
+    @Property(
+            label="The delay in seconds between two runs of the cleanup phase 
for resource persisted packages.",
+            description = "The resource persisted packages are cleaned up 
periodically (asynchronously) since SLING-6503." +
+                    "The delay between two runs of the cleanup phase can be 
configured with this setting. 60 seconds by default",
+            longValue = DEFAULT_PACKAGE_CLEANUP_DELAY
+    )
+    private static final String PACKAGE_CLEANUP_DELAY = "cleanupDelay";
+
     /**
      * Package node filters
      */
@@ -170,6 +188,8 @@ public class DistributionPackageBuilderF
 
     private MonitoringDistributionPackageBuilder packageBuilder;
 
+    private ServiceRegistration packageCleanup = null;
+
     @Activate
     public void activate(BundleContext context,
                          Map<String, Object> config) {
@@ -179,6 +199,7 @@ public class DistributionPackageBuilderF
         String persistenceType = 
PropertiesUtil.toString(config.get(PERSISTENCE), null);
         String tempFsFolder = 
SettingsUtils.removeEmptyEntry(PropertiesUtil.toString(config.get(TEMP_FS_FOLDER),
 null));
         String digestAlgorithm = 
PropertiesUtil.toString(config.get(DIGEST_ALGORITHM), DEFAULT_DIGEST_ALGORITHM);
+        long cleanupDelay = 
PropertiesUtil.toLong(config.get(PACKAGE_CLEANUP_DELAY), 
DEFAULT_PACKAGE_CLEANUP_DELAY);
         if (DEFAULT_DIGEST_ALGORITHM.equals(digestAlgorithm)) {
             digestAlgorithm = null;
         }
@@ -191,7 +212,13 @@ public class DistributionPackageBuilderF
             String memoryUnitName = 
PropertiesUtil.toString(config.get(MEMORY_UNIT), DEFAULT_MEMORY_UNIT);
             final MemoryUnit memoryUnit = MemoryUnit.valueOf(memoryUnitName);
             final boolean useOffHeapMemory = 
PropertiesUtil.toBoolean(config.get(USE_OFF_HEAP_MEMORY), 
DEFAULT_USE_OFF_HEAP_MEMORY);
-            wrapped = new 
ResourceDistributionPackageBuilder(contentSerializer.getName(), 
contentSerializer, tempFsFolder, fileThreshold, memoryUnit, useOffHeapMemory, 
digestAlgorithm, nodeFilters, propertyFilters);
+            ResourceDistributionPackageBuilder 
resourceDistributionPackageBuilder = new 
ResourceDistributionPackageBuilder(contentSerializer.getName(), 
contentSerializer, tempFsFolder, fileThreshold, memoryUnit, useOffHeapMemory, 
digestAlgorithm, nodeFilters, propertyFilters);
+            Runnable cleanup = new 
ResourceDistributionPackageCleanup(resolverFactory, 
resourceDistributionPackageBuilder);
+            Dictionary<String, Object> props = new Hashtable<String, Object>();
+            props.put(Scheduler.PROPERTY_SCHEDULER_CONCURRENT, false);
+            props.put(Scheduler.PROPERTY_SCHEDULER_PERIOD, cleanupDelay);
+            packageCleanup = context.registerService(Runnable.class.getName(), 
cleanup, props);
+            wrapped = resourceDistributionPackageBuilder;
         }
 
         int monitoringQueueSize = 
PropertiesUtil.toInteger(config.get(MONITORING_QUEUE_SIZE), 
DEFAULT_MONITORING_QUEUE_SIZE);
@@ -201,6 +228,9 @@ public class DistributionPackageBuilderF
     @Deactivate
     public void deactivate() {
         packageBuilder.clear();
+        if (packageCleanup != null) {
+            packageCleanup.unregister();
+        }
     }
 
     public String getType() {

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/vlt/VaultDistributionPackageBuilderFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/vlt/VaultDistributionPackageBuilderFactory.java?rev=1781568&r1=1781567&r2=1781568&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/vlt/VaultDistributionPackageBuilderFactory.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/vlt/VaultDistributionPackageBuilderFactory.java Fri Feb  3 14:59:56 2017
@@ -19,6 +19,8 @@
 package org.apache.sling.distribution.serialization.impl.vlt;
 
 import java.io.InputStream;
+import java.util.Dictionary;
+import java.util.Hashtable;
 import java.util.Map;
 
 import javax.annotation.CheckForNull;
@@ -37,7 +39,9 @@ import org.apache.jackrabbit.vault.fs.ap
 import org.apache.jackrabbit.vault.fs.io.AccessControlHandling;
 import org.apache.jackrabbit.vault.packaging.Packaging;
 import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.commons.osgi.PropertiesUtil;
+import org.apache.sling.commons.scheduler.Scheduler;
 import org.apache.sling.distribution.DistributionRequest;
 import org.apache.sling.distribution.common.DistributionException;
 import 
org.apache.sling.distribution.component.impl.DistributionComponentConstants;
@@ -48,9 +52,11 @@ import org.apache.sling.distribution.pac
 import org.apache.sling.distribution.packaging.DistributionPackageInfo;
 import 
org.apache.sling.distribution.packaging.impl.FileDistributionPackageBuilder;
 import 
org.apache.sling.distribution.packaging.impl.ResourceDistributionPackageBuilder;
+import 
org.apache.sling.distribution.packaging.impl.ResourceDistributionPackageCleanup;
 import 
org.apache.sling.distribution.serialization.DistributionContentSerializer;
 import 
org.apache.sling.distribution.util.impl.FileBackedMemoryOutputStream.MemoryUnit;
 import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
 
 /**
  * A package builder for Apache Jackrabbit FileVault based implementations.
@@ -130,6 +136,16 @@ public class VaultDistributionPackageBui
     @Property(label="Autosave threshold", description = "The value after which 
autosave is triggered for intermediate changes.", intValue = -1)
     public static final String AUTOSAVE_THRESHOLD = "autoSaveThreshold";
 
+    private static final long DEFAULT_PACKAGE_CLEANUP_DELAY = 60L;
+
+    @Property(
+            label="The delay in seconds between two runs of the cleanup phase 
for resource persisted packages.",
+            description = "The resource persisted packages are cleaned up 
periodically (asynchronously) since SLING-6503." +
+                    "The delay between two runs of the cleanup phase can be 
configured with this setting. 60 seconds by default",
+            longValue = DEFAULT_PACKAGE_CLEANUP_DELAY
+    )
+    private static final String PACKAGE_CLEANUP_DELAY = "cleanupDelay";
+
     // 1M
     private static final int DEFAULT_FILE_THRESHOLD_VALUE = 1;
 
@@ -194,6 +210,11 @@ public class VaultDistributionPackageBui
     @Reference
     private Packaging packaging;
 
+    @Reference
+    private ResourceResolverFactory resolverFactory;
+
+    private ServiceRegistration packageCleanup = null;
+
     private MonitoringDistributionPackageBuilder packageBuilder;
 
 
@@ -209,6 +230,8 @@ public class VaultDistributionPackageBui
         String[] packageNodeFilters = 
SettingsUtils.removeEmptyEntries(PropertiesUtil.toStringArray(config.get(PACKAGE_FILTERS),
 null));
         String[] packagePropertyFilters = 
SettingsUtils.removeEmptyEntries(PropertiesUtil.toStringArray(config.get(PROPERTY_FILTERS),
 null));
 
+        long cleanupDelay = 
PropertiesUtil.toLong(config.get(PACKAGE_CLEANUP_DELAY), 
DEFAULT_PACKAGE_CLEANUP_DELAY);
+
         String tempFsFolder = 
SettingsUtils.removeEmptyEntry(PropertiesUtil.toString(config.get(TEMP_FS_FOLDER),
 null));
         boolean useBinaryReferences = 
PropertiesUtil.toBoolean(config.get(USE_BINARY_REFERENCES), false);
         int autosaveThreshold = 
PropertiesUtil.toInteger(config.get(AUTOSAVE_THRESHOLD), -1);
@@ -239,7 +262,13 @@ public class VaultDistributionPackageBui
             String memoryUnitName = 
PropertiesUtil.toString(config.get(MEMORY_UNIT), DEFAULT_MEMORY_UNIT);
             final MemoryUnit memoryUnit = MemoryUnit.valueOf(memoryUnitName);
             final boolean useOffHeapMemory = 
PropertiesUtil.toBoolean(config.get(USE_OFF_HEAP_MEMORY), 
DEFAULT_USE_OFF_HEAP_MEMORY);
-            wrapped = new 
ResourceDistributionPackageBuilder(contentSerializer.getName(), 
contentSerializer, tempFsFolder, fileThreshold, memoryUnit, useOffHeapMemory, 
digestAlgorithm, packageNodeFilters, packagePropertyFilters);
+            ResourceDistributionPackageBuilder 
resourceDistributionPackageBuilder = new 
ResourceDistributionPackageBuilder(contentSerializer.getName(), 
contentSerializer, tempFsFolder, fileThreshold, memoryUnit, useOffHeapMemory, 
digestAlgorithm, packageNodeFilters, packagePropertyFilters);
+            Runnable cleanup = new 
ResourceDistributionPackageCleanup(resolverFactory, 
resourceDistributionPackageBuilder);
+            Dictionary<String, Object> props = new Hashtable<String, Object>();
+            props.put(Scheduler.PROPERTY_SCHEDULER_CONCURRENT, false);
+            props.put(Scheduler.PROPERTY_SCHEDULER_PERIOD, cleanupDelay);
+            packageCleanup = context.registerService(Runnable.class.getName(), 
cleanup, props);
+            wrapped = resourceDistributionPackageBuilder;
         }
 
         int monitoringQueueSize = 
PropertiesUtil.toInteger(config.get(MONITORING_QUEUE_SIZE), 
DEFAULT_MONITORING_QUEUE_SIZE);
@@ -249,6 +278,9 @@ public class VaultDistributionPackageBui
     @Deactivate
     public void deactivate() {
         packageBuilder.clear();
+        if (packageCleanup != null) {
+            packageCleanup.unregister();
+        }
     }
 
     public String getType() {


Reply via email to