Repository: asterixdb Updated Branches: refs/heads/master 08dc8597e -> 0a5b641a9
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-server/src/test/resources/integrationts/replication/results/failover/resync_failed_replica/resync_failed_replica.8.adm ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-server/src/test/resources/integrationts/replication/results/failover/resync_failed_replica/resync_failed_replica.8.adm b/asterixdb/asterix-server/src/test/resources/integrationts/replication/results/failover/resync_failed_replica/resync_failed_replica.8.adm new file mode 100644 index 0000000..c0697b7 --- /dev/null +++ b/asterixdb/asterix-server/src/test/resources/integrationts/replication/results/failover/resync_failed_replica/resync_failed_replica.8.adm @@ -0,0 +1,38 @@ +{ + "metadata_node" : "asterix_nc1", + "partitions" : { + "0" : { + "active" : true, + "activeNodeId" : "asterix_nc1", + "iodeviceNum" : 0, + "nodeId" : "asterix_nc1", + "partitionId" : 0, + "pendingActivation" : false + }, + "1" : { + "active" : true, + "activeNodeId" : "asterix_nc1", + "iodeviceNum" : 1, + "nodeId" : "asterix_nc1", + "partitionId" : 1, + "pendingActivation" : false + }, + "2" : { + "active" : true, + "activeNodeId" : "asterix_nc2", + "iodeviceNum" : 0, + "nodeId" : "asterix_nc2", + "partitionId" : 2, + "pendingActivation" : false + }, + "3" : { + "active" : true, + "activeNodeId" : "asterix_nc2", + "iodeviceNum" : 1, + "nodeId" : "asterix_nc2", + "partitionId" : 3, + "pendingActivation" : false + } + }, + "state" : "ACTIVE" +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-server/src/test/resources/integrationts/replication/testsuite.xml ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-server/src/test/resources/integrationts/replication/testsuite.xml b/asterixdb/asterix-server/src/test/resources/integrationts/replication/testsuite.xml index 0f6b528..df5dbac 100644 --- a/asterixdb/asterix-server/src/test/resources/integrationts/replication/testsuite.xml +++ b/asterixdb/asterix-server/src/test/resources/integrationts/replication/testsuite.xml @@ -19,25 +19,8 @@ <test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries" QueryFileExtension=".aql"> <test-group name="failover"> <test-case FilePath="failover"> - <compilation-unit name="bulkload"> - <output-dir compare="Text">bulkload</output-dir> - </compilation-unit> - </test-case> - <test-case FilePath="failover"> - <compilation-unit name="mem_component_recovery"> - <output-dir compare="Text">mem_component_recovery</output-dir> - </compilation-unit> - </test-case> - <test-case FilePath="failover"> - <compilation-unit name="metadata_node"> - <output-dir compare="Text">metadata_node</output-dir> - </compilation-unit> - </test-case> - </test-group> - <test-group name="failback"> - <test-case FilePath="failback"> - <compilation-unit name="node_failback"> - <output-dir compare="Text">node_failback</output-dir> + <compilation-unit name="resync_failed_replica"> + <output-dir compare="Text">resync_failed_replica</output-dir> </compilation-unit> </test-case> </test-group> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java index 601dec3..54d6268 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java @@ -18,37 +18,37 @@ */ package org.apache.asterix.transaction.management.resource; +import static org.apache.asterix.common.utils.StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX; import static org.apache.hyracks.api.exceptions.ErrorCode.CANNOT_CREATE_FILE; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; +import java.io.FilenameFilter; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.Stream; -import org.apache.asterix.common.cluster.ClusterPartition; -import org.apache.asterix.common.config.MetadataProperties; import org.apache.asterix.common.dataflow.DatasetLocalResource; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.replication.IReplicationManager; import org.apache.asterix.common.replication.ReplicationJob; import org.apache.asterix.common.storage.DatasetResourceReference; import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider; -import org.apache.asterix.common.storage.ResourceReference; import org.apache.asterix.common.utils.StorageConstants; import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.commons.io.FileUtils; @@ -71,7 +71,8 @@ import com.google.common.cache.CacheBuilder; public class PersistentLocalResourceRepository implements ILocalResourceRepository { public static final Predicate<Path> INDEX_COMPONENTS = path -> !path.endsWith(StorageConstants.METADATA_FILE_NAME); - // Private constants + private static final FilenameFilter LSM_INDEX_FILES_FILTER = + (dir, name) -> !name.startsWith(INDEX_CHECKPOINT_FILE_PREFIX); private static final int MAX_CACHED_RESOURCES = 1000; private static final IOFileFilter METADATA_FILES_FILTER = new IOFileFilter() { @Override @@ -100,17 +101,14 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito // Finals private final IIOManager ioManager; private final Cache<String, LocalResource> resourceCache; - private final Set<Integer> nodeOriginalPartitions; - private final Set<Integer> nodeActivePartitions; // Mutables private boolean isReplicationEnabled = false; private Set<String> filesToBeReplicated; private IReplicationManager replicationManager; - private Set<Integer> nodeInactivePartitions; private final Path[] storageRoots; private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider; - public PersistentLocalResourceRepository(IIOManager ioManager, String nodeId, MetadataProperties metadataProperties, + public PersistentLocalResourceRepository(IIOManager ioManager, IIndexCheckpointManagerProvider indexCheckpointManagerProvider) { this.ioManager = ioManager; this.indexCheckpointManagerProvider = indexCheckpointManagerProvider; @@ -122,15 +120,6 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito } createStorageRoots(); resourceCache = CacheBuilder.newBuilder().maximumSize(MAX_CACHED_RESOURCES).build(); - ClusterPartition[] nodePartitions = metadataProperties.getNodePartitions().get(nodeId); - //initially the node active partitions are the same as the original partitions - nodeOriginalPartitions = new HashSet<>(nodePartitions.length); - nodeActivePartitions = new HashSet<>(nodePartitions.length); - nodeInactivePartitions = new HashSet<>(nodePartitions.length); - for (ClusterPartition partition : nodePartitions) { - nodeOriginalPartitions.add(partition.getPartitionId()); - nodeActivePartitions.add(partition.getPartitionId()); - } } @Override @@ -267,7 +256,6 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito if (isReplicationEnabled) { filesToBeReplicated = new HashSet<>(); - nodeInactivePartitions = ConcurrentHashMap.newKeySet(); } } @@ -299,26 +287,13 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito createStorageRoots(); } - public Set<Integer> getActivePartitions() { - return Collections.unmodifiableSet(nodeActivePartitions); - } - - public Set<Integer> getInactivePartitions() { - return Collections.unmodifiableSet(nodeInactivePartitions); - } - - public synchronized void addActivePartition(int partitonId) { - nodeActivePartitions.add(partitonId); - nodeInactivePartitions.remove(partitonId); - } - - public synchronized void addInactivePartition(int partitonId) { - nodeInactivePartitions.add(partitonId); - nodeActivePartitions.remove(partitonId); + public Set<Integer> getAllPartitions() throws HyracksDataException { + return loadAndGetAllResources().values().stream().map(LocalResource::getResource) + .map(DatasetLocalResource.class::cast).map(DatasetLocalResource::getPartition) + .collect(Collectors.toSet()); } public DatasetResourceReference getLocalResourceReference(String absoluteFilePath) throws HyracksDataException { - //TODO pass relative path final String localResourcePath = StoragePathUtil.getIndexFileRelativePath(absoluteFilePath); final LocalResource lr = get(localResourcePath); return DatasetResourceReference.of(lr); @@ -351,17 +326,18 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito }); } - /** - * Given any index file, an absolute {@link FileReference} is returned which points to where the index of - * {@code indexFile} is located. - * - * @param indexFile - * @return - * @throws HyracksDataException - */ - public FileReference getIndexPath(Path indexFile) throws HyracksDataException { - final ResourceReference ref = ResourceReference.of(indexFile.toString()); - return ioManager.resolve(ref.getRelativePath().toString()); + public List<String> getPartitionIndexesFiles(int partition) throws HyracksDataException { + List<String> partitionFiles = new ArrayList<>(); + Set<File> partitionIndexes = getPartitionIndexes(partition); + for (File indexDir : partitionIndexes) { + if (indexDir.isDirectory()) { + File[] indexFiles = indexDir.listFiles(LSM_INDEX_FILES_FILTER); + if (indexFiles != null) { + Stream.of(indexFiles).map(File::getAbsolutePath).forEach(partitionFiles::add); + } + } + } + return partitionFiles; } private void createStorageRoots() { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java index 33c6260..d15e6ff 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java @@ -18,30 +18,23 @@ */ package org.apache.asterix.transaction.management.resource; -import org.apache.asterix.common.config.MetadataProperties; import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider; -import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.IIOManager; import org.apache.hyracks.storage.common.ILocalResourceRepository; import org.apache.hyracks.storage.common.file.ILocalResourceRepositoryFactory; public class PersistentLocalResourceRepositoryFactory implements ILocalResourceRepositoryFactory { private final IIOManager ioManager; - private final String nodeId; - private final MetadataProperties metadataProperties; private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider; - public PersistentLocalResourceRepositoryFactory(IIOManager ioManager, String nodeId, - MetadataProperties metadataProperties, IIndexCheckpointManagerProvider indexCheckpointManagerProvider) { + public PersistentLocalResourceRepositoryFactory(IIOManager ioManager, + IIndexCheckpointManagerProvider indexCheckpointManagerProvider) { this.ioManager = ioManager; - this.nodeId = nodeId; - this.metadataProperties = metadataProperties; this.indexCheckpointManagerProvider = indexCheckpointManagerProvider; } @Override - public ILocalResourceRepository createRepository() throws HyracksDataException { - return new PersistentLocalResourceRepository(ioManager, nodeId, metadataProperties, - indexCheckpointManagerProvider); + public ILocalResourceRepository createRepository() { + return new PersistentLocalResourceRepository(ioManager, indexCheckpointManagerProvider); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java index b67da80..011d2a1 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java @@ -26,10 +26,10 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.asterix.common.context.PrimaryIndexOperationTracker; import org.apache.asterix.common.exceptions.ACIDException; -import org.apache.asterix.common.replication.IReplicationThread; import org.apache.asterix.common.transactions.DatasetId; import org.apache.asterix.common.transactions.ILogBuffer; import org.apache.asterix.common.transactions.ILogRecord; +import org.apache.asterix.common.transactions.ILogRequester; import org.apache.asterix.common.transactions.ITransactionContext; import org.apache.asterix.common.transactions.ITransactionSubsystem; import org.apache.asterix.common.transactions.LogRecord; @@ -318,10 +318,9 @@ public class LogBuffer implements ILogBuffer { } } logRecord.isFlushed(true); - IReplicationThread replicationThread = logRecord.getReplicationThread(); - - if (replicationThread != null) { - replicationThread.notifyLogReplicationRequester(logRecord); + final ILogRequester logRequester = logRecord.getRequester(); + if (logRequester != null) { + logRequester.notifyFlushed(logRecord); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java index 9b6a4f9..53cd038 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java @@ -61,7 +61,7 @@ public class LogManagerWithReplication extends LogManager { shouldReplicate = false; } } - logRecord.setReplicated(shouldReplicate); + logRecord.setReplicate(shouldReplicate); //Remote flush logs do not need to be flushed separately since they may not trigger local flush if (logRecord.getLogType() == LogType.FLUSH && logRecord.getLogSource() == LogSource.LOCAL) { @@ -76,9 +76,9 @@ public class LogManagerWithReplication extends LogManager { protected void appendToLogTail(ILogRecord logRecord) { syncAppendToLogTail(logRecord); - if (logRecord.isReplicated()) { + if (logRecord.isReplicate()) { try { - replicationManager.replicateLog(logRecord); + replicationManager.replicate(logRecord); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new ACIDException(e); @@ -92,9 +92,9 @@ public class LogManagerWithReplication extends LogManager { logRecord.wait(); } //wait for job Commit/Abort ACK from replicas - if (logRecord.isReplicated() && (logRecord.getLogType() == LogType.JOB_COMMIT + if (logRecord.isReplicate() && (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT)) { - while (!replicationManager.hasBeenReplicated(logRecord)) { + while (!logRecord.isReplicated()) { logRecord.wait(); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManagerFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManagerFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManagerFactory.java index 68c5ce1..ca7d37a 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManagerFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManagerFactory.java @@ -29,11 +29,7 @@ public class CheckpointManagerFactory { } public static ICheckpointManager create(ITransactionSubsystem txnSubsystem, - CheckpointProperties checkpointProperties, boolean replicationEnabled) { - if (!replicationEnabled) { - return new CheckpointManager(txnSubsystem, checkpointProperties); - } else { - return new ReplicationCheckpointManager(txnSubsystem, checkpointProperties); - } + CheckpointProperties checkpointProperties) { + return new CheckpointManager(txnSubsystem, checkpointProperties); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/ReplicationCheckpointManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/ReplicationCheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/ReplicationCheckpointManager.java deleted file mode 100644 index 4bbcabe..0000000 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/ReplicationCheckpointManager.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.transaction.management.service.recovery; - -import java.io.IOException; -import java.util.HashSet; -import java.util.Set; - -import org.apache.asterix.common.api.IApplicationContext; -import org.apache.asterix.common.api.IDatasetLifecycleManager; -import org.apache.asterix.common.cluster.ClusterPartition; -import org.apache.asterix.common.config.MetadataProperties; -import org.apache.asterix.common.replication.IReplicaResourcesManager; -import org.apache.asterix.common.replication.IReplicationManager; -import org.apache.asterix.common.transactions.CheckpointProperties; -import org.apache.asterix.common.transactions.ICheckpointManager; -import org.apache.asterix.common.transactions.ITransactionSubsystem; -import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -/** - * An implementation of {@link ICheckpointManager} that defines the logic - * of checkpoints when replication is enabled.. - */ -public class ReplicationCheckpointManager extends AbstractCheckpointManager { - - private static final Logger LOGGER = LogManager.getLogger(); - - public ReplicationCheckpointManager(ITransactionSubsystem txnSubsystem, CheckpointProperties checkpointProperties) { - super(txnSubsystem, checkpointProperties); - } - - /** - * Performs a sharp checkpoint. All datasets are flushed and all transaction - * log files are deleted except the files that are needed for dead replicas. - */ - @Override - public synchronized void doSharpCheckpoint() throws HyracksDataException { - LOGGER.info("Starting sharp checkpoint..."); - final IDatasetLifecycleManager datasetLifecycleManager = - txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager(); - datasetLifecycleManager.flushAllDatasets(); - long minFirstLSN; - // If shutting down, need to check if we need to keep any remote logs for dead replicas - if (txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext().isShuttingdown()) { - final Set<String> deadReplicaIds = txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext() - .getReplicationManager().getDeadReplicasIds(); - if (deadReplicaIds.isEmpty()) { - // No dead replicas => no need to keep any log - minFirstLSN = SHARP_CHECKPOINT_LSN; - } else { - // Get min LSN of dead replicas remote resources - minFirstLSN = getDeadReplicasMinFirstLSN(deadReplicaIds); - } - } else { - // Start up complete checkpoint. Avoid deleting remote recovery logs. - minFirstLSN = txnSubsystem.getRecoveryManager().getMinFirstLSN(); - } - capture(minFirstLSN, true); - if (minFirstLSN == SHARP_CHECKPOINT_LSN) { - // No need to keep any logs - txnSubsystem.getLogManager().renewLogFiles(); - } else { - // Delete only log files with LSNs < any dead replica partition minimum LSN - txnSubsystem.getLogManager().deleteOldLogFiles(minFirstLSN); - } - LOGGER.info("Completed sharp checkpoint."); - } - - /*** - * Attempts to perform a soft checkpoint at the specified {@code checkpointTargetLSN}. - * If a checkpoint cannot be captured due to datasets having LSN < {@code checkpointTargetLSN}, - * an asynchronous flush is triggered on them. If the checkpoint fails due to a replica index, - * a request is sent to the primary replica of the index to flush it. - * When a checkpoint is successful, all transaction log files that end with - * LSN < {@code checkpointTargetLSN} are deleted. - */ - @Override - public synchronized long tryCheckpoint(long checkpointTargetLSN) throws HyracksDataException { - LOGGER.info("Attemping soft checkpoint..."); - final long minFirstLSN = txnSubsystem.getRecoveryManager().getMinFirstLSN(); - boolean checkpointSucceeded = minFirstLSN >= checkpointTargetLSN; - if (!checkpointSucceeded) { - // Flush datasets with indexes behind target checkpoint LSN - final IDatasetLifecycleManager datasetLifecycleManager = - txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager(); - datasetLifecycleManager.scheduleAsyncFlushForLaggingDatasets(checkpointTargetLSN); - // Request remote replicas to flush lagging indexes - final IReplicationManager replicationManager = - txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext().getReplicationManager(); - try { - replicationManager.requestFlushLaggingReplicaIndexes(checkpointTargetLSN); - } catch (IOException e) { - throw new HyracksDataException(e); - } - } - capture(minFirstLSN, false); - if (checkpointSucceeded) { - txnSubsystem.getLogManager().deleteOldLogFiles(minFirstLSN); - LOGGER.info(String.format("soft checkpoint succeeded with at LSN(%s)", minFirstLSN)); - } - return minFirstLSN; - } - - private long getDeadReplicasMinFirstLSN(Set<String> deadReplicaIds) throws HyracksDataException { - final IReplicaResourcesManager remoteResourcesManager = - txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext().getReplicaResourcesManager(); - final IApplicationContext propertiesProvider = - txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext(); - final MetadataProperties metadataProperties = propertiesProvider.getMetadataProperties(); - final PersistentLocalResourceRepository localResourceRepository = - (PersistentLocalResourceRepository) txnSubsystem.getAsterixAppRuntimeContextProvider() - .getLocalResourceRepository(); - // Get partitions of the dead replicas that are not active on this node - final Set<Integer> deadReplicasPartitions = new HashSet<>(); - for (String deadReplicaId : deadReplicaIds) { - final ClusterPartition[] nodePartitons = metadataProperties.getNodePartitions().get(deadReplicaId); - for (ClusterPartition partition : nodePartitons) { - if (!localResourceRepository.getActivePartitions().contains(partition.getPartitionId())) { - deadReplicasPartitions.add(partition.getPartitionId()); - } - } - } - return remoteResourcesManager.getPartitionsMinLSN(deadReplicasPartitions); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/replication/IReplicationJob.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/replication/IReplicationJob.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/replication/IReplicationJob.java index cbe6d1a..33c9c96 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/replication/IReplicationJob.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/replication/IReplicationJob.java @@ -45,4 +45,8 @@ public interface IReplicationJob { public Set<String> getJobFiles(); + default String getAnyFile() { + return getJobFiles().stream().findAny() + .orElseThrow(() -> new IllegalStateException("Replication job without any files")); + } }