Yingyi Bu has submitted this change and it was merged. Change subject: Remove log buffer factory ......................................................................
Remove log buffer factory Change-Id: I814dac8ae5fc49b88470ab115b17bf023494afe9 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1765 Reviewed-by: Yingyi Bu <[email protected]> Tested-by: Jenkins <[email protected]> BAD: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java D asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBufferFactory.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java D asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBufferFactory.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java D asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/ReplicatingLogBuffer.java D asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/ReplicatingLogBufferFactory.java 9 files changed, 29 insertions(+), 207 deletions(-) Approvals: Yingyi Bu: Looks good to me, approved Jenkins: Verified; No violations found; Verified Objections: Jenkins: Violations found diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java index 9f86a26..1e920e7 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java @@ -36,10 +36,8 @@ import org.apache.asterix.common.transactions.ITransactionSubsystem; import org.apache.asterix.common.utils.StorageConstants; import org.apache.asterix.transaction.management.service.locking.ConcurrentLockManager; -import org.apache.asterix.transaction.management.service.logging.LogBufferFactory; import org.apache.asterix.transaction.management.service.logging.LogManager; import org.apache.asterix.transaction.management.service.logging.LogManagerWithReplication; -import org.apache.asterix.transaction.management.service.logging.ReplicatingLogBufferFactory; import org.apache.asterix.transaction.management.service.recovery.CheckpointManagerFactory; import org.apache.asterix.transaction.management.service.transaction.TransactionManager; import org.apache.hyracks.api.application.INCServiceContext; @@ -85,10 +83,9 @@ } if (replicationEnabled) { - this.logManager = - new LogManagerWithReplication(this, ReplicatingLogBufferFactory.INSTANCE, replicationStrategy); + this.logManager = new LogManagerWithReplication(this, replicationStrategy); } else { - this.logManager = new LogManager(this, LogBufferFactory.INSTANCE); + this.logManager = new LogManager(this); } this.recoveryManager = new RecoveryManager(this, serviceCtx); diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java index 4b24108..1770f1b 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java @@ -236,7 +236,7 @@ } private ComparisonException createLineChangedException(File scriptFile, String lineExpected, String lineActual, - int num) { + int num) { return new ComparisonException("Result for " + scriptFile + " changed at line " + num + ":\n< " + truncateIfLong(lineExpected) + "\n> " + truncateIfLong(lineActual)); } @@ -396,7 +396,7 @@ if (!pattern.matcher(actual.toString()).matches()) { // figure out where the problem first occurs... StringBuilder builder = new StringBuilder(); - String [] lines = expected.toString().split("\\n"); + String[] lines = expected.toString().split("\\n"); int endOfMatch = 0; final StringBuffer actualBuffer = actual.getBuffer(); for (int i = 0; i < lines.length; i++) { diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBufferFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBufferFactory.java deleted file mode 100644 index 4a88dcc..0000000 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBufferFactory.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.common.transactions; - -@FunctionalInterface -public interface ILogBufferFactory { - - /** - * Create a log buffer - * - * @param txnSubsystem - * the transaction subsystem - * @param logPageSize - * the default log page size - * @param flushLsn - * a mutable long used to communicate progress - * @return a in instance of ILogBuffer - */ - ILogBuffer create(ITransactionSubsystem txnSubsystem, int logPageSize, MutableLong flushLsn); -} diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java index 831bace..081cf02 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java @@ -88,24 +88,32 @@ //////////////////////////////////// @Override - public void append(ILogRecord logRecord, long appendLSN) { + public void append(ILogRecord logRecord, long appendLsn) { logRecord.writeLogRecord(appendBuffer); - if (logRecord.getLogType() != LogType.FLUSH && logRecord.getLogType() != LogType.WAIT) { - logRecord.getTxnCtx().setLastLSN(appendLSN); + + if (logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.FLUSH + && logRecord.getLogType() != LogType.WAIT) { + logRecord.getTxnCtx().setLastLSN(appendLsn); } + synchronized (this) { appendOffset += logRecord.getLogSize(); if (IS_DEBUG_MODE) { LOGGER.info("append()| appendOffset: " + appendOffset); } - if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT - || logRecord.getLogType() == LogType.WAIT) { - logRecord.isFlushed(false); - syncCommitQ.offer(logRecord); - } - if (logRecord.getLogType() == LogType.FLUSH) { - logRecord.isFlushed(false); - flushQ.offer(logRecord); + if (logRecord.getLogSource() == LogSource.LOCAL) { + if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT + || logRecord.getLogType() == LogType.WAIT) { + logRecord.isFlushed(false); + syncCommitQ.offer(logRecord); + } + if (logRecord.getLogType() == LogType.FLUSH) { + logRecord.isFlushed(false); + flushQ.offer(logRecord); + } + } else if (logRecord.getLogSource() == LogSource.REMOTE + && (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT)) { + remoteJobsQ.offer(logRecord); } this.notify(); } diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBufferFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBufferFactory.java deleted file mode 100644 index 27fdfd1..0000000 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBufferFactory.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.transaction.management.service.logging; - -import org.apache.asterix.common.transactions.ILogBuffer; -import org.apache.asterix.common.transactions.ILogBufferFactory; -import org.apache.asterix.common.transactions.ITransactionSubsystem; -import org.apache.asterix.common.transactions.MutableLong; - -public class LogBufferFactory implements ILogBufferFactory { - public static final LogBufferFactory INSTANCE = new LogBufferFactory(); - - private LogBufferFactory() { - } - - @Override - public ILogBuffer create(ITransactionSubsystem txnSubsystem, int logPageSize, MutableLong flushLsn) { - return new LogBuffer(txnSubsystem, logPageSize, flushLsn); - } -} diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java index d3e6baf..e5e91e8 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java @@ -41,7 +41,6 @@ import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.replication.IReplicationManager; import org.apache.asterix.common.transactions.ILogBuffer; -import org.apache.asterix.common.transactions.ILogBufferFactory; import org.apache.asterix.common.transactions.ILogManager; import org.apache.asterix.common.transactions.ILogReader; import org.apache.asterix.common.transactions.ILogRecord; @@ -67,7 +66,6 @@ * Finals */ private final ITransactionSubsystem txnSubsystem; - private final ILogBufferFactory logBufferFactory; private final LogManagerProperties logManagerProperties; private final int numLogPages; private final String logDir; @@ -91,9 +89,8 @@ private Future<? extends Object> futureLogFlusher; protected LinkedBlockingQueue<ILogRecord> flushLogsQ; - public LogManager(ITransactionSubsystem txnSubsystem, ILogBufferFactory logBufferFactory) { + public LogManager(ITransactionSubsystem txnSubsystem) { this.txnSubsystem = txnSubsystem; - this.logBufferFactory = logBufferFactory; logManagerProperties = new LogManagerProperties(this.txnSubsystem.getTransactionProperties(), this.txnSubsystem.getId()); logFileSize = logManagerProperties.getLogPartitionSize(); @@ -114,7 +111,7 @@ flushQ = new LinkedBlockingQueue<>(numLogPages); stashQ = new LinkedBlockingQueue<>(numLogPages); for (int i = 0; i < numLogPages; i++) { - emptyQ.offer(logBufferFactory.create(txnSubsystem, logPageSize, flushLSN)); + emptyQ.offer(new LogBuffer(txnSubsystem, logPageSize, flushLSN)); } appendLSN.set(initializeLogAnchor(nextLogFileId)); flushLSN.set(appendLSN.get()); @@ -208,7 +205,7 @@ } // for now, alloc a new buffer for each large page // TODO: pool large pages?? - appendPage = logBufferFactory.create(txnSubsystem, logSize, flushLSN); + appendPage = new LogBuffer(txnSubsystem, logSize, flushLSN); appendPage.setFileChannel(appendChannel); flushQ.offer(appendPage); } else { @@ -639,8 +636,7 @@ class LogFlusher implements Callable<Boolean> { private static final Logger LOGGER = Logger.getLogger(LogFlusher.class.getName()); - private static final ILogBuffer POISON_PILL = - LogBufferFactory.INSTANCE.create(null, ILogRecord.JOB_TERMINATE_LOG_SIZE, null); + private static final ILogBuffer POISON_PILL = new LogBuffer(null, ILogRecord.JOB_TERMINATE_LOG_SIZE, null); private final LogManager logMgr;//for debugging private final LinkedBlockingQueue<ILogBuffer> emptyQ; private final LinkedBlockingQueue<ILogBuffer> flushQ; diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java index f86eea5..dd8fb6e 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java @@ -24,7 +24,6 @@ import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.replication.IReplicationManager; import org.apache.asterix.common.replication.IReplicationStrategy; -import org.apache.asterix.common.transactions.ILogBufferFactory; import org.apache.asterix.common.transactions.ILogRecord; import org.apache.asterix.common.transactions.ITransactionContext; import org.apache.asterix.common.transactions.ITransactionManager; @@ -38,9 +37,8 @@ private final IReplicationStrategy replicationStrategy; private final Set<Integer> replicatedJob = ConcurrentHashMap.newKeySet(); - public LogManagerWithReplication(ITransactionSubsystem txnSubsystem, ILogBufferFactory logBufferFactory, - IReplicationStrategy replicationStrategy) { - super(txnSubsystem, logBufferFactory); + public LogManagerWithReplication(ITransactionSubsystem txnSubsystem, IReplicationStrategy replicationStrategy) { + super(txnSubsystem); this.replicationStrategy = replicationStrategy; } diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/ReplicatingLogBuffer.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/ReplicatingLogBuffer.java deleted file mode 100644 index 0eceb2e..0000000 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/ReplicatingLogBuffer.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.transaction.management.service.logging; - -import java.util.logging.Logger; - -import org.apache.asterix.common.transactions.ILogRecord; -import org.apache.asterix.common.transactions.ITransactionSubsystem; -import org.apache.asterix.common.transactions.LogSource; -import org.apache.asterix.common.transactions.LogType; -import org.apache.asterix.common.transactions.MutableLong; - -public class ReplicatingLogBuffer extends LogBuffer { - private static final Logger LOGGER = Logger.getLogger(ReplicatingLogBuffer.class.getName()); - - public ReplicatingLogBuffer(ITransactionSubsystem txnSubsystem, int logPageSize, MutableLong flushLsn) { - super(txnSubsystem, logPageSize, flushLsn); - } - - @Override - public void append(ILogRecord logRecord, long appendLsn) { - logRecord.writeLogRecord(appendBuffer); - - if (logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.FLUSH - && logRecord.getLogType() != LogType.WAIT) { - logRecord.getTxnCtx().setLastLSN(appendLsn); - } - - synchronized (this) { - appendOffset += logRecord.getLogSize(); - if (IS_DEBUG_MODE) { - LOGGER.info("append()| appendOffset: " + appendOffset); - } - if (logRecord.getLogSource() == LogSource.LOCAL) { - if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT - || logRecord.getLogType() == LogType.WAIT) { - logRecord.isFlushed(false); - syncCommitQ.offer(logRecord); - } - if (logRecord.getLogType() == LogType.FLUSH) { - logRecord.isFlushed(false); - flushQ.offer(logRecord); - } - } else if (logRecord.getLogSource() == LogSource.REMOTE - && (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT)) { - remoteJobsQ.offer(logRecord); - } - this.notify(); - } - } - -} diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/ReplicatingLogBufferFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/ReplicatingLogBufferFactory.java deleted file mode 100644 index 2abc474..0000000 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/ReplicatingLogBufferFactory.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.transaction.management.service.logging; - -import org.apache.asterix.common.transactions.ILogBuffer; -import org.apache.asterix.common.transactions.ILogBufferFactory; -import org.apache.asterix.common.transactions.ITransactionSubsystem; -import org.apache.asterix.common.transactions.MutableLong; - -public class ReplicatingLogBufferFactory implements ILogBufferFactory { - public static final ReplicatingLogBufferFactory INSTANCE = new ReplicatingLogBufferFactory(); - - private ReplicatingLogBufferFactory() { - - } - - @Override - public ILogBuffer create(ITransactionSubsystem txnSubsystem, int logPageSize, MutableLong flushLsn) { - return new ReplicatingLogBuffer(txnSubsystem, logPageSize, flushLsn); - } -} -- To view, visit https://asterix-gerrit.ics.uci.edu/1765 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I814dac8ae5fc49b88470ab115b17bf023494afe9 Gerrit-PatchSet: 2 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Yingyi Bu <[email protected]>
