[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention
codelipenghui commented on a change in pull request #5570: Transaction log implemention URL: https://github.com/apache/pulsar/pull/5570#discussion_r353572944 ## File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/TransactionMetadataStoreStateException.java ## @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.transaction.coordinator.exceptions; + +import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState.State; +import org.apache.pulsar.transaction.impl.common.TxnID; + +/** + * Exception is thrown when a operation of transaction is executed in a error transaction metadata store state. + */ +public class TransactionMetadataStoreStateException extends CoordinatorException { Review comment: It is TransactionStateException not MetadataStoreStateException, right? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention
codelipenghui commented on a change in pull request #5570: Transaction log implemention URL: https://github.com/apache/pulsar/pull/5570#discussion_r353572089 ## File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionLog.java ## @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.transaction.coordinator; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.pulsar.common.api.proto.PulsarApi; +import org.apache.pulsar.transaction.impl.common.TxnID; + +/** + * A log interface for transaction to read and write transaction operation. + */ +public interface TransactionLog { + + +/** + * Replay transaction log to load the transaction map. 
+ * + * @param txnMetaMap the map of transaction metadata store to load + * @param sequenceId the sequence id for transaction metadata store to create new transaction + * @param replayCallback the call back for replaying the transaction log + */ +void replayAsync(ConcurrentMap txnMetaMap, +AtomicLong sequenceId, ReplayCallback replayCallback); + +/** + * Read the entry from bookkeeper. + * + * @param numberOfEntriesToRead the number of reading entry + * @param callback the callback to executing when reading entry async finished + */ +void readAsync(int numberOfEntriesToRead, AsyncCallbacks.ReadEntriesCallback callback, Object ctx); Review comment: Write, read, and replay should all use TransactionMetadataEntry, not Entry or some other type. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention
codelipenghui commented on a change in pull request #5570: Transaction log implemention URL: https://github.com/apache/pulsar/pull/5570#discussion_r350019792 ## File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionLog.java ## @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.transaction.coordinator; + +import java.util.concurrent.CompletableFuture; + +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.pulsar.common.api.proto.PulsarApi; + +/** + * A log interface for transaction to read and write transaction operation. + */ +public interface TransactionLog { + +/** + * Read the entry from bookkeeper. 
+ * + * @param numberOfEntriesToRead the number of reading entry + * @param callback the callback to executing when reading entry async finished + */ +void read(int numberOfEntriesToRead, AsyncCallbacks.ReadEntriesCallback callback, Object ctx); Review comment: It's better to call back with the TransactionMetadataEntry directly; because this is a TransactionLog implementation, all reads and writes need to deal with TransactionMetadataEntry. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention
codelipenghui commented on a change in pull request #5570: Transaction log implemention URL: https://github.com/apache/pulsar/pull/5570#discussion_r350041641 ## File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionMetadataStore.java ## @@ -0,0 +1,460 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.transaction.coordinator.impl; + +import io.netty.buffer.ByteBuf; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerFactory; + +import org.apache.pulsar.common.api.proto.PulsarApi.Subscription; +import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry; +import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry.TransactionMetadataOp; +import org.apache.pulsar.common.api.proto.PulsarApi.TxnStatus; +import org.apache.pulsar.common.util.FutureUtil; + +import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream; +import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState; +import org.apache.pulsar.transaction.coordinator.TxnMeta; +import org.apache.pulsar.transaction.coordinator.TxnSubscription; +import org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException; +import org.apache.pulsar.transaction.coordinator.exceptions.TransactionMetadataStoreStateException; +import org.apache.pulsar.transaction.coordinator.exceptions.TransactionNotFoundException; +import org.apache.pulsar.transaction.impl.common.TxnID; + +import org.jctools.queues.MessagePassingQueue; +import org.jctools.queues.SpscArrayQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * The provider that offers managed ledger implementation of 
{@link TransactionMetadataStore}. + */ +public class ManagedLedgerTransactionMetadataStore +extends TransactionMetadataStoreState implements TransactionMetadataStore { + +private static final Logger log = LoggerFactory.getLogger(ManagedLedgerTransactionMetadataStore.class); + +private final TransactionCoordinatorID tcID; +private AtomicLong sequenceId = new AtomicLong(TC_ID_NOT_USED); +private final ManagedLedgerTransactionLogImpl transactionLog; +private static final long TC_ID_NOT_USED = -1L; +private ConcurrentMap txnMetaMap = new ConcurrentHashMap<>(); +private SpscArrayQueue entryQueue; +private volatile long loadCount; + +public ManagedLedgerTransactionMetadataStore(TransactionCoordinatorID tcID, + ManagedLedgerFactory managedLedgerFactory) throws Exception { +super(State.None); +this.tcID = tcID; +this.transactionLog = +new ManagedLedgerTransactionLogImpl(tcID.getId(), managedLedgerFactory); +this.entryQueue = new SpscArrayQueue<>(2000); +init(); +} + +private ConcurrentMap getTxnMetaMap() { +return txnMetaMap; +} + +public AtomicLong getSequenceId() { +return sequenceId; +} + +@Override +public CompletableFuture getTxnStatus(TxnID txnID) { +return CompletableFuture.completedFuture(txnMetaMap.get(txnID).status()); +} + +@Override +public CompletableFuture getTxnMetaAsync(TxnID txnID) { +return CompletableFuture.completedFuture(txnMetaMap.get(txnID)); +} + +@Override +public CompletableFuture newTransactionAsync() { +return FutureUtil.failedFuture(new UnsupportedOperationException()); +} + +@Override +public CompletableFuture
[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention
codelipenghui commented on a change in pull request #5570: Transaction log implemention URL: https://github.com/apache/pulsar/pull/5570#discussion_r350035812 ## File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionMetadataStore.java ## @@ -0,0 +1,460 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.transaction.coordinator.impl; + +import io.netty.buffer.ByteBuf; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerFactory; + +import org.apache.pulsar.common.api.proto.PulsarApi.Subscription; +import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry; +import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry.TransactionMetadataOp; +import org.apache.pulsar.common.api.proto.PulsarApi.TxnStatus; +import org.apache.pulsar.common.util.FutureUtil; + +import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream; +import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState; +import org.apache.pulsar.transaction.coordinator.TxnMeta; +import org.apache.pulsar.transaction.coordinator.TxnSubscription; +import org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException; +import org.apache.pulsar.transaction.coordinator.exceptions.TransactionMetadataStoreStateException; +import org.apache.pulsar.transaction.coordinator.exceptions.TransactionNotFoundException; +import org.apache.pulsar.transaction.impl.common.TxnID; + +import org.jctools.queues.MessagePassingQueue; +import org.jctools.queues.SpscArrayQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * The provider that offers managed ledger implementation of 
{@link TransactionMetadataStore}. + */ +public class ManagedLedgerTransactionMetadataStore +extends TransactionMetadataStoreState implements TransactionMetadataStore { + +private static final Logger log = LoggerFactory.getLogger(ManagedLedgerTransactionMetadataStore.class); + +private final TransactionCoordinatorID tcID; +private AtomicLong sequenceId = new AtomicLong(TC_ID_NOT_USED); +private final ManagedLedgerTransactionLogImpl transactionLog; +private static final long TC_ID_NOT_USED = -1L; +private ConcurrentMap txnMetaMap = new ConcurrentHashMap<>(); +private SpscArrayQueue entryQueue; +private volatile long loadCount; + +public ManagedLedgerTransactionMetadataStore(TransactionCoordinatorID tcID, + ManagedLedgerFactory managedLedgerFactory) throws Exception { +super(State.None); +this.tcID = tcID; +this.transactionLog = +new ManagedLedgerTransactionLogImpl(tcID.getId(), managedLedgerFactory); +this.entryQueue = new SpscArrayQueue<>(2000); +init(); +} + +private ConcurrentMap getTxnMetaMap() { +return txnMetaMap; +} + +public AtomicLong getSequenceId() { +return sequenceId; +} + +@Override +public CompletableFuture getTxnStatus(TxnID txnID) { +return CompletableFuture.completedFuture(txnMetaMap.get(txnID).status()); +} + +@Override +public CompletableFuture getTxnMetaAsync(TxnID txnID) { +return CompletableFuture.completedFuture(txnMetaMap.get(txnID)); +} + +@Override +public CompletableFuture newTransactionAsync() { +return FutureUtil.failedFuture(new UnsupportedOperationException()); +} + +@Override +public CompletableFuture
[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention
codelipenghui commented on a change in pull request #5570: Transaction log implemention URL: https://github.com/apache/pulsar/pull/5570#discussion_r350040883 ## File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionMetadataStore.java ## @@ -0,0 +1,460 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.transaction.coordinator.impl; + +import io.netty.buffer.ByteBuf; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerFactory; + +import org.apache.pulsar.common.api.proto.PulsarApi.Subscription; +import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry; +import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry.TransactionMetadataOp; +import org.apache.pulsar.common.api.proto.PulsarApi.TxnStatus; +import org.apache.pulsar.common.util.FutureUtil; + +import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream; +import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState; +import org.apache.pulsar.transaction.coordinator.TxnMeta; +import org.apache.pulsar.transaction.coordinator.TxnSubscription; +import org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException; +import org.apache.pulsar.transaction.coordinator.exceptions.TransactionMetadataStoreStateException; +import org.apache.pulsar.transaction.coordinator.exceptions.TransactionNotFoundException; +import org.apache.pulsar.transaction.impl.common.TxnID; + +import org.jctools.queues.MessagePassingQueue; +import org.jctools.queues.SpscArrayQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * The provider that offers managed ledger implementation of 
{@link TransactionMetadataStore}. + */ +public class ManagedLedgerTransactionMetadataStore +extends TransactionMetadataStoreState implements TransactionMetadataStore { + +private static final Logger log = LoggerFactory.getLogger(ManagedLedgerTransactionMetadataStore.class); + +private final TransactionCoordinatorID tcID; +private AtomicLong sequenceId = new AtomicLong(TC_ID_NOT_USED); +private final ManagedLedgerTransactionLogImpl transactionLog; +private static final long TC_ID_NOT_USED = -1L; +private ConcurrentMap txnMetaMap = new ConcurrentHashMap<>(); +private SpscArrayQueue entryQueue; +private volatile long loadCount; + +public ManagedLedgerTransactionMetadataStore(TransactionCoordinatorID tcID, + ManagedLedgerFactory managedLedgerFactory) throws Exception { +super(State.None); +this.tcID = tcID; +this.transactionLog = +new ManagedLedgerTransactionLogImpl(tcID.getId(), managedLedgerFactory); +this.entryQueue = new SpscArrayQueue<>(2000); +init(); +} + +private ConcurrentMap getTxnMetaMap() { +return txnMetaMap; +} + +public AtomicLong getSequenceId() { +return sequenceId; +} + +@Override +public CompletableFuture getTxnStatus(TxnID txnID) { +return CompletableFuture.completedFuture(txnMetaMap.get(txnID).status()); +} + +@Override +public CompletableFuture getTxnMetaAsync(TxnID txnID) { +return CompletableFuture.completedFuture(txnMetaMap.get(txnID)); +} + +@Override +public CompletableFuture newTransactionAsync() { +return FutureUtil.failedFuture(new UnsupportedOperationException()); +} + +@Override +public CompletableFuture
[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention
codelipenghui commented on a change in pull request #5570: Transaction log implemention URL: https://github.com/apache/pulsar/pull/5570#discussion_r349979380 ## File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionLogImpl.java ## @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.transaction.coordinator.impl; + +import io.netty.buffer.ByteBuf; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerFactory; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.ReadOnlyCursor; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; +import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry; +import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream; +import org.apache.pulsar.transaction.coordinator.TransactionLog; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class ManagedLedgerTransactionLogImpl implements TransactionLog { + +private static final Logger log = LoggerFactory.getLogger(ManagedLedgerTransactionLogImpl.class); + +private final ManagedLedger managedLedger; + +private final static String TRANSACTION_LOG_PREFIX = "transaction/log/"; + +private final ReadOnlyCursor readOnlyCursor; + +ManagedLedgerTransactionLogImpl(long tcID, +ManagedLedgerFactory managedLedgerFactory) throws Exception { +this.managedLedger = managedLedgerFactory.open(TRANSACTION_LOG_PREFIX + tcID); +this.readOnlyCursor = managedLedgerFactory.openReadOnlyCursor(TRANSACTION_LOG_PREFIX + tcID, +PositionImpl.earliest, new ManagedLedgerConfig()); +} + +@Override +public void read(int numberOfEntriesToRead, AsyncCallbacks.ReadEntriesCallback callback, Object ctx) { Review comment: It's better to call back the TransactionMetadataEntry straightforward, because this is a TransactionLog implementation, all of reads and writes need to facing 
TransactionMetadataEntry This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention
codelipenghui commented on a change in pull request #5570: Transaction log implemention URL: https://github.com/apache/pulsar/pull/5570#discussion_r350025462 ## File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionLogImpl.java ## @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.transaction.coordinator.impl; + +import io.netty.buffer.ByteBuf; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerFactory; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.ReadOnlyCursor; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; +import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry; +import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream; +import org.apache.pulsar.transaction.coordinator.TransactionLog; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class ManagedLedgerTransactionLogImpl implements TransactionLog { + +private static final Logger log = LoggerFactory.getLogger(ManagedLedgerTransactionLogImpl.class); + +private final ManagedLedger managedLedger; + +private final static String TRANSACTION_LOG_PREFIX = "transaction/log/"; + +private final ReadOnlyCursor readOnlyCursor; + +ManagedLedgerTransactionLogImpl(long tcID, +ManagedLedgerFactory managedLedgerFactory) throws Exception { +this.managedLedger = managedLedgerFactory.open(TRANSACTION_LOG_PREFIX + tcID); +this.readOnlyCursor = managedLedgerFactory.openReadOnlyCursor(TRANSACTION_LOG_PREFIX + tcID, +PositionImpl.earliest, new ManagedLedgerConfig()); +} + +@Override +public void read(int numberOfEntriesToRead, AsyncCallbacks.ReadEntriesCallback callback, Object ctx) { +readOnlyCursor.asyncReadEntries(numberOfEntriesToRead, callback, System.nanoTime()); +} + +ReadOnlyCursor getReadOnlyCursor() { +return readOnlyCursor; +} + +@Override +public 
CompletableFuture close() { +try { +managedLedger.close(); +readOnlyCursor.close(); Review comment: You can call `managedLedger.asyncClose()` and `readOnlyCursor.asyncClose()` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention
codelipenghui commented on a change in pull request #5570: Transaction log implemention URL: https://github.com/apache/pulsar/pull/5570#discussion_r349981445 ## File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionMetadataStore.java ## @@ -0,0 +1,460 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.transaction.coordinator.impl; + +import io.netty.buffer.ByteBuf; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerFactory; + +import org.apache.pulsar.common.api.proto.PulsarApi.Subscription; +import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry; +import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry.TransactionMetadataOp; +import org.apache.pulsar.common.api.proto.PulsarApi.TxnStatus; +import org.apache.pulsar.common.util.FutureUtil; + +import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream; +import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState; +import org.apache.pulsar.transaction.coordinator.TxnMeta; +import org.apache.pulsar.transaction.coordinator.TxnSubscription; +import org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException; +import org.apache.pulsar.transaction.coordinator.exceptions.TransactionMetadataStoreStateException; +import org.apache.pulsar.transaction.coordinator.exceptions.TransactionNotFoundException; +import org.apache.pulsar.transaction.impl.common.TxnID; + +import org.jctools.queues.MessagePassingQueue; +import org.jctools.queues.SpscArrayQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * The provider that offers managed ledger implementation of 
{@link TransactionMetadataStore}. + */ +public class ManagedLedgerTransactionMetadataStore +extends TransactionMetadataStoreState implements TransactionMetadataStore { + +private static final Logger log = LoggerFactory.getLogger(ManagedLedgerTransactionMetadataStore.class); + +private final TransactionCoordinatorID tcID; +private AtomicLong sequenceId = new AtomicLong(TC_ID_NOT_USED); +private final ManagedLedgerTransactionLogImpl transactionLog; +private static final long TC_ID_NOT_USED = -1L; +private ConcurrentMap txnMetaMap = new ConcurrentHashMap<>(); +private SpscArrayQueue entryQueue; +private volatile long loadCount; + +public ManagedLedgerTransactionMetadataStore(TransactionCoordinatorID tcID, + ManagedLedgerFactory managedLedgerFactory) throws Exception { +super(State.None); +this.tcID = tcID; +this.transactionLog = +new ManagedLedgerTransactionLogImpl(tcID.getId(), managedLedgerFactory); +this.entryQueue = new SpscArrayQueue<>(2000); +init(); +} + +private ConcurrentMap getTxnMetaMap() { +return txnMetaMap; +} + +public AtomicLong getSequenceId() { +return sequenceId; +} + +@Override +public CompletableFuture getTxnStatus(TxnID txnID) { +return CompletableFuture.completedFuture(txnMetaMap.get(txnID).status()); +} + +@Override +public CompletableFuture getTxnMetaAsync(TxnID txnID) { +return CompletableFuture.completedFuture(txnMetaMap.get(txnID)); +} + +@Override +public CompletableFuture newTransactionAsync() { +return FutureUtil.failedFuture(new UnsupportedOperationException()); +} + +@Override +public CompletableFuture
[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention
codelipenghui commented on a change in pull request #5570: Transaction log implemention URL: https://github.com/apache/pulsar/pull/5570#discussion_r350050273 ## File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionMetadataStore.java ## @@ -0,0 +1,460 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.transaction.coordinator.impl; + +import io.netty.buffer.ByteBuf; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerFactory; + +import org.apache.pulsar.common.api.proto.PulsarApi.Subscription; +import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry; +import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry.TransactionMetadataOp; +import org.apache.pulsar.common.api.proto.PulsarApi.TxnStatus; +import org.apache.pulsar.common.util.FutureUtil; + +import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream; +import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState; +import org.apache.pulsar.transaction.coordinator.TxnMeta; +import org.apache.pulsar.transaction.coordinator.TxnSubscription; +import org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException; +import org.apache.pulsar.transaction.coordinator.exceptions.TransactionMetadataStoreStateException; +import org.apache.pulsar.transaction.coordinator.exceptions.TransactionNotFoundException; +import org.apache.pulsar.transaction.impl.common.TxnID; + +import org.jctools.queues.MessagePassingQueue; +import org.jctools.queues.SpscArrayQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * The provider that offers managed ledger implementation of 
{@link TransactionMetadataStore}. + */ +public class ManagedLedgerTransactionMetadataStore +extends TransactionMetadataStoreState implements TransactionMetadataStore { + +private static final Logger log = LoggerFactory.getLogger(ManagedLedgerTransactionMetadataStore.class); + +private final TransactionCoordinatorID tcID; +private AtomicLong sequenceId = new AtomicLong(TC_ID_NOT_USED); +private final ManagedLedgerTransactionLogImpl transactionLog; +private static final long TC_ID_NOT_USED = -1L; +private ConcurrentMap txnMetaMap = new ConcurrentHashMap<>(); +private SpscArrayQueue entryQueue; +private volatile long loadCount; + +public ManagedLedgerTransactionMetadataStore(TransactionCoordinatorID tcID, + ManagedLedgerFactory managedLedgerFactory) throws Exception { +super(State.None); +this.tcID = tcID; +this.transactionLog = +new ManagedLedgerTransactionLogImpl(tcID.getId(), managedLedgerFactory); +this.entryQueue = new SpscArrayQueue<>(2000); +init(); +} + +private ConcurrentMap getTxnMetaMap() { +return txnMetaMap; +} + +public AtomicLong getSequenceId() { +return sequenceId; +} + +@Override +public CompletableFuture getTxnStatus(TxnID txnID) { +return CompletableFuture.completedFuture(txnMetaMap.get(txnID).status()); +} + +@Override +public CompletableFuture getTxnMetaAsync(TxnID txnID) { +return CompletableFuture.completedFuture(txnMetaMap.get(txnID)); +} + +@Override +public CompletableFuture newTransactionAsync() { +return FutureUtil.failedFuture(new UnsupportedOperationException()); +} + +@Override +public CompletableFuture
[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention
codelipenghui commented on a change in pull request #5570: Transaction log implemention URL: https://github.com/apache/pulsar/pull/5570#discussion_r350020267 ## File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionLog.java ## @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.transaction.coordinator; + +import java.util.concurrent.CompletableFuture; + +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.pulsar.common.api.proto.PulsarApi; + +/** + * A log interface for transaction to read and write transaction operation. + */ +public interface TransactionLog { + +/** + * Read the entry from bookkeeper. + * + * @param numberOfEntriesToRead the number of reading entry + * @param callback the callback to executing when reading entry async finished + */ +void read(int numberOfEntriesToRead, AsyncCallbacks.ReadEntriesCallback callback, Object ctx); Review comment: Also, if the method is an async method, please add Async to the end of its name (e.g. readAsync, closeAsync, writeAsync). This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention
codelipenghui commented on a change in pull request #5570: Transaction log implemention URL: https://github.com/apache/pulsar/pull/5570#discussion_r350036026 ## File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionMetadataStore.java ## @@ -0,0 +1,460 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.transaction.coordinator.impl; + +import io.netty.buffer.ByteBuf; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerFactory; + +import org.apache.pulsar.common.api.proto.PulsarApi.Subscription; +import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry; +import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry.TransactionMetadataOp; +import org.apache.pulsar.common.api.proto.PulsarApi.TxnStatus; +import org.apache.pulsar.common.util.FutureUtil; + +import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream; +import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState; +import org.apache.pulsar.transaction.coordinator.TxnMeta; +import org.apache.pulsar.transaction.coordinator.TxnSubscription; +import org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException; +import org.apache.pulsar.transaction.coordinator.exceptions.TransactionMetadataStoreStateException; +import org.apache.pulsar.transaction.coordinator.exceptions.TransactionNotFoundException; +import org.apache.pulsar.transaction.impl.common.TxnID; + +import org.jctools.queues.MessagePassingQueue; +import org.jctools.queues.SpscArrayQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * The provider that offers managed ledger implementation of 
{@link TransactionMetadataStore}. + */ +public class ManagedLedgerTransactionMetadataStore +extends TransactionMetadataStoreState implements TransactionMetadataStore { + +private static final Logger log = LoggerFactory.getLogger(ManagedLedgerTransactionMetadataStore.class); + +private final TransactionCoordinatorID tcID; +private AtomicLong sequenceId = new AtomicLong(TC_ID_NOT_USED); +private final ManagedLedgerTransactionLogImpl transactionLog; +private static final long TC_ID_NOT_USED = -1L; +private ConcurrentMap txnMetaMap = new ConcurrentHashMap<>(); +private SpscArrayQueue entryQueue; +private volatile long loadCount; + +public ManagedLedgerTransactionMetadataStore(TransactionCoordinatorID tcID, + ManagedLedgerFactory managedLedgerFactory) throws Exception { +super(State.None); +this.tcID = tcID; +this.transactionLog = +new ManagedLedgerTransactionLogImpl(tcID.getId(), managedLedgerFactory); +this.entryQueue = new SpscArrayQueue<>(2000); +init(); +} + +private ConcurrentMap getTxnMetaMap() { +return txnMetaMap; +} + +public AtomicLong getSequenceId() { +return sequenceId; +} + +@Override +public CompletableFuture getTxnStatus(TxnID txnID) { +return CompletableFuture.completedFuture(txnMetaMap.get(txnID).status()); +} + +@Override +public CompletableFuture getTxnMetaAsync(TxnID txnID) { +return CompletableFuture.completedFuture(txnMetaMap.get(txnID)); +} + +@Override +public CompletableFuture newTransactionAsync() { +return FutureUtil.failedFuture(new UnsupportedOperationException()); +} + +@Override +public CompletableFuture
[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention
codelipenghui commented on a change in pull request #5570: Transaction log implemention URL: https://github.com/apache/pulsar/pull/5570#discussion_r350023649 ## File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreState.java ## @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.transaction.coordinator; + +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.UnaryOperator; + +/** + * The implement of transaction metadata store state. + */ +public abstract class TransactionMetadataStoreState { + +/** + * The state of the transactionMetadataStore {@link TransactionMetadataStore}. 
+ */ +public enum State { +None, +Initializing, +Ready, +Close +} + +private static final AtomicReferenceFieldUpdater STATE_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(TransactionMetadataStoreState.class, State.class, "state"); + +@SuppressWarnings("unused") +private volatile State state = null; + +public TransactionMetadataStoreState(State state) { +STATE_UPDATER.set(this, state); + +} + +protected boolean changeToReadyState() { +return (STATE_UPDATER.compareAndSet(this, State.Initializing, State.Ready)); +} + +protected boolean changeToInitializingState() { +return STATE_UPDATER.compareAndSet(this, State.None, State.Initializing); +} + +protected boolean changeToClose() { +return (STATE_UPDATER.compareAndSet(this, State.Ready, State.Close) +|| STATE_UPDATER.compareAndSet(this, State.None, State.Close) +|| STATE_UPDATER.compareAndSet(this, State.Initializing, State.Close)); +} + +protected boolean checkIfReady() { +return STATE_UPDATER.get(this) == State.Ready; +} + +public boolean checkCurrentState(State state) { +return STATE_UPDATER.get(this) == state; +} + +public State getState() { +return STATE_UPDATER.get(this); +} + +protected void setState(State s) { +STATE_UPDATER.set(this, s); +} + +protected State getAndUpdateState(final UnaryOperator updater) { +return STATE_UPDATER.getAndUpdate(this, updater); +} Review comment: Is this method used anywhere? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention
codelipenghui commented on a change in pull request #5570: Transaction log implemention URL: https://github.com/apache/pulsar/pull/5570#discussion_r349981383 ## File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionMetadataStore.java ## @@ -0,0 +1,460 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.transaction.coordinator.impl; + +import io.netty.buffer.ByteBuf; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerFactory; + +import org.apache.pulsar.common.api.proto.PulsarApi.Subscription; +import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry; +import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry.TransactionMetadataOp; +import org.apache.pulsar.common.api.proto.PulsarApi.TxnStatus; +import org.apache.pulsar.common.util.FutureUtil; + +import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream; +import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState; +import org.apache.pulsar.transaction.coordinator.TxnMeta; +import org.apache.pulsar.transaction.coordinator.TxnSubscription; +import org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException; +import org.apache.pulsar.transaction.coordinator.exceptions.TransactionMetadataStoreStateException; +import org.apache.pulsar.transaction.coordinator.exceptions.TransactionNotFoundException; +import org.apache.pulsar.transaction.impl.common.TxnID; + +import org.jctools.queues.MessagePassingQueue; +import org.jctools.queues.SpscArrayQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * The provider that offers managed ledger implementation of 
{@link TransactionMetadataStore}. + */ +public class ManagedLedgerTransactionMetadataStore +extends TransactionMetadataStoreState implements TransactionMetadataStore { + +private static final Logger log = LoggerFactory.getLogger(ManagedLedgerTransactionMetadataStore.class); + +private final TransactionCoordinatorID tcID; +private AtomicLong sequenceId = new AtomicLong(TC_ID_NOT_USED); +private final ManagedLedgerTransactionLogImpl transactionLog; +private static final long TC_ID_NOT_USED = -1L; +private ConcurrentMap txnMetaMap = new ConcurrentHashMap<>(); +private SpscArrayQueue entryQueue; +private volatile long loadCount; + +public ManagedLedgerTransactionMetadataStore(TransactionCoordinatorID tcID, + ManagedLedgerFactory managedLedgerFactory) throws Exception { +super(State.None); +this.tcID = tcID; +this.transactionLog = +new ManagedLedgerTransactionLogImpl(tcID.getId(), managedLedgerFactory); +this.entryQueue = new SpscArrayQueue<>(2000); +init(); +} + +private ConcurrentMap getTxnMetaMap() { +return txnMetaMap; +} + +public AtomicLong getSequenceId() { +return sequenceId; +} + +@Override +public CompletableFuture getTxnStatus(TxnID txnID) { +return CompletableFuture.completedFuture(txnMetaMap.get(txnID).status()); +} + +@Override +public CompletableFuture getTxnMetaAsync(TxnID txnID) { +return CompletableFuture.completedFuture(txnMetaMap.get(txnID)); +} + +@Override +public CompletableFuture newTransactionAsync() { +return FutureUtil.failedFuture(new UnsupportedOperationException()); +} + +@Override +public CompletableFuture
[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention
codelipenghui commented on a change in pull request #5570: Transaction log implemention URL: https://github.com/apache/pulsar/pull/5570#discussion_r350021225 ## File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java ## @@ -32,70 +33,82 @@ public interface TransactionMetadataStore { /** - * Query the {@link TxnStatus} of a given transaction txnid. + * Query the {@link TxnStatus} of a given transaction txnId. * - * @param txnid transaction id + * @param txnID {@link TxnID} for get transaction status * @return a future represents the result of this operation. * it returns {@link TxnStatus} of the given transaction. */ -default CompletableFuture getTxnStatus(TxnID txnid) { -return getTxnMeta(txnid).thenApply(txnMeta -> txnMeta.status()); +default CompletableFuture getTxnStatus(TxnID txnID) { +return getTxnMetaAsync(txnID).thenApply(TxnMeta::status); } /** * Query the {@link TxnMeta} of a given transaction txnid. * - * @param txnid transaction id + * @param txnID {@link TxnID} for get transaction metadata * @return a future represents the result of this operation. * it returns {@link TxnMeta} of the given transaction. */ -CompletableFuture getTxnMeta(TxnID txnid); +CompletableFuture getTxnMetaAsync(TxnID txnID); + +/** + * Create a new transaction in the transaction metadata store. + * + * @return a future represents the result of creating a new transaction. + * it returns {@link TxnID} as the identifier for identifying the + * transaction. + */ +CompletableFuture newTransactionAsync(); /** * Create a new transaction in the transaction metadata store. * + * @param timeOut the timeout duration of the transaction in mills * @return a future represents the result of creating a new transaction. * it returns {@link TxnID} as the identifier for identifying the * transaction. 
*/ -CompletableFuture newTransaction(); +CompletableFuture newTransactionAsync(long timeOut); Review comment: If the param timeOut uses milliseconds as its time unit, please name it timeoutInMills. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention
codelipenghui commented on a change in pull request #5570: Transaction log implemention URL: https://github.com/apache/pulsar/pull/5570#discussion_r350022645 ## File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreState.java ## @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.transaction.coordinator; + +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.UnaryOperator; + +/** + * The implement of transaction metadata store state. + */ +public abstract class TransactionMetadataStoreState { + +/** + * The state of the transactionMetadataStore {@link TransactionMetadataStore}. 
+ */ +public enum State { +None, +Initializing, +Ready, +Close +} + +private static final AtomicReferenceFieldUpdater STATE_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(TransactionMetadataStoreState.class, State.class, "state"); + +@SuppressWarnings("unused") +private volatile State state = null; + +public TransactionMetadataStoreState(State state) { +STATE_UPDATER.set(this, state); + +} + +protected boolean changeToReadyState() { +return (STATE_UPDATER.compareAndSet(this, State.Initializing, State.Ready)); +} + +protected boolean changeToInitializingState() { +return STATE_UPDATER.compareAndSet(this, State.None, State.Initializing); +} + +protected boolean changeToClose() { Review comment: Please name this changeToCloseState(), since the others are named changeToXXXState. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention
codelipenghui commented on a change in pull request #5570: Transaction log implemention URL: https://github.com/apache/pulsar/pull/5570#discussion_r350042099 ## File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionMetadataStore.java ## @@ -0,0 +1,460 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.transaction.coordinator.impl; + +import io.netty.buffer.ByteBuf; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerFactory; + +import org.apache.pulsar.common.api.proto.PulsarApi.Subscription; +import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry; +import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry.TransactionMetadataOp; +import org.apache.pulsar.common.api.proto.PulsarApi.TxnStatus; +import org.apache.pulsar.common.util.FutureUtil; + +import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream; +import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState; +import org.apache.pulsar.transaction.coordinator.TxnMeta; +import org.apache.pulsar.transaction.coordinator.TxnSubscription; +import org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException; +import org.apache.pulsar.transaction.coordinator.exceptions.TransactionMetadataStoreStateException; +import org.apache.pulsar.transaction.coordinator.exceptions.TransactionNotFoundException; +import org.apache.pulsar.transaction.impl.common.TxnID; + +import org.jctools.queues.MessagePassingQueue; +import org.jctools.queues.SpscArrayQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * The provider that offers managed ledger implementation of 
{@link TransactionMetadataStore}. + */ +public class ManagedLedgerTransactionMetadataStore +extends TransactionMetadataStoreState implements TransactionMetadataStore { + +private static final Logger log = LoggerFactory.getLogger(ManagedLedgerTransactionMetadataStore.class); + +private final TransactionCoordinatorID tcID; +private AtomicLong sequenceId = new AtomicLong(TC_ID_NOT_USED); +private final ManagedLedgerTransactionLogImpl transactionLog; +private static final long TC_ID_NOT_USED = -1L; +private ConcurrentMap txnMetaMap = new ConcurrentHashMap<>(); +private SpscArrayQueue entryQueue; +private volatile long loadCount; + +public ManagedLedgerTransactionMetadataStore(TransactionCoordinatorID tcID, + ManagedLedgerFactory managedLedgerFactory) throws Exception { +super(State.None); +this.tcID = tcID; +this.transactionLog = +new ManagedLedgerTransactionLogImpl(tcID.getId(), managedLedgerFactory); +this.entryQueue = new SpscArrayQueue<>(2000); +init(); +} + +private ConcurrentMap getTxnMetaMap() { +return txnMetaMap; +} + +public AtomicLong getSequenceId() { +return sequenceId; +} + +@Override +public CompletableFuture getTxnStatus(TxnID txnID) { +return CompletableFuture.completedFuture(txnMetaMap.get(txnID).status()); +} + +@Override +public CompletableFuture getTxnMetaAsync(TxnID txnID) { +return CompletableFuture.completedFuture(txnMetaMap.get(txnID)); +} + +@Override +public CompletableFuture newTransactionAsync() { +return FutureUtil.failedFuture(new UnsupportedOperationException()); +} + +@Override +public CompletableFuture
[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention
codelipenghui commented on a change in pull request #5570: Transaction log implemention URL: https://github.com/apache/pulsar/pull/5570#discussion_r350029476 ## File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionLogImpl.java ## @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.transaction.coordinator.impl; + +import io.netty.buffer.ByteBuf; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerFactory; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.ReadOnlyCursor; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; +import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry; +import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream; +import org.apache.pulsar.transaction.coordinator.TransactionLog; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class ManagedLedgerTransactionLogImpl implements TransactionLog { + +private static final Logger log = LoggerFactory.getLogger(ManagedLedgerTransactionLogImpl.class); + +private final ManagedLedger managedLedger; + +private final static String TRANSACTION_LOG_PREFIX = "transaction/log/"; + +private final ReadOnlyCursor readOnlyCursor; + +ManagedLedgerTransactionLogImpl(long tcID, +ManagedLedgerFactory managedLedgerFactory) throws Exception { +this.managedLedger = managedLedgerFactory.open(TRANSACTION_LOG_PREFIX + tcID); +this.readOnlyCursor = managedLedgerFactory.openReadOnlyCursor(TRANSACTION_LOG_PREFIX + tcID, +PositionImpl.earliest, new ManagedLedgerConfig()); +} + +@Override +public void read(int numberOfEntriesToRead, AsyncCallbacks.ReadEntriesCallback callback, Object ctx) { +readOnlyCursor.asyncReadEntries(numberOfEntriesToRead, callback, System.nanoTime()); +} + +ReadOnlyCursor getReadOnlyCursor() { +return readOnlyCursor; +} + +@Override +public 
CompletableFuture close() { +try { +managedLedger.close(); +readOnlyCursor.close(); +return CompletableFuture.completedFuture(null); +} catch (InterruptedException | ManagedLedgerException e) { +return FutureUtil.failedFuture(e); +} +} + +@Override +public CompletableFuture write(TransactionMetadataEntry transactionMetadataEntry) { +int transactionMetadataEntrySize = transactionMetadataEntry.getSerializedSize(); + +ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(transactionMetadataEntrySize, transactionMetadataEntrySize); + +ByteBufCodedOutputStream outStream = ByteBufCodedOutputStream.get(buf); + +CompletableFuture completableFuture = new CompletableFuture<>(); + +try { +transactionMetadataEntry.writeTo(outStream); +managedLedger.asyncAddEntry(buf, new AsyncCallbacks.AddEntryCallback() { +@Override +public void addComplete(Position position, Object ctx) { +buf.release(); +completableFuture.complete(null); +} + +@Override +public void addFailed(ManagedLedgerException exception, Object ctx) { +log.error("Transaction log write transaction operation error" + exception); +buf.release(); +completableFuture.completeExceptionally(exception); +} +} , null); +} catch (IOException e) { +log.error("Transaction log write transaction operation error" + e); Review comment: Same as above.
[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention
codelipenghui commented on a change in pull request #5570: Transaction log implemention URL: https://github.com/apache/pulsar/pull/5570#discussion_r348377382 ## File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java ## @@ -58,43 +69,56 @@ * it returns {@link TxnID} as the identifier for identifying the * transaction. */ -CompletableFuture newTransaction(); +CompletableFuture newTransactionAsync(); /** - * Add the produced partitions to transaction identified by txnid. + * Create a new transaction in the transaction metadata store. * - * @param txnid transaction id + * @param timeOut the timeout time Review comment: ```suggestion * @param timeOut the timeout duration of the transaction in milliseconds ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention
codelipenghui commented on a change in pull request #5570: Transaction log implemention URL: https://github.com/apache/pulsar/pull/5570#discussion_r348391180 ## File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/TxnStoreStateUpdateException.java ## @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.transaction.coordinator.exceptions; + +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; + +/** + * Exception is thrown when update the state incorrect in transaction store. + */ +public class TxnStoreStateUpdateException extends CoordinatorException { Review comment: Do we need this exception? I think the metadata store should handle the state change internally. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention
codelipenghui commented on a change in pull request #5570: Transaction log implemention URL: https://github.com/apache/pulsar/pull/5570#discussion_r348376518 ## File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java ## @@ -32,24 +33,34 @@ public interface TransactionMetadataStore { /** - * Query the {@link TxnStatus} of a given transaction txnid. + * The state of the transactionMetadataStore {@link TransactionMetadataStore}. + */ +enum State { +NONE, +INITIALIZING, +READY, +CLOSE +} + +/** + * Query the {@link TxnStatus} of a given transaction txnId. * - * @param txnid transaction id + * @param txnID {@link TxnID} for get transaction status * @return a future represents the result of this operation. * it returns {@link TxnStatus} of the given transaction. */ -default CompletableFuture getTxnStatus(TxnID txnid) { -return getTxnMeta(txnid).thenApply(txnMeta -> txnMeta.status()); +default CompletableFuture getTxnStatus(TxnID txnID) { +return getTxnMetaAsync(txnID).thenApply(TxnMeta::status); } Review comment: It's better to rename this to `getTxnStatusAsync`, since all async methods end with `Async`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention
codelipenghui commented on a change in pull request #5570: Transaction log implemention URL: https://github.com/apache/pulsar/pull/5570#discussion_r346147104 ## File path: pulsar-transaction/common/pom.xml ## @@ -40,12 +40,22 @@ protobuf-java ${protobuf3.version} - com.google.protobuf protobuf-java-util ${protobuf3.version} + +${project.groupId} +pulsar-common +${project.parent.version} + Review comment: Would it be better to create a proto file in the pulsar-transaction-common module, so that we don't need to add the pulsar-common dependency? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention
codelipenghui commented on a change in pull request #5570: Transaction log implemention URL: https://github.com/apache/pulsar/pull/5570#discussion_r346158483 ## File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java ## @@ -32,24 +33,34 @@ public interface TransactionMetadataStore { /** - * Query the {@link TxnStatus} of a given transaction txnid. + * The state of the transactionMetadataStore {@link TransactionMetadataStore}. + */ +enum State { +NONE, +INITIALIZING, +READY, +CLOSE +} + +/** + * Query the {@link TxnStatus} of a given transaction txnId. * - * @param txnid transaction id + * @param txnID {@link TxnID} for get transaction status * @return a future represents the result of this operation. * it returns {@link TxnStatus} of the given transaction. */ -default CompletableFuture getTxnStatus(TxnID txnid) { -return getTxnMeta(txnid).thenApply(txnMeta -> txnMeta.status()); +default CompletableFuture getTxnStatus(TxnID txnID) { +return getTxnMeta(txnID).thenApply(txnMeta -> txnMeta.status()); } /** * Query the {@link TxnMeta} of a given transaction txnid. * - * @param txnid transaction id + * @param txnID {@link TxnID} for get transaction metadata * @return a future represents the result of this operation. * it returns {@link TxnMeta} of the given transaction. */ -CompletableFuture getTxnMeta(TxnID txnid); +CompletableFuture getTxnMeta(TxnID txnID); Review comment: We'd better use `getTxnMetaAsync` here, since this is an asynchronous method, and the Javadoc also needs to state that this is an asynchronous method. Please check the other methods as well. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention
codelipenghui commented on a change in pull request #5570: Transaction log implemention URL: https://github.com/apache/pulsar/pull/5570#discussion_r346189781 ## File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionReaderImpl.java ## @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.transaction.coordinator.impl; + +import io.netty.buffer.ByteBuf; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerFactory; +import org.apache.bookkeeper.mledger.ReadOnlyCursor; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.common.api.proto.PulsarApi.Subscription; +import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry; +import org.apache.pulsar.common.api.proto.PulsarApi.TxnStatus; +import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; +import org.apache.pulsar.transaction.coordinator.TxnMeta; +import org.apache.pulsar.transaction.coordinator.TxnSubscription; +import org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException; +import org.apache.pulsar.transaction.impl.common.TxnID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class ManagedLedgerTransactionReaderImpl implements +ManagedLedgerTransactionMetadataStore.ManagedLedgerTransactionReader { + +private static final Logger log = LoggerFactory.getLogger(ManagedLedgerTransactionReaderImpl.class); + +private ConcurrentMap txnMetaMap = new ConcurrentHashMap<>(); Review comment: It's better to maintain cache in `TransactionMetastore` This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention
codelipenghui commented on a change in pull request #5570: Transaction log implemention URL: https://github.com/apache/pulsar/pull/5570#discussion_r346162726 ## File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java ## @@ -70,6 +79,17 @@ TxnMeta addProducedPartitions(List partitions) throws InvalidTxnStatusException; +/** + * Add the list of subscriptions to the transaction. + * + * @param subscriptions + * @return transaction meta + * @throws InvalidTxnStatusException if the transaction is not in + * {@link TxnStatus#OPEN} + */ +TxnMeta addTxnSubscription(List subscriptions) Review comment: Please use `addAckedPartitions` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention
codelipenghui commented on a change in pull request #5570: Transaction log implemention URL: https://github.com/apache/pulsar/pull/5570#discussion_r346166028 ## File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionMetadataStore.java ## @@ -0,0 +1,341 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.transaction.coordinator.impl; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerFactory; + +import org.apache.pulsar.common.api.proto.PulsarApi.Subscription; +import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry; +import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry.TransactionMetadataOp; +import org.apache.pulsar.common.api.proto.PulsarApi.TxnStatus; +import org.apache.pulsar.common.util.FutureUtil; + +import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; +import org.apache.pulsar.transaction.coordinator.TxnMeta; +import org.apache.pulsar.transaction.coordinator.TxnSubscription; +import org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException; +import org.apache.pulsar.transaction.impl.common.TxnID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * The provider that offers managed ledger implementation of {@link TransactionMetadataStore}. + */ +public class ManagedLedgerTransactionMetadataStore implements TransactionMetadataStore { + +private static final Logger LOGGER = LoggerFactory.getLogger(ManagedLedgerTransactionReaderImpl.class); + +private final TransactionCoordinatorID tcID; +private AtomicLong sequenceId; +private final ManagedLedgerTransactionReader reader; +private final ManagedLedgerTransactionWriter writer; Review comment: I recommend to create a `TransactionLog` interface here, a reader and writer for `TransactionLog` is better for understand. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention
codelipenghui commented on a change in pull request #5570: Transaction log implemention URL: https://github.com/apache/pulsar/pull/5570#discussion_r346194349 ## File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionReaderImpl.java ## @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.transaction.coordinator.impl; + +import io.netty.buffer.ByteBuf; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerFactory; +import org.apache.bookkeeper.mledger.ReadOnlyCursor; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.common.api.proto.PulsarApi.Subscription; +import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry; +import org.apache.pulsar.common.api.proto.PulsarApi.TxnStatus; +import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; +import org.apache.pulsar.transaction.coordinator.TxnMeta; +import org.apache.pulsar.transaction.coordinator.TxnSubscription; +import org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException; +import org.apache.pulsar.transaction.impl.common.TxnID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class ManagedLedgerTransactionReaderImpl implements +ManagedLedgerTransactionMetadataStore.ManagedLedgerTransactionReader { + +private static final Logger log = LoggerFactory.getLogger(ManagedLedgerTransactionReaderImpl.class); + +private ConcurrentMap txnMetaMap = new ConcurrentHashMap<>(); + +private AtomicLong sequenceId = new AtomicLong(ManagedLedgerTransactionMetadataStore.TC_ID_NOT_USED); + +private final ReadOnlyCursor readOnlyCursor; + +public ManagedLedgerTransactionReaderImpl(String tcId, 
ManagedLedgerFactory managedLedgerFactory) throws Exception { +this.readOnlyCursor = managedLedgerFactory +.openReadOnlyCursor(tcId, +PositionImpl.earliest, new ManagedLedgerConfig()); +} + +private static List subscriptionToTxnSubscription(List subscriptions) { +List txnSubscriptions = new ArrayList<>(subscriptions.size()); +for (int i = 0; i < subscriptions.size(); i++) { +txnSubscriptions +.add(new TxnSubscription(subscriptions.get(i).getTopic(), subscriptions.get(i).getSubscription())); +} +return txnSubscriptions; +} + + +@Override +public TxnMeta getTxnMeta(TxnID txnID) { +return txnMetaMap.get(txnID); +} + +@Override +public Long readSequenceId() { +return sequenceId.get(); +} + +@Override +public void addNewTxn(TxnMeta txnMeta) { +txnMetaMap.put(txnMeta.id(), txnMeta); +} + +@Override +public TxnStatus getTxnStatus(TxnID txnID) { +return txnMetaMap.get(txnID).status(); +} + +@Override +public CompletableFuture init(TransactionMetadataStore transactionMetadataStore) { +CountDownLatch countDownLatch = new CountDownLatch(1); +countDownLatch.countDown(); +CompletableFuture completableFuture = new CompletableFuture<>(); + transactionMetadataStore.updateMetadataStoreState(TransactionMetadataStore.State.INITIALIZING); +readOnlyCursor +.asyncReadEntries(100, +new ReaderReadEntriesCallback(countDownLatch, txnMetaMap, sequenceId, Review comment: Also, expose configs for each read batch size. This is an
[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention
codelipenghui commented on a change in pull request #5570: Transaction log implemention URL: https://github.com/apache/pulsar/pull/5570#discussion_r346145938 ## File path: pulsar-common/src/main/proto/PulsarApi.proto ## @@ -645,6 +645,35 @@ enum TxnAction { ABORT = 1; } +enum TxnStatus { +OPEN = 0; +COMMITTING = 1; +COMMITTED = 2; +ABORTING = 3; +ABORTED= 4; +} + +message TransactionMetadataEntry { + +enum TransactionMetadataOp { +NEW = 0; +ADD_PARTITION = 1; +ADD_SUBSCRIPTION= 2; +UPDATE = 3; +} + +optional TransactionMetadataOp metadata_op = 1; +optional uint64 txnid_least_bits= 2 [default = 0]; +optional uint64 txnid_most_bits = 3 [default = 0]; +optional TxnStatus expected_status = 4; +optional TxnStatus new_status = 5; +repeated string partitions = 6; +repeated Subscription subscriptions = 7; +optional uint64 timeout_ms = 8; +optional uint64 start_time = 9; +optional uint64 last_modification_time = 10; +} + Review comment: I think `PulsarApi.proto` is only for the wire protocol; would it be better to move TransactionMetadataEntry to a separate proto file? @sijie please help confirm. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention
codelipenghui commented on a change in pull request #5570: Transaction log implemention URL: https://github.com/apache/pulsar/pull/5570#discussion_r346162797 ## File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java ## @@ -52,6 +53,14 @@ */ List producedPartitions(); +/** + * Return the the list of subscriptions that this transaction send to. + * + * @return the list of subscriptions that this transaction produced to. + * the returned list is sorted by partition name. + */ +List txnSubscription(); Review comment: please use `ackedPartitions()` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention
codelipenghui commented on a change in pull request #5570: Transaction log implemention URL: https://github.com/apache/pulsar/pull/5570#discussion_r346159426 ## File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java ## @@ -61,41 +72,80 @@ CompletableFuture newTransaction(); /** - * Add the produced partitions to transaction identified by txnid. + * Create a new transaction in the transaction metadata store. * - * @param txnid transaction id + * @param timeOut the timeout time + * @return a future represents the result of creating a new transaction. + * it returns {@link TxnID} as the identifier for identifying the + * transaction. + */ +CompletableFuture newTransaction(long timeOut); + +/** + * Add the produced partitions to transaction identified by txnId. + * + * @param txnID {@link TxnID} for add produced partition to transaction * @param partitions the list of partitions that a transaction produces to * @return a future represents the result of this operation */ CompletableFuture addProducedPartitionToTxn( -TxnID txnid, List partitions); +TxnID txnID, List partitions); + +/** + * Add the acked subscriptions to transaction identified by txnId. + * + * @param txnID {@link TxnID} for add acked subscription + * @param txnSubscriptions the list of subscriptions that a transaction ack to + * @return a future represents the result of this operation + */ +CompletableFuture addAckedSubscriptionToTxn( +TxnID txnID, List txnSubscriptions); /** - * Add the acked partitions to transaction identified by txnid. + * Add the acked partitions to transaction identified by txnId. 
* - * @param txnid transaction id + * @param txnID {@link TxnID} for add acked partition * @param partitions the list of partitions that a transaction acknowledge to * @return a future represents the result of the operation */ CompletableFuture addAckedPartitionToTxn( -TxnID txnid, List partitions); +TxnID txnID, List partitions); /** * Update the transaction from expectedStatus to newStatus. * * If the current transaction status is not expectedStatus, the * update will be failed. * + * @param txnID {@link TxnID} for update txn status * @param newStatus the new txn status that the transaction should be updated to * @param expectedStatus the expected status that the transaction should be * @return a future represents the result of the operation */ CompletableFuture updateTxnStatus( -TxnID txnid, TxnStatus newStatus, TxnStatus expectedStatus); +TxnID txnID, TxnStatus newStatus, TxnStatus expectedStatus); /** * Close the transaction metadata store. + * + * @return a future represents the result of this operation */ CompletableFuture closeAsync(); +/** + * Update the {@link State}. + * + * @param state the transaction metadata store state {@link State} + * @return a future represents the result of this operation + */ +CompletableFuture updateMetadataStoreState(State state); + +/** + * Set the txn sequenceId. + * + * @param sequenceId the transaction sequenceId for new transaction Id + * @return a future represents the result of this operation + */ +CompletableFuture setTxnSequenceId(long sequenceId); + Review comment: Do we need to expose these methods in the TransactionMetadataStore interface? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention
codelipenghui commented on a change in pull request #5570: Transaction log implemention URL: https://github.com/apache/pulsar/pull/5570#discussion_r346179273 ## File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionMetadataStore.java ## @@ -0,0 +1,341 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.transaction.coordinator.impl; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerFactory; + +import org.apache.pulsar.common.api.proto.PulsarApi.Subscription; +import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry; +import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry.TransactionMetadataOp; +import org.apache.pulsar.common.api.proto.PulsarApi.TxnStatus; +import org.apache.pulsar.common.util.FutureUtil; + +import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; +import org.apache.pulsar.transaction.coordinator.TxnMeta; +import org.apache.pulsar.transaction.coordinator.TxnSubscription; +import org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException; +import org.apache.pulsar.transaction.impl.common.TxnID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * The provider that offers managed ledger implementation of {@link TransactionMetadataStore}. 
+ */ +public class ManagedLedgerTransactionMetadataStore implements TransactionMetadataStore { + +private static final Logger LOGGER = LoggerFactory.getLogger(ManagedLedgerTransactionReaderImpl.class); + +private final TransactionCoordinatorID tcID; +private AtomicLong sequenceId; +private final ManagedLedgerTransactionReader reader; +private final ManagedLedgerTransactionWriter writer; +protected static final long TC_ID_NOT_USED = -1L; +private volatile State state; + +public State getState() { +return state; +} + +public ManagedLedgerTransactionMetadataStore(TransactionCoordinatorID tcID, + ManagedLedgerFactory managedLedgerFactory) throws Exception { +this.tcID = tcID; +this.state = State.NONE; +this.writer = new ManagedLedgerTransactionWriterImpl(tcID.toString(), managedLedgerFactory); +this.reader = new ManagedLedgerTransactionReaderImpl(tcID.toString(), managedLedgerFactory); +this.reader.init(this); +} + +@Override +public CompletableFuture getTxnStatus(TxnID txnID) { +return CompletableFuture.completedFuture(reader.getTxnStatus(txnID)); +} + +@Override +public CompletableFuture getTxnMeta(TxnID txnID) { +return CompletableFuture.completedFuture(reader.getTxnMeta(txnID)); +} + +@Override +public CompletableFuture newTransaction() { +return FutureUtil.failedFuture(new UnsupportedOperationException()); +} + +@Override +public CompletableFuture newTransaction(long timeOut) { +checkArgument(state == State.READY, "Transaction metadata store " + state.name()); +long mostSigBits = tcID.getId(); +long leastSigBits = sequenceId.getAndIncrement(); + +TxnID txnID = new TxnID( +mostSigBits, +leastSigBits +); +long currentTimeMillis = System.currentTimeMillis(); +TransactionMetadataEntry transactionMetadataEntry = TransactionMetadataEntry +.newBuilder() +.setTxnidMostBits(mostSigBits) +.setTxnidLeastBits(leastSigBits) +.setStartTime(currentTimeMillis) +.setTimeoutMs(timeOut) +.setMetadataOp(TransactionMetadataOp.NEW) +.setLastModificationTime(currentTimeMillis) 
+.build(); +CompletableFuture
[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention
codelipenghui commented on a change in pull request #5570: Transaction log implemention URL: https://github.com/apache/pulsar/pull/5570#discussion_r346198793 ## File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionReaderImpl.java ## @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.transaction.coordinator.impl; + +import io.netty.buffer.ByteBuf; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerFactory; +import org.apache.bookkeeper.mledger.ReadOnlyCursor; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.common.api.proto.PulsarApi.Subscription; +import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry; +import org.apache.pulsar.common.api.proto.PulsarApi.TxnStatus; +import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; +import org.apache.pulsar.transaction.coordinator.TxnMeta; +import org.apache.pulsar.transaction.coordinator.TxnSubscription; +import org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException; +import org.apache.pulsar.transaction.impl.common.TxnID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class ManagedLedgerTransactionReaderImpl implements +ManagedLedgerTransactionMetadataStore.ManagedLedgerTransactionReader { + +private static final Logger log = LoggerFactory.getLogger(ManagedLedgerTransactionReaderImpl.class); + +private ConcurrentMap txnMetaMap = new ConcurrentHashMap<>(); + +private AtomicLong sequenceId = new AtomicLong(ManagedLedgerTransactionMetadataStore.TC_ID_NOT_USED); + +private final ReadOnlyCursor readOnlyCursor; + +public ManagedLedgerTransactionReaderImpl(String tcId, 
ManagedLedgerFactory managedLedgerFactory) throws Exception { +this.readOnlyCursor = managedLedgerFactory +.openReadOnlyCursor(tcId, +PositionImpl.earliest, new ManagedLedgerConfig()); +} + +private static List subscriptionToTxnSubscription(List subscriptions) { +List txnSubscriptions = new ArrayList<>(subscriptions.size()); +for (int i = 0; i < subscriptions.size(); i++) { +txnSubscriptions +.add(new TxnSubscription(subscriptions.get(i).getTopic(), subscriptions.get(i).getSubscription())); +} +return txnSubscriptions; +} + + +@Override +public TxnMeta getTxnMeta(TxnID txnID) { +return txnMetaMap.get(txnID); +} + +@Override +public Long readSequenceId() { +return sequenceId.get(); +} + +@Override +public void addNewTxn(TxnMeta txnMeta) { +txnMetaMap.put(txnMeta.id(), txnMeta); +} + +@Override +public TxnStatus getTxnStatus(TxnID txnID) { +return txnMetaMap.get(txnID).status(); +} + +@Override +public CompletableFuture init(TransactionMetadataStore transactionMetadataStore) { +CountDownLatch countDownLatch = new CountDownLatch(1); +countDownLatch.countDown(); +CompletableFuture completableFuture = new CompletableFuture<>(); + transactionMetadataStore.updateMetadataStoreState(TransactionMetadataStore.State.INITIALIZING); +readOnlyCursor +.asyncReadEntries(100, +new ReaderReadEntriesCallback(countDownLatch, txnMetaMap, sequenceId, +transactionMetadataStore, completableFuture), System.nanoTime()); +return completableFuture; +} + +
[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention
codelipenghui commented on a change in pull request #5570: Transaction log implemention URL: https://github.com/apache/pulsar/pull/5570#discussion_r346183256 ## File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionMetadataStore.java ## @@ -0,0 +1,341 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.transaction.coordinator.impl; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerFactory; + +import org.apache.pulsar.common.api.proto.PulsarApi.Subscription; +import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry; +import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry.TransactionMetadataOp; +import org.apache.pulsar.common.api.proto.PulsarApi.TxnStatus; +import org.apache.pulsar.common.util.FutureUtil; + +import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; +import org.apache.pulsar.transaction.coordinator.TxnMeta; +import org.apache.pulsar.transaction.coordinator.TxnSubscription; +import org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException; +import org.apache.pulsar.transaction.impl.common.TxnID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * The provider that offers managed ledger implementation of {@link TransactionMetadataStore}. 
+ */ +public class ManagedLedgerTransactionMetadataStore implements TransactionMetadataStore { + +private static final Logger LOGGER = LoggerFactory.getLogger(ManagedLedgerTransactionReaderImpl.class); + +private final TransactionCoordinatorID tcID; +private AtomicLong sequenceId; +private final ManagedLedgerTransactionReader reader; +private final ManagedLedgerTransactionWriter writer; +protected static final long TC_ID_NOT_USED = -1L; +private volatile State state; + +public State getState() { +return state; +} + +public ManagedLedgerTransactionMetadataStore(TransactionCoordinatorID tcID, + ManagedLedgerFactory managedLedgerFactory) throws Exception { +this.tcID = tcID; +this.state = State.NONE; +this.writer = new ManagedLedgerTransactionWriterImpl(tcID.toString(), managedLedgerFactory); +this.reader = new ManagedLedgerTransactionReaderImpl(tcID.toString(), managedLedgerFactory); +this.reader.init(this); +} + +@Override +public CompletableFuture getTxnStatus(TxnID txnID) { +return CompletableFuture.completedFuture(reader.getTxnStatus(txnID)); +} + +@Override +public CompletableFuture getTxnMeta(TxnID txnID) { +return CompletableFuture.completedFuture(reader.getTxnMeta(txnID)); +} + +@Override +public CompletableFuture newTransaction() { +return FutureUtil.failedFuture(new UnsupportedOperationException()); +} + +@Override +public CompletableFuture newTransaction(long timeOut) { +checkArgument(state == State.READY, "Transaction metadata store " + state.name()); +long mostSigBits = tcID.getId(); +long leastSigBits = sequenceId.getAndIncrement(); + +TxnID txnID = new TxnID( +mostSigBits, +leastSigBits +); +long currentTimeMillis = System.currentTimeMillis(); +TransactionMetadataEntry transactionMetadataEntry = TransactionMetadataEntry +.newBuilder() +.setTxnidMostBits(mostSigBits) +.setTxnidLeastBits(leastSigBits) +.setStartTime(currentTimeMillis) +.setTimeoutMs(timeOut) +.setMetadataOp(TransactionMetadataOp.NEW) +.setLastModificationTime(currentTimeMillis) 
+.build(); +CompletableFuture
[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention
codelipenghui commented on a change in pull request #5570: Transaction log implemention URL: https://github.com/apache/pulsar/pull/5570#discussion_r346174710 ## File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionMetadataStore.java ## @@ -0,0 +1,341 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.transaction.coordinator.impl; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerFactory; + +import org.apache.pulsar.common.api.proto.PulsarApi.Subscription; +import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry; +import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry.TransactionMetadataOp; +import org.apache.pulsar.common.api.proto.PulsarApi.TxnStatus; +import org.apache.pulsar.common.util.FutureUtil; + +import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; +import org.apache.pulsar.transaction.coordinator.TxnMeta; +import org.apache.pulsar.transaction.coordinator.TxnSubscription; +import org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException; +import org.apache.pulsar.transaction.impl.common.TxnID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * The provider that offers managed ledger implementation of {@link TransactionMetadataStore}. 
+ */ +public class ManagedLedgerTransactionMetadataStore implements TransactionMetadataStore { + +private static final Logger LOGGER = LoggerFactory.getLogger(ManagedLedgerTransactionReaderImpl.class); + +private final TransactionCoordinatorID tcID; +private AtomicLong sequenceId; +private final ManagedLedgerTransactionReader reader; +private final ManagedLedgerTransactionWriter writer; +protected static final long TC_ID_NOT_USED = -1L; +private volatile State state; + +public State getState() { +return state; +} + +public ManagedLedgerTransactionMetadataStore(TransactionCoordinatorID tcID, + ManagedLedgerFactory managedLedgerFactory) throws Exception { +this.tcID = tcID; +this.state = State.NONE; +this.writer = new ManagedLedgerTransactionWriterImpl(tcID.toString(), managedLedgerFactory); +this.reader = new ManagedLedgerTransactionReaderImpl(tcID.toString(), managedLedgerFactory); +this.reader.init(this); +} + +@Override +public CompletableFuture getTxnStatus(TxnID txnID) { +return CompletableFuture.completedFuture(reader.getTxnStatus(txnID)); +} + +@Override +public CompletableFuture getTxnMeta(TxnID txnID) { +return CompletableFuture.completedFuture(reader.getTxnMeta(txnID)); +} + +@Override +public CompletableFuture newTransaction() { +return FutureUtil.failedFuture(new UnsupportedOperationException()); +} + +@Override +public CompletableFuture newTransaction(long timeOut) { +checkArgument(state == State.READY, "Transaction metadata store " + state.name()); +long mostSigBits = tcID.getId(); +long leastSigBits = sequenceId.getAndIncrement(); + +TxnID txnID = new TxnID( +mostSigBits, +leastSigBits +); +long currentTimeMillis = System.currentTimeMillis(); +TransactionMetadataEntry transactionMetadataEntry = TransactionMetadataEntry +.newBuilder() +.setTxnidMostBits(mostSigBits) +.setTxnidLeastBits(leastSigBits) +.setStartTime(currentTimeMillis) +.setTimeoutMs(timeOut) +.setMetadataOp(TransactionMetadataOp.NEW) +.setLastModificationTime(currentTimeMillis) 
+.build(); +CompletableFuture
[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention
codelipenghui commented on a change in pull request #5570: Transaction log implemention URL: https://github.com/apache/pulsar/pull/5570#discussion_r346193402 ## File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionReaderImpl.java ## @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.transaction.coordinator.impl; + +import io.netty.buffer.ByteBuf; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerFactory; +import org.apache.bookkeeper.mledger.ReadOnlyCursor; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.common.api.proto.PulsarApi.Subscription; +import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry; +import org.apache.pulsar.common.api.proto.PulsarApi.TxnStatus; +import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; +import org.apache.pulsar.transaction.coordinator.TxnMeta; +import org.apache.pulsar.transaction.coordinator.TxnSubscription; +import org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException; +import org.apache.pulsar.transaction.impl.common.TxnID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class ManagedLedgerTransactionReaderImpl implements +ManagedLedgerTransactionMetadataStore.ManagedLedgerTransactionReader { + +private static final Logger log = LoggerFactory.getLogger(ManagedLedgerTransactionReaderImpl.class); + +private ConcurrentMap txnMetaMap = new ConcurrentHashMap<>(); + +private AtomicLong sequenceId = new AtomicLong(ManagedLedgerTransactionMetadataStore.TC_ID_NOT_USED); + +private final ReadOnlyCursor readOnlyCursor; + +public ManagedLedgerTransactionReaderImpl(String tcId, 
ManagedLedgerFactory managedLedgerFactory) throws Exception { +this.readOnlyCursor = managedLedgerFactory +.openReadOnlyCursor(tcId, +PositionImpl.earliest, new ManagedLedgerConfig()); +} + +private static List subscriptionToTxnSubscription(List subscriptions) { +List txnSubscriptions = new ArrayList<>(subscriptions.size()); +for (int i = 0; i < subscriptions.size(); i++) { +txnSubscriptions +.add(new TxnSubscription(subscriptions.get(i).getTopic(), subscriptions.get(i).getSubscription())); +} +return txnSubscriptions; +} + + +@Override +public TxnMeta getTxnMeta(TxnID txnID) { +return txnMetaMap.get(txnID); +} + +@Override +public Long readSequenceId() { +return sequenceId.get(); +} + +@Override +public void addNewTxn(TxnMeta txnMeta) { +txnMetaMap.put(txnMeta.id(), txnMeta); +} + +@Override +public TxnStatus getTxnStatus(TxnID txnID) { +return txnMetaMap.get(txnID).status(); +} + +@Override +public CompletableFuture init(TransactionMetadataStore transactionMetadataStore) { +CountDownLatch countDownLatch = new CountDownLatch(1); +countDownLatch.countDown(); +CompletableFuture completableFuture = new CompletableFuture<>(); + transactionMetadataStore.updateMetadataStoreState(TransactionMetadataStore.State.INITIALIZING); +readOnlyCursor +.asyncReadEntries(100, +new ReaderReadEntriesCallback(countDownLatch, txnMetaMap, sequenceId, Review comment: Every 100 messages will create a new ReaderReadEntriesCallback instance; I think this could follow an approach like the one used for consumer reads.
[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention
codelipenghui commented on a change in pull request #5570: Transaction log implemention URL: https://github.com/apache/pulsar/pull/5570#discussion_r346200834 ## File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionReaderImpl.java ## @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.transaction.coordinator.impl; + +import io.netty.buffer.ByteBuf; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerFactory; +import org.apache.bookkeeper.mledger.ReadOnlyCursor; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.common.api.proto.PulsarApi.Subscription; +import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry; +import org.apache.pulsar.common.api.proto.PulsarApi.TxnStatus; +import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; +import org.apache.pulsar.transaction.coordinator.TxnMeta; +import org.apache.pulsar.transaction.coordinator.TxnSubscription; +import org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException; +import org.apache.pulsar.transaction.impl.common.TxnID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class ManagedLedgerTransactionReaderImpl implements +ManagedLedgerTransactionMetadataStore.ManagedLedgerTransactionReader { + +private static final Logger log = LoggerFactory.getLogger(ManagedLedgerTransactionReaderImpl.class); + +private ConcurrentMap txnMetaMap = new ConcurrentHashMap<>(); + +private AtomicLong sequenceId = new AtomicLong(ManagedLedgerTransactionMetadataStore.TC_ID_NOT_USED); + +private final ReadOnlyCursor readOnlyCursor; + +public ManagedLedgerTransactionReaderImpl(String tcId, 
ManagedLedgerFactory managedLedgerFactory) throws Exception { +this.readOnlyCursor = managedLedgerFactory +.openReadOnlyCursor(tcId, +PositionImpl.earliest, new ManagedLedgerConfig()); +} + +private static List subscriptionToTxnSubscription(List subscriptions) { +List txnSubscriptions = new ArrayList<>(subscriptions.size()); +for (int i = 0; i < subscriptions.size(); i++) { +txnSubscriptions +.add(new TxnSubscription(subscriptions.get(i).getTopic(), subscriptions.get(i).getSubscription())); +} +return txnSubscriptions; +} + + +@Override +public TxnMeta getTxnMeta(TxnID txnID) { +return txnMetaMap.get(txnID); +} + +@Override +public Long readSequenceId() { +return sequenceId.get(); +} + +@Override +public void addNewTxn(TxnMeta txnMeta) { +txnMetaMap.put(txnMeta.id(), txnMeta); +} + +@Override +public TxnStatus getTxnStatus(TxnID txnID) { +return txnMetaMap.get(txnID).status(); +} + +@Override +public CompletableFuture init(TransactionMetadataStore transactionMetadataStore) { +CountDownLatch countDownLatch = new CountDownLatch(1); +countDownLatch.countDown(); +CompletableFuture completableFuture = new CompletableFuture<>(); + transactionMetadataStore.updateMetadataStoreState(TransactionMetadataStore.State.INITIALIZING); +readOnlyCursor +.asyncReadEntries(100, +new ReaderReadEntriesCallback(countDownLatch, txnMetaMap, sequenceId, +transactionMetadataStore, completableFuture), System.nanoTime()); +return completableFuture; +} + +
[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention
codelipenghui commented on a change in pull request #5570: Transaction log implemention URL: https://github.com/apache/pulsar/pull/5570#discussion_r346189342 ## File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionReaderImpl.java ## @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.transaction.coordinator.impl; + +import io.netty.buffer.ByteBuf; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerFactory; +import org.apache.bookkeeper.mledger.ReadOnlyCursor; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.common.api.proto.PulsarApi.Subscription; +import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry; +import org.apache.pulsar.common.api.proto.PulsarApi.TxnStatus; +import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; +import org.apache.pulsar.transaction.coordinator.TxnMeta; +import org.apache.pulsar.transaction.coordinator.TxnSubscription; +import org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException; +import org.apache.pulsar.transaction.impl.common.TxnID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class ManagedLedgerTransactionReaderImpl implements +ManagedLedgerTransactionMetadataStore.ManagedLedgerTransactionReader { + +private static final Logger log = LoggerFactory.getLogger(ManagedLedgerTransactionReaderImpl.class); + +private ConcurrentMap txnMetaMap = new ConcurrentHashMap<>(); + +private AtomicLong sequenceId = new AtomicLong(ManagedLedgerTransactionMetadataStore.TC_ID_NOT_USED); + +private final ReadOnlyCursor readOnlyCursor; + +public ManagedLedgerTransactionReaderImpl(String tcId, 
ManagedLedgerFactory managedLedgerFactory) throws Exception { +this.readOnlyCursor = managedLedgerFactory +.openReadOnlyCursor(tcId, +PositionImpl.earliest, new ManagedLedgerConfig()); +} + +private static List subscriptionToTxnSubscription(List subscriptions) { +List txnSubscriptions = new ArrayList<>(subscriptions.size()); Review comment: Please consider reusing memory; this call leads to more memory workload. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention
codelipenghui commented on a change in pull request #5570: Transaction log implemention URL: https://github.com/apache/pulsar/pull/5570#discussion_r346167345 ## File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionMetadataStore.java ## @@ -0,0 +1,341 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.transaction.coordinator.impl; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerFactory; + +import org.apache.pulsar.common.api.proto.PulsarApi.Subscription; +import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry; +import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry.TransactionMetadataOp; +import org.apache.pulsar.common.api.proto.PulsarApi.TxnStatus; +import org.apache.pulsar.common.util.FutureUtil; + +import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; +import org.apache.pulsar.transaction.coordinator.TxnMeta; +import org.apache.pulsar.transaction.coordinator.TxnSubscription; +import org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException; +import org.apache.pulsar.transaction.impl.common.TxnID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * The provider that offers managed ledger implementation of {@link TransactionMetadataStore}. + */ +public class ManagedLedgerTransactionMetadataStore implements TransactionMetadataStore { + +private static final Logger LOGGER = LoggerFactory.getLogger(ManagedLedgerTransactionReaderImpl.class); + +private final TransactionCoordinatorID tcID; +private AtomicLong sequenceId; +private final ManagedLedgerTransactionReader reader; +private final ManagedLedgerTransactionWriter writer; +protected static final long TC_ID_NOT_USED = -1L; Review comment: Why is TC_ID_NOT_USED needed here? The TCID will be set by the constructor. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention
codelipenghui commented on a change in pull request #5570: Transaction log implemention URL: https://github.com/apache/pulsar/pull/5570#discussion_r346189206 ## File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionMetadataStore.java ## @@ -0,0 +1,341 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.transaction.coordinator.impl; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerFactory; + +import org.apache.pulsar.common.api.proto.PulsarApi.Subscription; +import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry; +import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry.TransactionMetadataOp; +import org.apache.pulsar.common.api.proto.PulsarApi.TxnStatus; +import org.apache.pulsar.common.util.FutureUtil; + +import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; +import org.apache.pulsar.transaction.coordinator.TxnMeta; +import org.apache.pulsar.transaction.coordinator.TxnSubscription; +import org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException; +import org.apache.pulsar.transaction.impl.common.TxnID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * The provider that offers managed ledger implementation of {@link TransactionMetadataStore}. 
+ */ +public class ManagedLedgerTransactionMetadataStore implements TransactionMetadataStore { + +private static final Logger LOGGER = LoggerFactory.getLogger(ManagedLedgerTransactionReaderImpl.class); + +private final TransactionCoordinatorID tcID; +private AtomicLong sequenceId; +private final ManagedLedgerTransactionReader reader; +private final ManagedLedgerTransactionWriter writer; +protected static final long TC_ID_NOT_USED = -1L; +private volatile State state; + +public State getState() { +return state; +} + +public ManagedLedgerTransactionMetadataStore(TransactionCoordinatorID tcID, + ManagedLedgerFactory managedLedgerFactory) throws Exception { +this.tcID = tcID; +this.state = State.NONE; +this.writer = new ManagedLedgerTransactionWriterImpl(tcID.toString(), managedLedgerFactory); +this.reader = new ManagedLedgerTransactionReaderImpl(tcID.toString(), managedLedgerFactory); +this.reader.init(this); +} + +@Override +public CompletableFuture getTxnStatus(TxnID txnID) { +return CompletableFuture.completedFuture(reader.getTxnStatus(txnID)); +} + +@Override +public CompletableFuture getTxnMeta(TxnID txnID) { +return CompletableFuture.completedFuture(reader.getTxnMeta(txnID)); +} + +@Override +public CompletableFuture newTransaction() { +return FutureUtil.failedFuture(new UnsupportedOperationException()); +} + +@Override +public CompletableFuture newTransaction(long timeOut) { +checkArgument(state == State.READY, "Transaction metadata store " + state.name()); +long mostSigBits = tcID.getId(); +long leastSigBits = sequenceId.getAndIncrement(); + +TxnID txnID = new TxnID( +mostSigBits, +leastSigBits +); +long currentTimeMillis = System.currentTimeMillis(); +TransactionMetadataEntry transactionMetadataEntry = TransactionMetadataEntry +.newBuilder() +.setTxnidMostBits(mostSigBits) +.setTxnidLeastBits(leastSigBits) +.setStartTime(currentTimeMillis) +.setTimeoutMs(timeOut) +.setMetadataOp(TransactionMetadataOp.NEW) +.setLastModificationTime(currentTimeMillis) 
+.build(); +CompletableFuture
[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention
codelipenghui commented on a change in pull request #5570: Transaction log implemention URL: https://github.com/apache/pulsar/pull/5570#discussion_r346163715 ## File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnSubscription.java ## @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.transaction.coordinator; + +import lombok.Data; + +/** + * An class represents the subscription of a transaction in {@link TxnSubscription}. + */ +@Data +public class TxnSubscription { Review comment: We already have ``` message Subscription { required string topic = 1; required string subscription = 2; } ``` Why do we need a new `TxnSubscription` class here? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention
codelipenghui commented on a change in pull request #5570: Transaction log implemention URL: https://github.com/apache/pulsar/pull/5570#discussion_r346183113 ## File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionMetadataStore.java ## @@ -0,0 +1,341 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.transaction.coordinator.impl; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerFactory; + +import org.apache.pulsar.common.api.proto.PulsarApi.Subscription; +import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry; +import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry.TransactionMetadataOp; +import org.apache.pulsar.common.api.proto.PulsarApi.TxnStatus; +import org.apache.pulsar.common.util.FutureUtil; + +import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; +import org.apache.pulsar.transaction.coordinator.TxnMeta; +import org.apache.pulsar.transaction.coordinator.TxnSubscription; +import org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException; +import org.apache.pulsar.transaction.impl.common.TxnID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * The provider that offers managed ledger implementation of {@link TransactionMetadataStore}. 
+ */ +public class ManagedLedgerTransactionMetadataStore implements TransactionMetadataStore { + +private static final Logger LOGGER = LoggerFactory.getLogger(ManagedLedgerTransactionReaderImpl.class); + +private final TransactionCoordinatorID tcID; +private AtomicLong sequenceId; +private final ManagedLedgerTransactionReader reader; +private final ManagedLedgerTransactionWriter writer; +protected static final long TC_ID_NOT_USED = -1L; +private volatile State state; + +public State getState() { +return state; +} + +public ManagedLedgerTransactionMetadataStore(TransactionCoordinatorID tcID, + ManagedLedgerFactory managedLedgerFactory) throws Exception { +this.tcID = tcID; +this.state = State.NONE; +this.writer = new ManagedLedgerTransactionWriterImpl(tcID.toString(), managedLedgerFactory); +this.reader = new ManagedLedgerTransactionReaderImpl(tcID.toString(), managedLedgerFactory); +this.reader.init(this); +} + +@Override +public CompletableFuture getTxnStatus(TxnID txnID) { +return CompletableFuture.completedFuture(reader.getTxnStatus(txnID)); +} + +@Override +public CompletableFuture getTxnMeta(TxnID txnID) { +return CompletableFuture.completedFuture(reader.getTxnMeta(txnID)); +} + +@Override +public CompletableFuture newTransaction() { +return FutureUtil.failedFuture(new UnsupportedOperationException()); +} + +@Override +public CompletableFuture newTransaction(long timeOut) { +checkArgument(state == State.READY, "Transaction metadata store " + state.name()); +long mostSigBits = tcID.getId(); +long leastSigBits = sequenceId.getAndIncrement(); + +TxnID txnID = new TxnID( +mostSigBits, +leastSigBits +); +long currentTimeMillis = System.currentTimeMillis(); +TransactionMetadataEntry transactionMetadataEntry = TransactionMetadataEntry +.newBuilder() +.setTxnidMostBits(mostSigBits) +.setTxnidLeastBits(leastSigBits) +.setStartTime(currentTimeMillis) +.setTimeoutMs(timeOut) +.setMetadataOp(TransactionMetadataOp.NEW) +.setLastModificationTime(currentTimeMillis) 
+.build(); +CompletableFuture