This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new f400d33287 [ASTERIXDB-3144][RT] Implement Static Partitioning
f400d33287 is described below

commit f400d33287536d81408c1475e510c50bdd344c27
Author: Murtadha Hubail <murtadha.hub...@couchbase.com>
AuthorDate: Fri May 5 03:31:47 2023 +0300

    [ASTERIXDB-3144][RT] Implement Static Partitioning
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    
    - Implement static partitioning based on storage/compute
      partitions map.
    - Fixes for LSMPrimaryInsertOperatorNodePushable state
      keeping for working on multiple storage partitions.
    
    Change-Id: Ieca7ffb0f48e16fba4dc5beb0868c1ef8ac9245e
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17509
    Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Murtadha Hubail <mhub...@apache.org>
    Reviewed-by: Ali Alsuliman <ali.al.solai...@gmail.com>
---
 .../asterix/app/cc/CcApplicationContext.java       |   2 +-
 .../asterix/runtime/ClusterStateManagerTest.java   |  11 ++
 .../asterix/common/cluster/ComputePartition.java   |  37 +++++++
 .../common/cluster/SplitComputeLocations.java      |  41 ++++++++
 .../cluster/StorageComputePartitionsMap.java       |  93 +++++++++++++++++
 .../asterix/common/utils/StorageConstants.java     |   2 +
 .../adapter/factory/GenericAdapterFactory.java     |   1 +
 .../metadata/declared/MetadataProvider.java        |   5 +-
 .../metadata/utils/DataPartitioningProvider.java   |  90 +++++++---------
 .../utils/DynamicDataPartitioningProvider.java     |  57 ++++++++++
 .../utils/StaticDataPartitioningProvider.java      | 116 +++++++++++++++++++++
 .../LSMPrimaryInsertOperatorNodePushable.java      |  38 +++----
 .../asterix/runtime/utils/ClusterStateManager.java |  20 +++-
 .../PersistentLocalResourceRepository.java         |   2 +-
 14 files changed, 440 insertions(+), 75 deletions(-)

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
index a2d99a0332..e4247f0dd9 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
@@ -157,7 +157,7 @@ public class CcApplicationContext implements 
ICcApplicationContext {
         requestTracker = new RequestTracker(this);
         configValidator = configValidatorFactory.create();
         this.adapterFactoryService = adapterFactoryService;
-        dataPartitioningProvider = new DataPartitioningProvider(this);
+        dataPartitioningProvider = DataPartitioningProvider.create(this);
     }
 
     @Override
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
index a2a3b48393..53b9294e2b 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
@@ -33,8 +33,10 @@ import 
org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
 import org.apache.asterix.common.config.MetadataProperties;
+import org.apache.asterix.common.config.StorageProperties;
 import org.apache.asterix.common.metadata.IMetadataBootstrap;
 import org.apache.asterix.common.utils.NcLocalCounters;
+import org.apache.asterix.common.utils.PartitioningScheme;
 import org.apache.asterix.hyracks.bootstrap.CCApplication;
 import org.apache.asterix.runtime.transaction.ResourceIdManager;
 import org.apache.asterix.runtime.utils.BulkTxnIdFactory;
@@ -231,6 +233,9 @@ public class ClusterStateManagerTest {
         MetadataProperties metadataProperties = mockMetadataProperties();
         
Mockito.when(ccApplicationContext.getMetadataProperties()).thenReturn(metadataProperties);
 
+        StorageProperties storageProperties = mockStorageProperties();
+        
Mockito.when(ccApplicationContext.getStorageProperties()).thenReturn(storageProperties);
+
         ResourceIdManager resourceIdManager = new ResourceIdManager(csm);
         
Mockito.when(ccApplicationContext.getResourceIdManager()).thenReturn(resourceIdManager);
 
@@ -258,6 +263,12 @@ public class ClusterStateManagerTest {
         return metadataProperties;
     }
 
+    private StorageProperties mockStorageProperties() {
+        StorageProperties storageProperties = 
Mockito.mock(StorageProperties.class);
+        
Mockito.when(storageProperties.getPartitioningScheme()).thenReturn(PartitioningScheme.DYNAMIC);
+        return storageProperties;
+    }
+
     private NcLocalCounters mockLocalCounters() {
         final NcLocalCounters localCounters = 
Mockito.mock(NcLocalCounters.class);
         Mockito.when(localCounters.getMaxJobId()).thenReturn(1000L);
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ComputePartition.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ComputePartition.java
new file mode 100644
index 0000000000..1d11c302cc
--- /dev/null
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ComputePartition.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.cluster;
+
+public class ComputePartition {
+    private final String nodeId;
+    private final int id;
+
+    public ComputePartition(int id, String nodeId) {
+        this.id = id;
+        this.nodeId = nodeId;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    public int getId() {
+        return id;
+    }
+}
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/SplitComputeLocations.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/SplitComputeLocations.java
new file mode 100644
index 0000000000..b58c39f66f
--- /dev/null
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/SplitComputeLocations.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.cluster;
+
+import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+
+public class SplitComputeLocations {
+
+    private final IFileSplitProvider splitsProvider;
+    private final AlgebricksPartitionConstraint constraints;
+
+    public SplitComputeLocations(IFileSplitProvider splitsProvider, 
AlgebricksPartitionConstraint constraints) {
+        this.splitsProvider = splitsProvider;
+        this.constraints = constraints;
+    }
+
+    public IFileSplitProvider getSplitsProvider() {
+        return splitsProvider;
+    }
+
+    public AlgebricksPartitionConstraint getConstraints() {
+        return constraints;
+    }
+}
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java
new file mode 100644
index 0000000000..874371e09a
--- /dev/null
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.cluster;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.utils.StorageConstants;
+
+public class StorageComputePartitionsMap {
+
+    private final Map<Integer, ComputePartition> stoToComputeLocation = new 
HashMap<>();
+
+    public void addStoragePartition(int stoPart, ComputePartition compute) {
+        stoToComputeLocation.put(stoPart, compute);
+    }
+
+    public int[][] getComputeToStorageMap(boolean metadataDataset) {
+        Map<Integer, List<Integer>> computeToStoragePartitions = new 
HashMap<>();
+        if (metadataDataset) {
+            final int computePartitionIdForMetadata = 0;
+            computeToStoragePartitions.put(computePartitionIdForMetadata,
+                    Collections.singletonList(computePartitionIdForMetadata));
+        } else {
+            for (int i = 0; i < StorageConstants.NUM_STORAGE_PARTITIONS; i++) {
+                ComputePartition computePartition = getComputePartition(i);
+                int computeId = computePartition.getId();
+                List<Integer> storagePartitions =
+                        computeToStoragePartitions.computeIfAbsent(computeId, 
k -> new ArrayList<>());
+                storagePartitions.add(i);
+            }
+        }
+        int[][] computerToStoArray = new 
int[computeToStoragePartitions.size()][];
+        for (Map.Entry<Integer, List<Integer>> integerListEntry : 
computeToStoragePartitions.entrySet()) {
+            computerToStoArray[integerListEntry.getKey()] =
+                    integerListEntry.getValue().stream().mapToInt(i -> 
i).toArray();
+        }
+        return computerToStoArray;
+    }
+
+    public ComputePartition getComputePartition(int storagePartition) {
+        return stoToComputeLocation.get(storagePartition);
+    }
+
+    public static StorageComputePartitionsMap 
computePartitionsMap(IClusterStateManager clusterStateManager) {
+        ClusterPartition metadataPartition = 
clusterStateManager.getMetadataPartition();
+        Map<Integer, ClusterPartition> clusterPartitions = 
clusterStateManager.getClusterPartitions();
+        StorageComputePartitionsMap newMap = new StorageComputePartitionsMap();
+        newMap.addStoragePartition(metadataPartition.getPartitionId(),
+                new ComputePartition(metadataPartition.getPartitionId(), 
metadataPartition.getActiveNodeId()));
+        int storagePartitionsPerComputePartition = 
StorageConstants.NUM_STORAGE_PARTITIONS / clusterPartitions.size();
+        int storagePartitionId = 0;
+        int lastComputePartition = 1;
+        int remainingStoragePartition = 
StorageConstants.NUM_STORAGE_PARTITIONS % clusterPartitions.size();
+        for (Map.Entry<Integer, ClusterPartition> cp : 
clusterPartitions.entrySet()) {
+            ClusterPartition clusterPartition = cp.getValue();
+            for (int i = 0; i < storagePartitionsPerComputePartition; i++) {
+                newMap.addStoragePartition(storagePartitionId,
+                        new 
ComputePartition(clusterPartition.getPartitionId(), 
clusterPartition.getActiveNodeId()));
+                storagePartitionId++;
+            }
+            if (lastComputePartition == clusterPartitions.size() && 
remainingStoragePartition != 0) {
+                // assign all remaining partitions to last compute partition
+                for (int k = 0; k < remainingStoragePartition; k++) {
+                    newMap.addStoragePartition(storagePartitionId, new 
ComputePartition(
+                            clusterPartition.getPartitionId(), 
clusterPartition.getActiveNodeId()));
+                    storagePartitionId++;
+                }
+            }
+            lastComputePartition++;
+        }
+        return newMap;
+    }
+}
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
index 2d231d3fb8..c26fe76156 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
@@ -46,6 +46,8 @@ public class StorageConstants {
     public static final String DEFAULT_COMPACTION_POLICY_NAME = 
ConcurrentMergePolicyFactory.NAME;
     public static final String DEFAULT_FILTERED_DATASET_COMPACTION_POLICY_NAME 
= "correlated-prefix";
     public static final Map<String, String> 
DEFAULT_COMPACTION_POLICY_PROPERTIES;
+    public static final int METADATA_PARTITION = -1;
+    public static final int NUM_STORAGE_PARTITIONS = 8;
 
     /**
      * The storage version of AsterixDB related artifacts (e.g. log files, 
checkpoint files, etc..).
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index 9ca87b873f..3ac8a0292e 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -145,6 +145,7 @@ public class GenericAdapterFactory implements 
ITypedAdapterFactory {
             throws HyracksDataException, AlgebricksException {
         this.isFeed = ExternalDataUtils.isFeed(configuration);
         if (isFeed) {
+            //TODO(partitioning) make this code reuse DataPartitioningProvider
             feedLogFileSplits = FeedUtils.splitsForAdapter(appCtx, 
ExternalDataUtils.getDatasetDataverse(configuration),
                     ExternalDataUtils.getFeedName(configuration), 
dataSourceFactory.getPartitionConstraint());
         }
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index a8df92aa2e..0b011d0c0d 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -933,7 +933,8 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
 
     public FileSplit[] splitsForIndex(MetadataTransactionContext mdTxnCtx, 
Dataset dataset, String indexName)
             throws AlgebricksException {
-        return SplitsAndConstraintsUtil.getIndexSplits(dataset, indexName, 
mdTxnCtx, appCtx.getClusterStateManager());
+        return dataPartitioningProvider.getPartitioningProperties(mdTxnCtx, 
dataset, indexName).getSpiltsProvider()
+                .getFileSplits();
     }
 
     public DatasourceAdapter getAdapter(MetadataTransactionContext mdTxnCtx, 
DataverseName dataverseName,
@@ -1788,8 +1789,6 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
     }
 
     public PartitioningProperties getPartitioningProperties(Dataset ds, String 
indexName) throws AlgebricksException {
-        //TODO(partitioning) pass splits rather than mdTxnCtx?
-        //        FileSplit[] splits = splitsForIndex(mdTxnCtx, ds, indexName);
         return dataPartitioningProvider.getPartitioningProperties(mdTxnCtx, 
ds, indexName);
     }
 
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java
index ec4c985369..7257958f1d 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java
@@ -18,9 +18,6 @@
  */
 package org.apache.asterix.metadata.utils;
 
-import static org.apache.asterix.common.utils.PartitioningScheme.DYNAMIC;
-import static org.apache.asterix.common.utils.PartitioningScheme.STATIC;
-
 import java.util.Arrays;
 import java.util.Set;
 import java.util.TreeSet;
@@ -31,78 +28,71 @@ import 
org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.dataflow.IDataPartitioningProvider;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
 import org.apache.asterix.common.utils.PartitioningScheme;
+import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.external.util.FeedUtils;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Feed;
+import org.apache.asterix.runtime.utils.ClusterStateManager;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.io.FileSplit;
 import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
 
-public class DataPartitioningProvider implements IDataPartitioningProvider {
+public abstract class DataPartitioningProvider implements 
IDataPartitioningProvider {
 
-    private final ICcApplicationContext appCtx;
-    private final PartitioningScheme scheme;
+    protected final ICcApplicationContext appCtx;
+    protected final ClusterStateManager clusterStateManager;
 
-    public DataPartitioningProvider(ICcApplicationContext appCtx) {
+    DataPartitioningProvider(ICcApplicationContext appCtx) {
         this.appCtx = appCtx;
-        scheme = appCtx.getStorageProperties().getPartitioningScheme();
+        this.clusterStateManager = (ClusterStateManager) 
appCtx.getClusterStateManager();
     }
 
-    public PartitioningProperties getPartitioningProperties(DataverseName 
dataverseName) {
-        if (scheme == DYNAMIC) {
-            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitsAndConstraints = SplitsAndConstraintsUtil
-                    
.getDataverseSplitProviderAndConstraints(appCtx.getClusterStateManager(), 
dataverseName);
-            int[][] partitionsMap = 
getPartitionsMap(getNumPartitions(splitsAndConstraints.second));
-            return PartitioningProperties.of(splitsAndConstraints.first, 
splitsAndConstraints.second, partitionsMap);
-        } else if (scheme == STATIC) {
-            throw new NotImplementedException();
+    public static DataPartitioningProvider create(ICcApplicationContext 
appCtx) {
+        PartitioningScheme partitioningScheme = 
appCtx.getStorageProperties().getPartitioningScheme();
+        switch (partitioningScheme) {
+            case DYNAMIC:
+                return new DynamicDataPartitioningProvider(appCtx);
+            case STATIC:
+                return new StaticDataPartitioningProvider(appCtx);
+            default:
+                throw new IllegalStateException("unknown partitioning scheme: 
" + partitioningScheme);
         }
-        throw new IllegalStateException();
     }
 
-    public PartitioningProperties 
getPartitioningProperties(MetadataTransactionContext mdTxnCtx, Dataset ds,
-            String indexName) throws AlgebricksException {
-        if (scheme == DYNAMIC) {
-            FileSplit[] splits =
-                    SplitsAndConstraintsUtil.getIndexSplits(ds, indexName, 
mdTxnCtx, appCtx.getClusterStateManager());
-            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitsAndConstraints =
-                    
StoragePathUtil.splitProviderAndPartitionConstraints(splits);
-            int[][] partitionsMap = 
getPartitionsMap(getNumPartitions(splitsAndConstraints.second));
-            return PartitioningProperties.of(splitsAndConstraints.first, 
splitsAndConstraints.second, partitionsMap);
-        } else if (scheme == STATIC) {
-            throw new NotImplementedException();
-        }
-        throw new IllegalStateException();
-    }
+    public abstract PartitioningProperties 
getPartitioningProperties(DataverseName dataverseName);
+
+    public abstract PartitioningProperties 
getPartitioningProperties(MetadataTransactionContext mdTxnCtx, Dataset ds,
+            String indexName) throws AlgebricksException;
 
     public PartitioningProperties getPartitioningProperties(Feed feed) throws 
AsterixException {
-        if (scheme == DYNAMIC) {
-            IClusterStateManager csm = appCtx.getClusterStateManager();
-            AlgebricksAbsolutePartitionConstraint allCluster = 
csm.getClusterLocations();
-            Set<String> nodes = new 
TreeSet<>(Arrays.asList(allCluster.getLocations()));
-            AlgebricksAbsolutePartitionConstraint locations =
-                    new 
AlgebricksAbsolutePartitionConstraint(nodes.toArray(new String[0]));
-            FileSplit[] feedLogFileSplits =
-                    FeedUtils.splitsForAdapter(appCtx, 
feed.getDataverseName(), feed.getFeedName(), locations);
-            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spC =
-                    
StoragePathUtil.splitProviderAndPartitionConstraints(feedLogFileSplits);
-            int[][] partitionsMap = 
getPartitionsMap(getNumPartitions(spC.second));
-            return PartitioningProperties.of(spC.first, spC.second, 
partitionsMap);
-        } else if (scheme == STATIC) {
-            throw new NotImplementedException();
-        }
-        throw new IllegalStateException();
+        IClusterStateManager csm = appCtx.getClusterStateManager();
+        AlgebricksAbsolutePartitionConstraint allCluster = 
csm.getClusterLocations();
+        Set<String> nodes = new 
TreeSet<>(Arrays.asList(allCluster.getLocations()));
+        AlgebricksAbsolutePartitionConstraint locations =
+                new AlgebricksAbsolutePartitionConstraint(nodes.toArray(new 
String[0]));
+        FileSplit[] feedLogFileSplits =
+                FeedUtils.splitsForAdapter(appCtx, feed.getDataverseName(), 
feed.getFeedName(), locations);
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spC =
+                
StoragePathUtil.splitProviderAndPartitionConstraints(feedLogFileSplits);
+        int[][] partitionsMap = 
getOneToOnePartitionsMap(getLocationsCount(spC.second));
+        return PartitioningProperties.of(spC.first, spC.second, partitionsMap);
+    }
+
+    protected static int getNumberOfPartitions(Dataset ds) {
+        return 
MetadataIndexImmutableProperties.isMetadataDataset(ds.getDatasetId())
+                ? MetadataIndexImmutableProperties.METADATA_DATASETS_PARTITIONS
+                : StorageConstants.NUM_STORAGE_PARTITIONS;
     }
 
-    private static int getNumPartitions(AlgebricksPartitionConstraint 
constraint) {
+    protected static int getLocationsCount(AlgebricksPartitionConstraint 
constraint) {
         if (constraint.getPartitionConstraintType() == 
AlgebricksPartitionConstraint.PartitionConstraintType.COUNT) {
             return ((AlgebricksCountPartitionConstraint) 
constraint).getCount();
         } else {
@@ -110,7 +100,7 @@ public class DataPartitioningProvider implements 
IDataPartitioningProvider {
         }
     }
 
-    private static int[][] getPartitionsMap(int numPartitions) {
+    protected static int[][] getOneToOnePartitionsMap(int numPartitions) {
         int[][] map = new int[numPartitions][1];
         for (int i = 0; i < numPartitions; i++) {
             map[i] = new int[] { i };
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java
new file mode 100644
index 0000000000..95dae4a267
--- /dev/null
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.utils;
+
+import org.apache.asterix.common.cluster.PartitioningProperties;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.entities.Dataset;
+import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.io.FileSplit;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+
+public class DynamicDataPartitioningProvider extends DataPartitioningProvider {
+
+    public DynamicDataPartitioningProvider(ICcApplicationContext appCtx) {
+        super(appCtx);
+    }
+
+    @Override
+    public PartitioningProperties getPartitioningProperties(DataverseName 
dataverseName) {
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitsAndConstraints = SplitsAndConstraintsUtil
+                
.getDataverseSplitProviderAndConstraints(appCtx.getClusterStateManager(), 
dataverseName);
+        int[][] partitionsMap = 
getOneToOnePartitionsMap(getLocationsCount(splitsAndConstraints.second));
+        return PartitioningProperties.of(splitsAndConstraints.first, 
splitsAndConstraints.second, partitionsMap);
+    }
+
+    @Override
+    public PartitioningProperties 
getPartitioningProperties(MetadataTransactionContext mdTxnCtx, Dataset ds,
+            String indexName) throws AlgebricksException {
+        FileSplit[] splits =
+                SplitsAndConstraintsUtil.getIndexSplits(ds, indexName, 
mdTxnCtx, appCtx.getClusterStateManager());
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitsAndConstraints =
+                StoragePathUtil.splitProviderAndPartitionConstraints(splits);
+        int[][] partitionsMap = 
getOneToOnePartitionsMap(getLocationsCount(splitsAndConstraints.second));
+        return PartitioningProperties.of(splitsAndConstraints.first, 
splitsAndConstraints.second, partitionsMap);
+    }
+}
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java
new file mode 100644
index 0000000000..eaafc6ce57
--- /dev/null
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.utils;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.asterix.common.cluster.ComputePartition;
+import org.apache.asterix.common.cluster.PartitioningProperties;
+import org.apache.asterix.common.cluster.SplitComputeLocations;
+import org.apache.asterix.common.cluster.StorageComputePartitionsMap;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
+import org.apache.asterix.common.utils.StorageConstants;
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.entities.Dataset;
+import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.io.FileSplit;
+import org.apache.hyracks.api.io.MappedFileSplit;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+
+public class StaticDataPartitioningProvider extends DataPartitioningProvider {
+
+    public StaticDataPartitioningProvider(ICcApplicationContext appCtx) {
+        super(appCtx);
+    }
+
+    @Override
+    public PartitioningProperties getPartitioningProperties(DataverseName 
dataverseName) {
+        SplitComputeLocations dataverseSplits = 
getDataverseSplits(dataverseName);
+        StorageComputePartitionsMap partitionMap = 
clusterStateManager.getStorageComputeMap();
+        int[][] partitionsMap = partitionMap.getComputeToStorageMap(false);
+        return PartitioningProperties.of(dataverseSplits.getSplitsProvider(), 
dataverseSplits.getConstraints(),
+                partitionsMap);
+    }
+
+    @Override
+    public PartitioningProperties 
getPartitioningProperties(MetadataTransactionContext mdTxnCtx, Dataset ds,
+            String indexName) throws AlgebricksException {
+        SplitComputeLocations dataverseSplits = getDatasetSplits(ds, 
indexName);
+        StorageComputePartitionsMap partitionMap = 
clusterStateManager.getStorageComputeMap();
+        int[][] partitionsMap = partitionMap
+                
.getComputeToStorageMap(MetadataIndexImmutableProperties.isMetadataDataset(ds.getDatasetId()));
+        return PartitioningProperties.of(dataverseSplits.getSplitsProvider(), 
dataverseSplits.getConstraints(),
+                partitionsMap);
+    }
+
+    private SplitComputeLocations getDataverseSplits(DataverseName 
dataverseName) {
+        List<FileSplit> splits = new ArrayList<>();
+        List<String> locations = new ArrayList<>();
+        Set<Integer> uniqueLocations = new HashSet<>();
+        StorageComputePartitionsMap partitionMap = 
clusterStateManager.getStorageComputeMap();
+        for (int i = 0; i < StorageConstants.NUM_STORAGE_PARTITIONS; i++) {
+            File f = new File(StoragePathUtil.prepareStoragePartitionPath(i),
+                    StoragePathUtil.prepareDataverseName(dataverseName));
+            ComputePartition computePartition = 
partitionMap.getComputePartition(i);
+            splits.add(new MappedFileSplit(computePartition.getNodeId(), 
f.getPath(), 0));
+            if (!uniqueLocations.contains(computePartition.getId())) {
+                locations.add(computePartition.getNodeId());
+            }
+            uniqueLocations.add(computePartition.getId());
+        }
+        IFileSplitProvider splitProvider = 
StoragePathUtil.splitProvider(splits.toArray(new FileSplit[0]));
+        AlgebricksPartitionConstraint constraints =
+                new 
AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[0]));
+        return new SplitComputeLocations(splitProvider, constraints);
+    }
+
+    private SplitComputeLocations getDatasetSplits(Dataset dataset, String 
indexName) {
+        List<FileSplit> splits = new ArrayList<>();
+        List<String> locations = new ArrayList<>();
+        Set<Integer> uniqueLocations = new HashSet<>();
+        StorageComputePartitionsMap partitionMap = 
clusterStateManager.getStorageComputeMap();
+        final int datasetPartitons = getNumberOfPartitions(dataset);
+        boolean metadataDataset = 
MetadataIndexImmutableProperties.isMetadataDataset(dataset.getDatasetId());
+        for (int i = 0; i < datasetPartitons; i++) {
+            int storagePartition = metadataDataset ? 
StorageConstants.METADATA_PARTITION : i;
+            final String relPath = 
StoragePathUtil.prepareDataverseIndexName(dataset.getDataverseName(),
+                    dataset.getDatasetName(), indexName, 
dataset.getRebalanceCount());
+            File f = new 
File(StoragePathUtil.prepareStoragePartitionPath(storagePartition), relPath);
+            ComputePartition computePartition = 
partitionMap.getComputePartition(storagePartition);
+            splits.add(new MappedFileSplit(computePartition.getNodeId(), 
f.getPath(), 0));
+            if (!uniqueLocations.contains(computePartition.getId())) {
+                locations.add(computePartition.getNodeId());
+            }
+            uniqueLocations.add(computePartition.getId());
+        }
+        IFileSplitProvider splitProvider = 
StoragePathUtil.splitProvider(splits.toArray(new FileSplit[0]));
+        AlgebricksPartitionConstraint constraints =
+                new 
AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[0]));
+        return new SplitComputeLocations(splitProvider, constraints);
+    }
+}
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
index 7e51ec16db..3762e82991 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
@@ -65,6 +65,8 @@ import org.apache.hyracks.storage.common.MultiComparator;
 
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntSet;
 
@@ -82,11 +84,10 @@ public class LSMPrimaryInsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
 
     private final IFrameOperationCallback[] frameOpCallbacks;
     private boolean flushedPartialTuples;
-    private int currentTupleIdx;
-    private int lastFlushedTupleIdx;
-
     private final PermutingFrameTupleReference keyTuple;
     private final Int2ObjectMap<IntSet> partition2TuplesMap = new 
Int2ObjectOpenHashMap<>();
+    private final IntSet processedTuples = new IntOpenHashSet();
+    private final IntSet flushedTuples = new IntOpenHashSet();
     private final SourceLocation sourceLoc;
 
     public LSMPrimaryInsertOperatorNodePushable(IHyracksTaskContext ctx, int 
partition,
@@ -142,7 +143,7 @@ public class LSMPrimaryInsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
                 @Override
                 public void process(FrameTupleAccessor accessor, 
ITupleReference tuple, int index)
                         throws HyracksDataException {
-                    if (index < currentTupleIdx) {
+                    if (processedTuples.contains(index)) {
                         // already processed; skip
                         return;
                     }
@@ -174,7 +175,7 @@ public class LSMPrimaryInsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
                         throw 
HyracksDataException.create(ErrorCode.ERROR_PROCESSING_TUPLE,
                                 
HyracksDataException.create(ErrorCode.DUPLICATE_KEY), sourceLoc, index);
                     }
-                    currentTupleIdx = index + 1;
+                    processedTuples.add(index);
                 }
 
                 @Override
@@ -197,15 +198,14 @@ public class LSMPrimaryInsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
 
     @Override
     public void open() throws HyracksDataException {
-        currentTupleIdx = 0;
-        lastFlushedTupleIdx = 0;
         flushedPartialTuples = false;
         accessor = new FrameTupleAccessor(inputRecDesc);
         writeBuffer = new VSizeFrame(ctx);
         try {
             INcApplicationContext appCtx =
                     (INcApplicationContext) 
ctx.getJobletContext().getServiceContext().getApplicationContext();
-
+            writer.open();
+            writerOpen = true;
             for (int i = 0; i < partitions.length; i++) {
                 IIndexDataflowHelper indexHelper = indexHelpers[i];
                 indexHelper.open();
@@ -224,8 +224,6 @@ public class LSMPrimaryInsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
                             new 
PrimaryIndexLogMarkerCallback((AbstractLSMIndex) indexes[0]);
                     TaskUtil.put(ILogMarkerCallback.KEY_MARKER_CALLBACK, 
callback, ctx);
                 }
-                writer.open();
-                writerOpen = true;
                 modCallbacks[i] =
                         
modOpCallbackFactory.createModificationOperationCallback(indexHelper.getResource(),
 ctx, this);
                 searchCallbacks[i] = (LockThenSearchOperationCallback) 
searchCallbackFactory
@@ -283,9 +281,9 @@ public class LSMPrimaryInsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
             FrameUtils.copyAndFlip(buffer, writeBuffer.getBuffer());
             FrameUtils.flushFrame(writeBuffer.getBuffer(), writer);
         }
-        currentTupleIdx = 0;
-        lastFlushedTupleIdx = 0;
         flushedPartialTuples = false;
+        processedTuples.clear();
+        flushedTuples.clear();
     }
 
     /**
@@ -293,15 +291,17 @@ public class LSMPrimaryInsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
      */
     @Override
     public void flushPartialFrame() throws HyracksDataException {
-        if (lastFlushedTupleIdx == currentTupleIdx) {
-            //nothing to flush
-            return;
-        }
-        for (int i = lastFlushedTupleIdx; i < currentTupleIdx; i++) {
-            FrameUtils.appendToWriter(writer, appender, accessor, i);
+        IntList tuplesToFlush = new IntArrayList();
+        processedTuples.iterator().forEachRemaining(tIdx -> {
+            if (!flushedTuples.contains(tIdx)) {
+                tuplesToFlush.add(tIdx);
+                flushedTuples.add(tIdx);
+            }
+        });
+        for (int i = 0; i < tuplesToFlush.size(); i++) {
+            FrameUtils.appendToWriter(writer, appender, accessor, 
tuplesToFlush.getInt(i));
         }
         appender.write(writer, true);
-        lastFlushedTupleIdx = currentTupleIdx;
         flushedPartialTuples = true;
     }
 
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index d3c87ff341..984b3ce22e 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -34,12 +34,15 @@ import java.util.function.Predicate;
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.cluster.StorageComputePartitionsMap;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.replication.INcLifecycleCoordinator;
 import org.apache.asterix.common.transactions.IResourceIdManager;
 import org.apache.asterix.common.utils.NcLocalCounters;
+import org.apache.asterix.common.utils.PartitioningScheme;
+import org.apache.asterix.common.utils.StorageConstants;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.config.IOption;
@@ -79,6 +82,7 @@ public class ClusterStateManager implements 
IClusterStateManager {
     private ICcApplicationContext appCtx;
     private ClusterPartition metadataPartition;
     private boolean rebalanceRequired;
+    private StorageComputePartitionsMap storageComputePartitionsMap;
 
     @Override
     public void setCcAppCtx(ICcApplicationContext appCtx) {
@@ -86,7 +90,14 @@ public class ClusterStateManager implements 
IClusterStateManager {
         node2PartitionsMap = 
appCtx.getMetadataProperties().getNodePartitions();
         clusterPartitions = 
appCtx.getMetadataProperties().getClusterPartitions();
         currentMetadataNode = 
appCtx.getMetadataProperties().getMetadataNodeName();
-        metadataPartition = node2PartitionsMap.get(currentMetadataNode)[0];
+        PartitioningScheme partitioningScheme = 
appCtx.getStorageProperties().getPartitioningScheme();
+        if (partitioningScheme == PartitioningScheme.DYNAMIC) {
+            metadataPartition = node2PartitionsMap.get(currentMetadataNode)[0];
+        } else {
+            final ClusterPartition fixedMetadataPartition = new 
ClusterPartition(StorageConstants.METADATA_PARTITION,
+                    appCtx.getMetadataProperties().getMetadataNodeName(), 0);
+            metadataPartition = fixedMetadataPartition;
+        }
         lifecycleCoordinator = appCtx.getNcLifecycleCoordinator();
         lifecycleCoordinator.bindTo(this);
     }
@@ -299,6 +310,9 @@ public class ClusterStateManager implements 
IClusterStateManager {
         clusterActiveLocations.removeAll(pendingRemoval);
         clusterPartitionConstraint =
                 new 
AlgebricksAbsolutePartitionConstraint(clusterActiveLocations.toArray(new 
String[] {}));
+        if (appCtx.getStorageProperties().getPartitioningScheme() == 
PartitioningScheme.STATIC) {
+            storageComputePartitionsMap = 
StorageComputePartitionsMap.computePartitionsMap(this);
+        }
     }
 
     @Override
@@ -489,6 +503,10 @@ public class ClusterStateManager implements 
IClusterStateManager {
         return nodeIds.stream().anyMatch(failedNodes::contains);
     }
 
+    public synchronized StorageComputePartitionsMap getStorageComputeMap() {
+        return storageComputePartitionsMap;
+    }
+
     private void updateClusterCounters(String nodeId, NcLocalCounters 
localCounters) {
         final IResourceIdManager resourceIdManager = 
appCtx.getResourceIdManager();
         resourceIdManager.report(nodeId, localCounters.getMaxResourceId());
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index bb3cde5944..1e813462b3 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -211,7 +211,7 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
     public void delete(String relativePath) throws HyracksDataException {
         FileReference resourceFile = getLocalResourceFileByName(ioManager, 
relativePath);
         boolean resourceExists = resourceFile.getFile().exists();
-        if (resourceExists) {
+        if (isReplicationEnabled && resourceExists) {
             try {
                 createReplicationJob(ReplicationOperation.DELETE, 
resourceFile);
             } catch (Exception e) {


Reply via email to