[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention

2019-12-03 Thread GitBox
codelipenghui commented on a change in pull request #5570: Transaction log 
implemention
URL: https://github.com/apache/pulsar/pull/5570#discussion_r353572944
 
 

 ##
 File path: 
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/TransactionMetadataStoreStateException.java
 ##
 @@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.exceptions;
+
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import 
org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState.State;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+
+/**
+ * Exception is thrown when a operation of transaction is executed in a error 
transaction metadata store state.
+ */
+public class TransactionMetadataStoreStateException extends 
CoordinatorException {
 
 Review comment:
   It is TransactionStateException not MetadataStoreStateException, right?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention

2019-12-03 Thread GitBox
codelipenghui commented on a change in pull request #5570: Transaction log 
implemention
URL: https://github.com/apache/pulsar/pull/5570#discussion_r353572089
 
 

 ##
 File path: 
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionLog.java
 ##
 @@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+
+/**
+ * A log interface for transaction to read and write transaction operation.
+ */
+public interface TransactionLog {
+
+
+/**
+ * Replay transaction log to load the transaction map.
+ *
+ * @param txnMetaMap the map of transaction metadata store to load
+ * @param sequenceId the sequence id for transaction metadata store to 
create new transaction
+ * @param replayCallback the call back for replaying the transaction log
+ */
+void replayAsync(ConcurrentMap txnMetaMap,
+AtomicLong sequenceId, ReplayCallback replayCallback);
+
+/**
+ * Read the entry from bookkeeper.
+ *
+ * @param numberOfEntriesToRead the number of reading entry
+ * @param callback the callback to executing when reading entry async 
finished
+ */
+void readAsync(int numberOfEntriesToRead, 
AsyncCallbacks.ReadEntriesCallback callback, Object ctx);
 
 Review comment:
   Both write read and replay should use TransactionMetadataEntry not Entry or 
some other types.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention

2019-11-25 Thread GitBox
codelipenghui commented on a change in pull request #5570: Transaction log 
implemention
URL: https://github.com/apache/pulsar/pull/5570#discussion_r350019792
 
 

 ##
 File path: 
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionLog.java
 ##
 @@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+
+/**
+ * A log interface for transaction to read and write transaction operation.
+ */
+public interface TransactionLog {
+
+/**
+ * Read the entry from bookkeeper.
+ *
+ * @param numberOfEntriesToRead the number of reading entry
+ * @param callback the callback to executing when reading entry async 
finished
+ */
+void read(int numberOfEntriesToRead, AsyncCallbacks.ReadEntriesCallback 
callback, Object ctx);
 
 Review comment:
   It's better to call back the TransactionMetadataEntry straightforward, 
because this is a TransactionLog implementation, all of reads and writes need 
to facing TransactionMetadataEntry


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention

2019-11-25 Thread GitBox
codelipenghui commented on a change in pull request #5570: Transaction log 
implemention
URL: https://github.com/apache/pulsar/pull/5570#discussion_r350041641
 
 

 ##
 File path: 
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionMetadataStore.java
 ##
 @@ -0,0 +1,460 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import io.netty.buffer.ByteBuf;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+
+import org.apache.pulsar.common.api.proto.PulsarApi.Subscription;
+import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry;
+import 
org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry.TransactionMetadataOp;
+import org.apache.pulsar.common.api.proto.PulsarApi.TxnStatus;
+import org.apache.pulsar.common.util.FutureUtil;
+
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
+import org.apache.pulsar.transaction.coordinator.TxnMeta;
+import org.apache.pulsar.transaction.coordinator.TxnSubscription;
+import 
org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException;
+import 
org.apache.pulsar.transaction.coordinator.exceptions.TransactionMetadataStoreStateException;
+import 
org.apache.pulsar.transaction.coordinator.exceptions.TransactionNotFoundException;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+
+import org.jctools.queues.MessagePassingQueue;
+import org.jctools.queues.SpscArrayQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The provider that offers managed ledger implementation of {@link 
TransactionMetadataStore}.
+ */
+public class ManagedLedgerTransactionMetadataStore
+extends TransactionMetadataStoreState implements 
TransactionMetadataStore {
+
+private static final Logger log = 
LoggerFactory.getLogger(ManagedLedgerTransactionMetadataStore.class);
+
+private final TransactionCoordinatorID tcID;
+private AtomicLong sequenceId = new AtomicLong(TC_ID_NOT_USED);
+private final ManagedLedgerTransactionLogImpl transactionLog;
+private static final long TC_ID_NOT_USED = -1L;
+private ConcurrentMap txnMetaMap = new 
ConcurrentHashMap<>();
+private SpscArrayQueue entryQueue;
+private volatile long loadCount;
+
+public ManagedLedgerTransactionMetadataStore(TransactionCoordinatorID tcID,
+ ManagedLedgerFactory 
managedLedgerFactory) throws Exception {
+super(State.None);
+this.tcID = tcID;
+this.transactionLog =
+new ManagedLedgerTransactionLogImpl(tcID.getId(), 
managedLedgerFactory);
+this.entryQueue = new SpscArrayQueue<>(2000);
+init();
+}
+
+private ConcurrentMap getTxnMetaMap() {
+return txnMetaMap;
+}
+
+public AtomicLong getSequenceId() {
+return sequenceId;
+}
+
+@Override
+public CompletableFuture getTxnStatus(TxnID txnID) {
+return 
CompletableFuture.completedFuture(txnMetaMap.get(txnID).status());
+}
+
+@Override
+public CompletableFuture getTxnMetaAsync(TxnID txnID) {
+return CompletableFuture.completedFuture(txnMetaMap.get(txnID));
+}
+
+@Override
+public CompletableFuture newTransactionAsync() {
+return FutureUtil.failedFuture(new UnsupportedOperationException());
+}
+
+@Override
+public CompletableFuture 

[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention

2019-11-25 Thread GitBox
codelipenghui commented on a change in pull request #5570: Transaction log 
implemention
URL: https://github.com/apache/pulsar/pull/5570#discussion_r350035812
 
 

 ##
 File path: 
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionMetadataStore.java
 ##
 @@ -0,0 +1,460 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import io.netty.buffer.ByteBuf;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+
+import org.apache.pulsar.common.api.proto.PulsarApi.Subscription;
+import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry;
+import 
org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry.TransactionMetadataOp;
+import org.apache.pulsar.common.api.proto.PulsarApi.TxnStatus;
+import org.apache.pulsar.common.util.FutureUtil;
+
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
+import org.apache.pulsar.transaction.coordinator.TxnMeta;
+import org.apache.pulsar.transaction.coordinator.TxnSubscription;
+import 
org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException;
+import 
org.apache.pulsar.transaction.coordinator.exceptions.TransactionMetadataStoreStateException;
+import 
org.apache.pulsar.transaction.coordinator.exceptions.TransactionNotFoundException;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+
+import org.jctools.queues.MessagePassingQueue;
+import org.jctools.queues.SpscArrayQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The provider that offers managed ledger implementation of {@link 
TransactionMetadataStore}.
+ */
+public class ManagedLedgerTransactionMetadataStore
+extends TransactionMetadataStoreState implements 
TransactionMetadataStore {
+
+private static final Logger log = 
LoggerFactory.getLogger(ManagedLedgerTransactionMetadataStore.class);
+
+private final TransactionCoordinatorID tcID;
+private AtomicLong sequenceId = new AtomicLong(TC_ID_NOT_USED);
+private final ManagedLedgerTransactionLogImpl transactionLog;
+private static final long TC_ID_NOT_USED = -1L;
+private ConcurrentMap txnMetaMap = new 
ConcurrentHashMap<>();
+private SpscArrayQueue entryQueue;
+private volatile long loadCount;
+
+public ManagedLedgerTransactionMetadataStore(TransactionCoordinatorID tcID,
+ ManagedLedgerFactory 
managedLedgerFactory) throws Exception {
+super(State.None);
+this.tcID = tcID;
+this.transactionLog =
+new ManagedLedgerTransactionLogImpl(tcID.getId(), 
managedLedgerFactory);
+this.entryQueue = new SpscArrayQueue<>(2000);
+init();
+}
+
+private ConcurrentMap getTxnMetaMap() {
+return txnMetaMap;
+}
+
+public AtomicLong getSequenceId() {
+return sequenceId;
+}
+
+@Override
+public CompletableFuture getTxnStatus(TxnID txnID) {
+return 
CompletableFuture.completedFuture(txnMetaMap.get(txnID).status());
+}
+
+@Override
+public CompletableFuture getTxnMetaAsync(TxnID txnID) {
+return CompletableFuture.completedFuture(txnMetaMap.get(txnID));
+}
+
+@Override
+public CompletableFuture newTransactionAsync() {
+return FutureUtil.failedFuture(new UnsupportedOperationException());
+}
+
+@Override
+public CompletableFuture 

[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention

2019-11-25 Thread GitBox
codelipenghui commented on a change in pull request #5570: Transaction log 
implemention
URL: https://github.com/apache/pulsar/pull/5570#discussion_r350040883
 
 

 ##
 File path: 
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionMetadataStore.java
 ##
 @@ -0,0 +1,460 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import io.netty.buffer.ByteBuf;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+
+import org.apache.pulsar.common.api.proto.PulsarApi.Subscription;
+import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry;
+import 
org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry.TransactionMetadataOp;
+import org.apache.pulsar.common.api.proto.PulsarApi.TxnStatus;
+import org.apache.pulsar.common.util.FutureUtil;
+
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
+import org.apache.pulsar.transaction.coordinator.TxnMeta;
+import org.apache.pulsar.transaction.coordinator.TxnSubscription;
+import 
org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException;
+import 
org.apache.pulsar.transaction.coordinator.exceptions.TransactionMetadataStoreStateException;
+import 
org.apache.pulsar.transaction.coordinator.exceptions.TransactionNotFoundException;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+
+import org.jctools.queues.MessagePassingQueue;
+import org.jctools.queues.SpscArrayQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The provider that offers managed ledger implementation of {@link 
TransactionMetadataStore}.
+ */
+public class ManagedLedgerTransactionMetadataStore
+extends TransactionMetadataStoreState implements 
TransactionMetadataStore {
+
+private static final Logger log = 
LoggerFactory.getLogger(ManagedLedgerTransactionMetadataStore.class);
+
+private final TransactionCoordinatorID tcID;
+private AtomicLong sequenceId = new AtomicLong(TC_ID_NOT_USED);
+private final ManagedLedgerTransactionLogImpl transactionLog;
+private static final long TC_ID_NOT_USED = -1L;
+private ConcurrentMap txnMetaMap = new 
ConcurrentHashMap<>();
+private SpscArrayQueue entryQueue;
+private volatile long loadCount;
+
+public ManagedLedgerTransactionMetadataStore(TransactionCoordinatorID tcID,
+ ManagedLedgerFactory 
managedLedgerFactory) throws Exception {
+super(State.None);
+this.tcID = tcID;
+this.transactionLog =
+new ManagedLedgerTransactionLogImpl(tcID.getId(), 
managedLedgerFactory);
+this.entryQueue = new SpscArrayQueue<>(2000);
+init();
+}
+
+private ConcurrentMap getTxnMetaMap() {
+return txnMetaMap;
+}
+
+public AtomicLong getSequenceId() {
+return sequenceId;
+}
+
+@Override
+public CompletableFuture getTxnStatus(TxnID txnID) {
+return 
CompletableFuture.completedFuture(txnMetaMap.get(txnID).status());
+}
+
+@Override
+public CompletableFuture getTxnMetaAsync(TxnID txnID) {
+return CompletableFuture.completedFuture(txnMetaMap.get(txnID));
+}
+
+@Override
+public CompletableFuture newTransactionAsync() {
+return FutureUtil.failedFuture(new UnsupportedOperationException());
+}
+
+@Override
+public CompletableFuture 

[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention

2019-11-25 Thread GitBox
codelipenghui commented on a change in pull request #5570: Transaction log 
implemention
URL: https://github.com/apache/pulsar/pull/5570#discussion_r349979380
 
 

 ##
 File path: 
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionLogImpl.java
 ##
 @@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import io.netty.buffer.ByteBuf;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.ReadOnlyCursor;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
+import org.apache.pulsar.transaction.coordinator.TransactionLog;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class ManagedLedgerTransactionLogImpl implements TransactionLog {
+
+private static final Logger log = 
LoggerFactory.getLogger(ManagedLedgerTransactionLogImpl.class);
+
+private final ManagedLedger managedLedger;
+
+private final static String TRANSACTION_LOG_PREFIX = "transaction/log/";
+
+private final ReadOnlyCursor readOnlyCursor;
+
+ManagedLedgerTransactionLogImpl(long tcID,
+ManagedLedgerFactory managedLedgerFactory) 
throws Exception {
+this.managedLedger = managedLedgerFactory.open(TRANSACTION_LOG_PREFIX 
+ tcID);
+this.readOnlyCursor = 
managedLedgerFactory.openReadOnlyCursor(TRANSACTION_LOG_PREFIX + tcID,
+PositionImpl.earliest, new ManagedLedgerConfig());
+}
+
+@Override
+public void read(int numberOfEntriesToRead, 
AsyncCallbacks.ReadEntriesCallback callback, Object ctx) {
 
 Review comment:
   It's better to call back the TransactionMetadataEntry straightforward, 
because this is a TransactionLog implementation,  all of reads and writes need 
to facing TransactionMetadataEntry


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention

2019-11-25 Thread GitBox
codelipenghui commented on a change in pull request #5570: Transaction log 
implemention
URL: https://github.com/apache/pulsar/pull/5570#discussion_r350025462
 
 

 ##
 File path: 
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionLogImpl.java
 ##
 @@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import io.netty.buffer.ByteBuf;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.ReadOnlyCursor;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
+import org.apache.pulsar.transaction.coordinator.TransactionLog;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class ManagedLedgerTransactionLogImpl implements TransactionLog {
+
+private static final Logger log = 
LoggerFactory.getLogger(ManagedLedgerTransactionLogImpl.class);
+
+private final ManagedLedger managedLedger;
+
+private final static String TRANSACTION_LOG_PREFIX = "transaction/log/";
+
+private final ReadOnlyCursor readOnlyCursor;
+
+ManagedLedgerTransactionLogImpl(long tcID,
+ManagedLedgerFactory managedLedgerFactory) 
throws Exception {
+this.managedLedger = managedLedgerFactory.open(TRANSACTION_LOG_PREFIX 
+ tcID);
+this.readOnlyCursor = 
managedLedgerFactory.openReadOnlyCursor(TRANSACTION_LOG_PREFIX + tcID,
+PositionImpl.earliest, new ManagedLedgerConfig());
+}
+
+@Override
+public void read(int numberOfEntriesToRead, 
AsyncCallbacks.ReadEntriesCallback callback, Object ctx) {
+readOnlyCursor.asyncReadEntries(numberOfEntriesToRead, callback, 
System.nanoTime());
+}
+
+ReadOnlyCursor getReadOnlyCursor() {
+return readOnlyCursor;
+}
+
+@Override
+public CompletableFuture close() {
+try {
+managedLedger.close();
+readOnlyCursor.close();
 
 Review comment:
   You can call  `managedLedger.asyncClose()` and `readOnlyCursor.asyncClose()`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention

2019-11-25 Thread GitBox
codelipenghui commented on a change in pull request #5570: Transaction log 
implemention
URL: https://github.com/apache/pulsar/pull/5570#discussion_r349981445
 
 

 ##
 File path: 
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionMetadataStore.java
 ##
 @@ -0,0 +1,460 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import io.netty.buffer.ByteBuf;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+
+import org.apache.pulsar.common.api.proto.PulsarApi.Subscription;
+import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry;
+import 
org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry.TransactionMetadataOp;
+import org.apache.pulsar.common.api.proto.PulsarApi.TxnStatus;
+import org.apache.pulsar.common.util.FutureUtil;
+
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
+import org.apache.pulsar.transaction.coordinator.TxnMeta;
+import org.apache.pulsar.transaction.coordinator.TxnSubscription;
+import 
org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException;
+import 
org.apache.pulsar.transaction.coordinator.exceptions.TransactionMetadataStoreStateException;
+import 
org.apache.pulsar.transaction.coordinator.exceptions.TransactionNotFoundException;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+
+import org.jctools.queues.MessagePassingQueue;
+import org.jctools.queues.SpscArrayQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The provider that offers managed ledger implementation of {@link 
TransactionMetadataStore}.
+ */
+public class ManagedLedgerTransactionMetadataStore
+extends TransactionMetadataStoreState implements 
TransactionMetadataStore {
+
+private static final Logger log = 
LoggerFactory.getLogger(ManagedLedgerTransactionMetadataStore.class);
+
+private final TransactionCoordinatorID tcID;
+private AtomicLong sequenceId = new AtomicLong(TC_ID_NOT_USED);
+private final ManagedLedgerTransactionLogImpl transactionLog;
+private static final long TC_ID_NOT_USED = -1L;
+private ConcurrentMap txnMetaMap = new 
ConcurrentHashMap<>();
+private SpscArrayQueue entryQueue;
+private volatile long loadCount;
+
+public ManagedLedgerTransactionMetadataStore(TransactionCoordinatorID tcID,
+ ManagedLedgerFactory 
managedLedgerFactory) throws Exception {
+super(State.None);
+this.tcID = tcID;
+this.transactionLog =
+new ManagedLedgerTransactionLogImpl(tcID.getId(), 
managedLedgerFactory);
+this.entryQueue = new SpscArrayQueue<>(2000);
+init();
+}
+
+private ConcurrentMap getTxnMetaMap() {
+return txnMetaMap;
+}
+
+public AtomicLong getSequenceId() {
+return sequenceId;
+}
+
+@Override
+public CompletableFuture getTxnStatus(TxnID txnID) {
+return 
CompletableFuture.completedFuture(txnMetaMap.get(txnID).status());
+}
+
+@Override
+public CompletableFuture getTxnMetaAsync(TxnID txnID) {
+return CompletableFuture.completedFuture(txnMetaMap.get(txnID));
+}
+
+@Override
+public CompletableFuture newTransactionAsync() {
+return FutureUtil.failedFuture(new UnsupportedOperationException());
+}
+
+@Override
+public CompletableFuture 

[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention

2019-11-25 Thread GitBox
codelipenghui commented on a change in pull request #5570: Transaction log 
implemention
URL: https://github.com/apache/pulsar/pull/5570#discussion_r350050273
 
 

 ##
 File path: 
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionMetadataStore.java
 ##
 @@ -0,0 +1,460 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import io.netty.buffer.ByteBuf;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+
+import org.apache.pulsar.common.api.proto.PulsarApi.Subscription;
+import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry;
+import 
org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry.TransactionMetadataOp;
+import org.apache.pulsar.common.api.proto.PulsarApi.TxnStatus;
+import org.apache.pulsar.common.util.FutureUtil;
+
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
+import org.apache.pulsar.transaction.coordinator.TxnMeta;
+import org.apache.pulsar.transaction.coordinator.TxnSubscription;
+import 
org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException;
+import 
org.apache.pulsar.transaction.coordinator.exceptions.TransactionMetadataStoreStateException;
+import 
org.apache.pulsar.transaction.coordinator.exceptions.TransactionNotFoundException;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+
+import org.jctools.queues.MessagePassingQueue;
+import org.jctools.queues.SpscArrayQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The provider that offers managed ledger implementation of {@link 
TransactionMetadataStore}.
+ */
+public class ManagedLedgerTransactionMetadataStore
+extends TransactionMetadataStoreState implements 
TransactionMetadataStore {
+
+private static final Logger log = 
LoggerFactory.getLogger(ManagedLedgerTransactionMetadataStore.class);
+
+private final TransactionCoordinatorID tcID;
+private AtomicLong sequenceId = new AtomicLong(TC_ID_NOT_USED);
+private final ManagedLedgerTransactionLogImpl transactionLog;
+private static final long TC_ID_NOT_USED = -1L;
+private ConcurrentMap txnMetaMap = new 
ConcurrentHashMap<>();
+private SpscArrayQueue entryQueue;
+private volatile long loadCount;
+
+public ManagedLedgerTransactionMetadataStore(TransactionCoordinatorID tcID,
+ ManagedLedgerFactory 
managedLedgerFactory) throws Exception {
+super(State.None);
+this.tcID = tcID;
+this.transactionLog =
+new ManagedLedgerTransactionLogImpl(tcID.getId(), 
managedLedgerFactory);
+this.entryQueue = new SpscArrayQueue<>(2000);
+init();
+}
+
+private ConcurrentMap getTxnMetaMap() {
+return txnMetaMap;
+}
+
+public AtomicLong getSequenceId() {
+return sequenceId;
+}
+
+@Override
+public CompletableFuture getTxnStatus(TxnID txnID) {
+return 
CompletableFuture.completedFuture(txnMetaMap.get(txnID).status());
+}
+
+@Override
+public CompletableFuture getTxnMetaAsync(TxnID txnID) {
+return CompletableFuture.completedFuture(txnMetaMap.get(txnID));
+}
+
+@Override
+public CompletableFuture newTransactionAsync() {
+return FutureUtil.failedFuture(new UnsupportedOperationException());
+}
+
+@Override
+public CompletableFuture 

[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention

2019-11-25 Thread GitBox
codelipenghui commented on a change in pull request #5570: Transaction log 
implemention
URL: https://github.com/apache/pulsar/pull/5570#discussion_r350020267
 
 

 ##
 File path: 
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionLog.java
 ##
 @@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+
+/**
+ * A log interface for transaction to read and write transaction operation.
+ */
+public interface TransactionLog {
+
+/**
+ * Read the entry from bookkeeper.
+ *
+ * @param numberOfEntriesToRead the number of reading entry
+ * @param callback the callback to executing when reading entry async 
finished
+ */
+void read(int numberOfEntriesToRead, AsyncCallbacks.ReadEntriesCallback 
callback, Object ctx);
 
 Review comment:
   And if the method is a async method, please all Async to the end(e.g. 
readAsync closeAsync writeAsync)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention

2019-11-25 Thread GitBox
codelipenghui commented on a change in pull request #5570: Transaction log 
implemention
URL: https://github.com/apache/pulsar/pull/5570#discussion_r350036026
 
 

 ##
 File path: 
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionMetadataStore.java
 ##
 @@ -0,0 +1,460 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import io.netty.buffer.ByteBuf;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+
+import org.apache.pulsar.common.api.proto.PulsarApi.Subscription;
+import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry;
+import 
org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry.TransactionMetadataOp;
+import org.apache.pulsar.common.api.proto.PulsarApi.TxnStatus;
+import org.apache.pulsar.common.util.FutureUtil;
+
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
+import org.apache.pulsar.transaction.coordinator.TxnMeta;
+import org.apache.pulsar.transaction.coordinator.TxnSubscription;
+import 
org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException;
+import 
org.apache.pulsar.transaction.coordinator.exceptions.TransactionMetadataStoreStateException;
+import 
org.apache.pulsar.transaction.coordinator.exceptions.TransactionNotFoundException;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+
+import org.jctools.queues.MessagePassingQueue;
+import org.jctools.queues.SpscArrayQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The provider that offers managed ledger implementation of {@link 
TransactionMetadataStore}.
+ */
+public class ManagedLedgerTransactionMetadataStore
+extends TransactionMetadataStoreState implements 
TransactionMetadataStore {
+
+private static final Logger log = 
LoggerFactory.getLogger(ManagedLedgerTransactionMetadataStore.class);
+
+private final TransactionCoordinatorID tcID;
+private AtomicLong sequenceId = new AtomicLong(TC_ID_NOT_USED);
+private final ManagedLedgerTransactionLogImpl transactionLog;
+private static final long TC_ID_NOT_USED = -1L;
+private ConcurrentMap txnMetaMap = new 
ConcurrentHashMap<>();
+private SpscArrayQueue entryQueue;
+private volatile long loadCount;
+
+public ManagedLedgerTransactionMetadataStore(TransactionCoordinatorID tcID,
+ ManagedLedgerFactory 
managedLedgerFactory) throws Exception {
+super(State.None);
+this.tcID = tcID;
+this.transactionLog =
+new ManagedLedgerTransactionLogImpl(tcID.getId(), 
managedLedgerFactory);
+this.entryQueue = new SpscArrayQueue<>(2000);
+init();
+}
+
+private ConcurrentMap getTxnMetaMap() {
+return txnMetaMap;
+}
+
+public AtomicLong getSequenceId() {
+return sequenceId;
+}
+
+@Override
+public CompletableFuture getTxnStatus(TxnID txnID) {
+return 
CompletableFuture.completedFuture(txnMetaMap.get(txnID).status());
+}
+
+@Override
+public CompletableFuture getTxnMetaAsync(TxnID txnID) {
+return CompletableFuture.completedFuture(txnMetaMap.get(txnID));
+}
+
+@Override
+public CompletableFuture newTransactionAsync() {
+return FutureUtil.failedFuture(new UnsupportedOperationException());
+}
+
+@Override
+public CompletableFuture 

[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention

2019-11-25 Thread GitBox
codelipenghui commented on a change in pull request #5570: Transaction log 
implemention
URL: https://github.com/apache/pulsar/pull/5570#discussion_r350023649
 
 

 ##
 File path: 
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreState.java
 ##
 @@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator;
+
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.UnaryOperator;
+
+/**
+ * The implement of transaction metadata store state.
+ */
+public abstract class TransactionMetadataStoreState {
+
+/**
+ * The state of the transactionMetadataStore {@link 
TransactionMetadataStore}.
+ */
+public enum State {
+None,
+Initializing,
+Ready,
+Close
+}
+
+private static final 
AtomicReferenceFieldUpdater STATE_UPDATER 
=
+
AtomicReferenceFieldUpdater.newUpdater(TransactionMetadataStoreState.class, 
State.class, "state");
+
+@SuppressWarnings("unused")
+private volatile State state = null;
+
+public TransactionMetadataStoreState(State state) {
+STATE_UPDATER.set(this, state);
+
+}
+
+protected boolean changeToReadyState() {
+return (STATE_UPDATER.compareAndSet(this, State.Initializing, 
State.Ready));
+}
+
+protected boolean changeToInitializingState() {
+return STATE_UPDATER.compareAndSet(this, State.None, 
State.Initializing);
+}
+
+protected boolean changeToClose() {
+return (STATE_UPDATER.compareAndSet(this, State.Ready, State.Close)
+|| STATE_UPDATER.compareAndSet(this, State.None, State.Close)
+|| STATE_UPDATER.compareAndSet(this, State.Initializing, 
State.Close));
+}
+
+protected boolean checkIfReady() {
+return STATE_UPDATER.get(this) == State.Ready;
+}
+
+public boolean checkCurrentState(State state) {
+return STATE_UPDATER.get(this) == state;
+}
+
+public State getState() {
+return STATE_UPDATER.get(this);
+}
+
+protected void setState(State s) {
+STATE_UPDATER.set(this, s);
+}
+
+protected State getAndUpdateState(final UnaryOperator updater) {
+return STATE_UPDATER.getAndUpdate(this, updater);
+}
 
 Review comment:
   is any places used this method?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention

2019-11-25 Thread GitBox
codelipenghui commented on a change in pull request #5570: Transaction log 
implemention
URL: https://github.com/apache/pulsar/pull/5570#discussion_r349981383
 
 

 ##
 File path: 
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionMetadataStore.java
 ##
 @@ -0,0 +1,460 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import io.netty.buffer.ByteBuf;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+
+import org.apache.pulsar.common.api.proto.PulsarApi.Subscription;
+import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry;
+import 
org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry.TransactionMetadataOp;
+import org.apache.pulsar.common.api.proto.PulsarApi.TxnStatus;
+import org.apache.pulsar.common.util.FutureUtil;
+
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
+import org.apache.pulsar.transaction.coordinator.TxnMeta;
+import org.apache.pulsar.transaction.coordinator.TxnSubscription;
+import 
org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException;
+import 
org.apache.pulsar.transaction.coordinator.exceptions.TransactionMetadataStoreStateException;
+import 
org.apache.pulsar.transaction.coordinator.exceptions.TransactionNotFoundException;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+
+import org.jctools.queues.MessagePassingQueue;
+import org.jctools.queues.SpscArrayQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The provider that offers managed ledger implementation of {@link 
TransactionMetadataStore}.
+ */
+public class ManagedLedgerTransactionMetadataStore
+extends TransactionMetadataStoreState implements 
TransactionMetadataStore {
+
+private static final Logger log = 
LoggerFactory.getLogger(ManagedLedgerTransactionMetadataStore.class);
+
+private final TransactionCoordinatorID tcID;
+private AtomicLong sequenceId = new AtomicLong(TC_ID_NOT_USED);
+private final ManagedLedgerTransactionLogImpl transactionLog;
+private static final long TC_ID_NOT_USED = -1L;
+private ConcurrentMap txnMetaMap = new 
ConcurrentHashMap<>();
+private SpscArrayQueue entryQueue;
+private volatile long loadCount;
+
+public ManagedLedgerTransactionMetadataStore(TransactionCoordinatorID tcID,
+ ManagedLedgerFactory 
managedLedgerFactory) throws Exception {
+super(State.None);
+this.tcID = tcID;
+this.transactionLog =
+new ManagedLedgerTransactionLogImpl(tcID.getId(), 
managedLedgerFactory);
+this.entryQueue = new SpscArrayQueue<>(2000);
+init();
+}
+
+private ConcurrentMap getTxnMetaMap() {
+return txnMetaMap;
+}
+
+public AtomicLong getSequenceId() {
+return sequenceId;
+}
+
+@Override
+public CompletableFuture getTxnStatus(TxnID txnID) {
+return 
CompletableFuture.completedFuture(txnMetaMap.get(txnID).status());
+}
+
+@Override
+public CompletableFuture getTxnMetaAsync(TxnID txnID) {
+return CompletableFuture.completedFuture(txnMetaMap.get(txnID));
+}
+
+@Override
+public CompletableFuture newTransactionAsync() {
+return FutureUtil.failedFuture(new UnsupportedOperationException());
+}
+
+@Override
+public CompletableFuture 

[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention

2019-11-25 Thread GitBox
codelipenghui commented on a change in pull request #5570: Transaction log 
implemention
URL: https://github.com/apache/pulsar/pull/5570#discussion_r350021225
 
 

 ##
 File path: 
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java
 ##
 @@ -32,70 +33,82 @@
 public interface TransactionMetadataStore {
 
 /**
- * Query the {@link TxnStatus} of a given transaction txnid.
+ * Query the {@link TxnStatus} of a given transaction txnId.
  *
- * @param txnid transaction id
+ * @param txnID {@link TxnID} for get transaction status
  * @return a future represents the result of this operation.
  * it returns {@link TxnStatus} of the given transaction.
  */
-default CompletableFuture getTxnStatus(TxnID txnid) {
-return getTxnMeta(txnid).thenApply(txnMeta -> txnMeta.status());
+default CompletableFuture getTxnStatus(TxnID txnID) {
+return getTxnMetaAsync(txnID).thenApply(TxnMeta::status);
 }
 
 /**
  * Query the {@link TxnMeta} of a given transaction txnid.
  *
- * @param txnid transaction id
+ * @param txnID {@link TxnID} for get transaction metadata
  * @return a future represents the result of this operation.
  * it returns {@link TxnMeta} of the given transaction.
  */
-CompletableFuture getTxnMeta(TxnID txnid);
+CompletableFuture getTxnMetaAsync(TxnID txnID);
+
+/**
+ * Create a new transaction in the transaction metadata store.
+ *
+ * @return a future represents the result of creating a new transaction.
+ * it returns {@link TxnID} as the identifier for identifying the
+ * transaction.
+ */
+CompletableFuture newTransactionAsync();
 
 /**
  * Create a new transaction in the transaction metadata store.
  *
+ * @param timeOut the timeout duration of the transaction in mills
  * @return a future represents the result of creating a new transaction.
  * it returns {@link TxnID} as the identifier for identifying the
  * transaction.
  */
-CompletableFuture newTransaction();
+CompletableFuture newTransactionAsync(long timeOut);
 
 Review comment:
   If the param timeOut use mills as time unit, please name it timeoutInMills.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention

2019-11-25 Thread GitBox
codelipenghui commented on a change in pull request #5570: Transaction log 
implemention
URL: https://github.com/apache/pulsar/pull/5570#discussion_r350022645
 
 

 ##
 File path: 
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreState.java
 ##
 @@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator;
+
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.UnaryOperator;
+
+/**
+ * The implement of transaction metadata store state.
+ */
+public abstract class TransactionMetadataStoreState {
+
+/**
+ * The state of the transactionMetadataStore {@link 
TransactionMetadataStore}.
+ */
+public enum State {
+None,
+Initializing,
+Ready,
+Close
+}
+
+private static final 
AtomicReferenceFieldUpdater STATE_UPDATER 
=
+
AtomicReferenceFieldUpdater.newUpdater(TransactionMetadataStoreState.class, 
State.class, "state");
+
+@SuppressWarnings("unused")
+private volatile State state = null;
+
+public TransactionMetadataStoreState(State state) {
+STATE_UPDATER.set(this, state);
+
+}
+
+protected boolean changeToReadyState() {
+return (STATE_UPDATER.compareAndSet(this, State.Initializing, 
State.Ready));
+}
+
+protected boolean changeToInitializingState() {
+return STATE_UPDATER.compareAndSet(this, State.None, 
State.Initializing);
+}
+
+protected boolean changeToClose() {
 
 Review comment:
   named changeToCloseState() since the others are changeToXXXState


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention

2019-11-25 Thread GitBox
codelipenghui commented on a change in pull request #5570: Transaction log 
implemention
URL: https://github.com/apache/pulsar/pull/5570#discussion_r350042099
 
 

 ##
 File path: 
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionMetadataStore.java
 ##
 @@ -0,0 +1,460 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import io.netty.buffer.ByteBuf;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+
+import org.apache.pulsar.common.api.proto.PulsarApi.Subscription;
+import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry;
+import 
org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry.TransactionMetadataOp;
+import org.apache.pulsar.common.api.proto.PulsarApi.TxnStatus;
+import org.apache.pulsar.common.util.FutureUtil;
+
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
+import org.apache.pulsar.transaction.coordinator.TxnMeta;
+import org.apache.pulsar.transaction.coordinator.TxnSubscription;
+import 
org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException;
+import 
org.apache.pulsar.transaction.coordinator.exceptions.TransactionMetadataStoreStateException;
+import 
org.apache.pulsar.transaction.coordinator.exceptions.TransactionNotFoundException;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+
+import org.jctools.queues.MessagePassingQueue;
+import org.jctools.queues.SpscArrayQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The provider that offers managed ledger implementation of {@link 
TransactionMetadataStore}.
+ */
+public class ManagedLedgerTransactionMetadataStore
+extends TransactionMetadataStoreState implements 
TransactionMetadataStore {
+
+private static final Logger log = 
LoggerFactory.getLogger(ManagedLedgerTransactionMetadataStore.class);
+
+private final TransactionCoordinatorID tcID;
+private AtomicLong sequenceId = new AtomicLong(TC_ID_NOT_USED);
+private final ManagedLedgerTransactionLogImpl transactionLog;
+private static final long TC_ID_NOT_USED = -1L;
+private ConcurrentMap txnMetaMap = new 
ConcurrentHashMap<>();
+private SpscArrayQueue entryQueue;
+private volatile long loadCount;
+
+public ManagedLedgerTransactionMetadataStore(TransactionCoordinatorID tcID,
+ ManagedLedgerFactory 
managedLedgerFactory) throws Exception {
+super(State.None);
+this.tcID = tcID;
+this.transactionLog =
+new ManagedLedgerTransactionLogImpl(tcID.getId(), 
managedLedgerFactory);
+this.entryQueue = new SpscArrayQueue<>(2000);
+init();
+}
+
+private ConcurrentMap getTxnMetaMap() {
+return txnMetaMap;
+}
+
+public AtomicLong getSequenceId() {
+return sequenceId;
+}
+
+@Override
+public CompletableFuture getTxnStatus(TxnID txnID) {
+return 
CompletableFuture.completedFuture(txnMetaMap.get(txnID).status());
+}
+
+@Override
+public CompletableFuture getTxnMetaAsync(TxnID txnID) {
+return CompletableFuture.completedFuture(txnMetaMap.get(txnID));
+}
+
+@Override
+public CompletableFuture newTransactionAsync() {
+return FutureUtil.failedFuture(new UnsupportedOperationException());
+}
+
+@Override
+public CompletableFuture 

[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention

2019-11-25 Thread GitBox
codelipenghui commented on a change in pull request #5570: Transaction log 
implemention
URL: https://github.com/apache/pulsar/pull/5570#discussion_r350029476
 
 

 ##
 File path: 
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionLogImpl.java
 ##
 @@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import io.netty.buffer.ByteBuf;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.ReadOnlyCursor;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
+import org.apache.pulsar.transaction.coordinator.TransactionLog;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class ManagedLedgerTransactionLogImpl implements TransactionLog {
+
+private static final Logger log = 
LoggerFactory.getLogger(ManagedLedgerTransactionLogImpl.class);
+
+private final ManagedLedger managedLedger;
+
+private final static String TRANSACTION_LOG_PREFIX = "transaction/log/";
+
+private final ReadOnlyCursor readOnlyCursor;
+
+ManagedLedgerTransactionLogImpl(long tcID,
+ManagedLedgerFactory managedLedgerFactory) 
throws Exception {
+this.managedLedger = managedLedgerFactory.open(TRANSACTION_LOG_PREFIX 
+ tcID);
+this.readOnlyCursor = 
managedLedgerFactory.openReadOnlyCursor(TRANSACTION_LOG_PREFIX + tcID,
+PositionImpl.earliest, new ManagedLedgerConfig());
+}
+
+@Override
+public void read(int numberOfEntriesToRead, 
AsyncCallbacks.ReadEntriesCallback callback, Object ctx) {
+readOnlyCursor.asyncReadEntries(numberOfEntriesToRead, callback, 
System.nanoTime());
+}
+
+ReadOnlyCursor getReadOnlyCursor() {
+return readOnlyCursor;
+}
+
+@Override
+public CompletableFuture close() {
+try {
+managedLedger.close();
+readOnlyCursor.close();
+return CompletableFuture.completedFuture(null);
+} catch (InterruptedException | ManagedLedgerException e) {
+return FutureUtil.failedFuture(e);
+}
+}
+
+@Override
+public CompletableFuture write(TransactionMetadataEntry 
transactionMetadataEntry) {
+int transactionMetadataEntrySize = 
transactionMetadataEntry.getSerializedSize();
+
+ByteBuf buf = 
PulsarByteBufAllocator.DEFAULT.buffer(transactionMetadataEntrySize, 
transactionMetadataEntrySize);
+
+ByteBufCodedOutputStream outStream = ByteBufCodedOutputStream.get(buf);
+
+CompletableFuture completableFuture = new CompletableFuture<>();
+
+try {
+transactionMetadataEntry.writeTo(outStream);
+managedLedger.asyncAddEntry(buf, new 
AsyncCallbacks.AddEntryCallback() {
+@Override
+public void addComplete(Position position, Object ctx) {
+buf.release();
+completableFuture.complete(null);
+}
+
+@Override
+public void addFailed(ManagedLedgerException exception, Object 
ctx) {
+log.error("Transaction log write transaction operation 
error" + exception);
+buf.release();
+completableFuture.completeExceptionally(exception);
+}
+} , null);
+} catch (IOException e) {
+log.error("Transaction log write transaction operation error" + e);
 
 Review comment:
   Same as above.


[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention

2019-11-20 Thread GitBox
codelipenghui commented on a change in pull request #5570: Transaction log 
implemention
URL: https://github.com/apache/pulsar/pull/5570#discussion_r348377382
 
 

 ##
 File path: 
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java
 ##
 @@ -58,43 +69,56 @@
  * it returns {@link TxnID} as the identifier for identifying the
  * transaction.
  */
-CompletableFuture newTransaction();
+CompletableFuture newTransactionAsync();
 
 /**
- * Add the produced partitions to transaction identified by txnid.
+ * Create a new transaction in the transaction metadata store.
  *
- * @param txnid transaction id
+ * @param timeOut the timeout time
 
 Review comment:
   ```suggestion
* @param timeOut the timeout duration of the transaction in mills
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention

2019-11-20 Thread GitBox
codelipenghui commented on a change in pull request #5570: Transaction log 
implemention
URL: https://github.com/apache/pulsar/pull/5570#discussion_r348391180
 
 

 ##
 File path: 
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/TxnStoreStateUpdateException.java
 ##
 @@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.exceptions;
+
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
+
+/**
+ * Exception is thrown when update the state incorrect in transaction store.
+ */
+public class TxnStoreStateUpdateException extends CoordinatorException {
 
 Review comment:
   Do we need this exception? i think the meta store should handle the state 
change internal


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention

2019-11-20 Thread GitBox
codelipenghui commented on a change in pull request #5570: Transaction log 
implemention
URL: https://github.com/apache/pulsar/pull/5570#discussion_r348376518
 
 

 ##
 File path: 
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java
 ##
 @@ -32,24 +33,34 @@
 public interface TransactionMetadataStore {
 
 /**
- * Query the {@link TxnStatus} of a given transaction txnid.
+ * The state of the transactionMetadataStore {@link 
TransactionMetadataStore}.
+ */
+enum State {
+NONE,
+INITIALIZING,
+READY,
+CLOSE
+}
+
+/**
+ * Query the {@link TxnStatus} of a given transaction txnId.
  *
- * @param txnid transaction id
+ * @param txnID {@link TxnID} for get transaction status
  * @return a future represents the result of this operation.
  * it returns {@link TxnStatus} of the given transaction.
  */
-default CompletableFuture getTxnStatus(TxnID txnid) {
-return getTxnMeta(txnid).thenApply(txnMeta -> txnMeta.status());
+default CompletableFuture getTxnStatus(TxnID txnID) {
+return getTxnMetaAsync(txnID).thenApply(TxnMeta::status);
 }
 
 Review comment:
   It's better to rename to `getTxnStatusAsync` since all async method end with 
Async


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention

2019-11-14 Thread GitBox
codelipenghui commented on a change in pull request #5570: Transaction log 
implemention
URL: https://github.com/apache/pulsar/pull/5570#discussion_r346147104
 
 

 ##
 File path: pulsar-transaction/common/pom.xml
 ##
 @@ -40,12 +40,22 @@
 protobuf-java
 ${protobuf3.version}
 
-
 
 com.google.protobuf
 protobuf-java-util
 ${protobuf3.version}
 
+
+${project.groupId}
+pulsar-common
+${project.parent.version}
+
 
 Review comment:
   is it better to create a proto file in pulsar-transaction-common module? so 
that we don't need to add pulsar common dependency.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention

2019-11-14 Thread GitBox
codelipenghui commented on a change in pull request #5570: Transaction log 
implemention
URL: https://github.com/apache/pulsar/pull/5570#discussion_r346158483
 
 

 ##
 File path: 
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java
 ##
 @@ -32,24 +33,34 @@
 public interface TransactionMetadataStore {
 
 /**
- * Query the {@link TxnStatus} of a given transaction txnid.
+ * The state of the transactionMetadataStore {@link 
TransactionMetadataStore}.
+ */
+enum State {
+NONE,
+INITIALIZING,
+READY,
+CLOSE
+}
+
+/**
+ * Query the {@link TxnStatus} of a given transaction txnId.
  *
- * @param txnid transaction id
+ * @param txnID {@link TxnID} for get transaction status
  * @return a future represents the result of this operation.
  * it returns {@link TxnStatus} of the given transaction.
  */
-default CompletableFuture getTxnStatus(TxnID txnid) {
-return getTxnMeta(txnid).thenApply(txnMeta -> txnMeta.status());
+default CompletableFuture getTxnStatus(TxnID txnID) {
+return getTxnMeta(txnID).thenApply(txnMeta -> txnMeta.status());
 }
 
 /**
  * Query the {@link TxnMeta} of a given transaction txnid.
  *
- * @param txnid transaction id
+ * @param txnID {@link TxnID} for get transaction metadata
  * @return a future represents the result of this operation.
  * it returns {@link TxnMeta} of the given transaction.
  */
-CompletableFuture getTxnMeta(TxnID txnid);
+CompletableFuture getTxnMeta(TxnID txnID);
 
 Review comment:
   We'd better use `getTxnMetaAsync` here, since this is an asynchronous 
method, and the java doc also need to describe this is a asynchronous method. 
please check others.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention

2019-11-14 Thread GitBox
codelipenghui commented on a change in pull request #5570: Transaction log 
implemention
URL: https://github.com/apache/pulsar/pull/5570#discussion_r346189781
 
 

 ##
 File path: 
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionReaderImpl.java
 ##
 @@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.ReadOnlyCursor;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.common.api.proto.PulsarApi.Subscription;
+import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry;
+import org.apache.pulsar.common.api.proto.PulsarApi.TxnStatus;
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
+import org.apache.pulsar.transaction.coordinator.TxnMeta;
+import org.apache.pulsar.transaction.coordinator.TxnSubscription;
+import 
org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class ManagedLedgerTransactionReaderImpl implements
+ManagedLedgerTransactionMetadataStore.ManagedLedgerTransactionReader {
+
+private static final Logger log = 
LoggerFactory.getLogger(ManagedLedgerTransactionReaderImpl.class);
+
+private ConcurrentMap txnMetaMap = new 
ConcurrentHashMap<>();
 
 Review comment:
   It's better to maintain cache in `TransactionMetastore`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention

2019-11-14 Thread GitBox
codelipenghui commented on a change in pull request #5570: Transaction log 
implemention
URL: https://github.com/apache/pulsar/pull/5570#discussion_r346162726
 
 

 ##
 File path: 
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java
 ##
 @@ -70,6 +79,17 @@
 TxnMeta addProducedPartitions(List partitions)
 throws InvalidTxnStatusException;
 
+/**
+ * Add the list of subscriptions to the transaction.
+ *
+ * @param subscriptions
+ * @return transaction meta
+ * @throws InvalidTxnStatusException if the transaction is not in
+ * {@link TxnStatus#OPEN}
+ */
+TxnMeta addTxnSubscription(List subscriptions)
 
 Review comment:
   Please use `addAckedPartitions`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention

2019-11-14 Thread GitBox
codelipenghui commented on a change in pull request #5570: Transaction log 
implemention
URL: https://github.com/apache/pulsar/pull/5570#discussion_r346166028
 
 

 ##
 File path: 
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionMetadataStore.java
 ##
 @@ -0,0 +1,341 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+
+import org.apache.pulsar.common.api.proto.PulsarApi.Subscription;
+import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry;
+import 
org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry.TransactionMetadataOp;
+import org.apache.pulsar.common.api.proto.PulsarApi.TxnStatus;
+import org.apache.pulsar.common.util.FutureUtil;
+
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
+import org.apache.pulsar.transaction.coordinator.TxnMeta;
+import org.apache.pulsar.transaction.coordinator.TxnSubscription;
+import 
org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The provider that offers managed ledger implementation of {@link 
TransactionMetadataStore}.
+ */
+public class ManagedLedgerTransactionMetadataStore implements 
TransactionMetadataStore {
+
+private static final Logger LOGGER = 
LoggerFactory.getLogger(ManagedLedgerTransactionReaderImpl.class);
+
+private final TransactionCoordinatorID tcID;
+private AtomicLong sequenceId;
+private final ManagedLedgerTransactionReader reader;
+private final ManagedLedgerTransactionWriter writer;
 
 Review comment:
   I recommend to create a `TransactionLog` interface here,  a reader and 
writer for `TransactionLog` is better for understand.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention

2019-11-14 Thread GitBox
codelipenghui commented on a change in pull request #5570: Transaction log 
implemention
URL: https://github.com/apache/pulsar/pull/5570#discussion_r346194349
 
 

 ##
 File path: 
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionReaderImpl.java
 ##
 @@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.ReadOnlyCursor;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.common.api.proto.PulsarApi.Subscription;
+import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry;
+import org.apache.pulsar.common.api.proto.PulsarApi.TxnStatus;
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
+import org.apache.pulsar.transaction.coordinator.TxnMeta;
+import org.apache.pulsar.transaction.coordinator.TxnSubscription;
+import 
org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class ManagedLedgerTransactionReaderImpl implements
+ManagedLedgerTransactionMetadataStore.ManagedLedgerTransactionReader {
+
+private static final Logger log = 
LoggerFactory.getLogger(ManagedLedgerTransactionReaderImpl.class);
+
+private ConcurrentMap txnMetaMap = new 
ConcurrentHashMap<>();
+
+private AtomicLong sequenceId = new 
AtomicLong(ManagedLedgerTransactionMetadataStore.TC_ID_NOT_USED);
+
+private final ReadOnlyCursor readOnlyCursor;
+
+public ManagedLedgerTransactionReaderImpl(String tcId, 
ManagedLedgerFactory managedLedgerFactory) throws Exception {
+this.readOnlyCursor = managedLedgerFactory
+.openReadOnlyCursor(tcId,
+PositionImpl.earliest, new ManagedLedgerConfig());
+}
+
+private static List 
subscriptionToTxnSubscription(List subscriptions) {
+List txnSubscriptions = new 
ArrayList<>(subscriptions.size());
+for (int i = 0; i < subscriptions.size(); i++) {
+txnSubscriptions
+.add(new TxnSubscription(subscriptions.get(i).getTopic(), 
subscriptions.get(i).getSubscription()));
+}
+return txnSubscriptions;
+}
+
+
+@Override
+public TxnMeta getTxnMeta(TxnID txnID) {
+return txnMetaMap.get(txnID);
+}
+
+@Override
+public Long readSequenceId() {
+return sequenceId.get();
+}
+
+@Override
+public void addNewTxn(TxnMeta txnMeta) {
+txnMetaMap.put(txnMeta.id(), txnMeta);
+}
+
+@Override
+public TxnStatus getTxnStatus(TxnID txnID) {
+return txnMetaMap.get(txnID).status();
+}
+
+@Override
+public CompletableFuture init(TransactionMetadataStore 
transactionMetadataStore) {
+CountDownLatch countDownLatch = new CountDownLatch(1);
+countDownLatch.countDown();
+CompletableFuture completableFuture = new CompletableFuture<>();
+
transactionMetadataStore.updateMetadataStoreState(TransactionMetadataStore.State.INITIALIZING);
+readOnlyCursor
+.asyncReadEntries(100,
+new ReaderReadEntriesCallback(countDownLatch, 
txnMetaMap, sequenceId,
 
 Review comment:
   Also, expose configs for each read batch size.


This is an 

[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention

2019-11-14 Thread GitBox
codelipenghui commented on a change in pull request #5570: Transaction log 
implemention
URL: https://github.com/apache/pulsar/pull/5570#discussion_r346145938
 
 

 ##
 File path: pulsar-common/src/main/proto/PulsarApi.proto
 ##
 @@ -645,6 +645,35 @@ enum TxnAction {
 ABORT = 1;
 }
 
+enum TxnStatus {
+OPEN   = 0;
+COMMITTING = 1;
+COMMITTED  = 2;
+ABORTING   = 3;
+ABORTED= 4;
+}
+
+message TransactionMetadataEntry {
+
+enum TransactionMetadataOp {
+NEW = 0;
+ADD_PARTITION   = 1;
+ADD_SUBSCRIPTION= 2;
+UPDATE  = 3;
+}
+
+optional TransactionMetadataOp metadata_op   = 1;
+optional uint64 txnid_least_bits= 2 [default = 0];
+optional uint64 txnid_most_bits = 3 [default = 0];
+optional TxnStatus expected_status  = 4;
+optional TxnStatus new_status   = 5;
+repeated string partitions  = 6;
+repeated Subscription subscriptions = 7;
+optional uint64 timeout_ms  = 8;
+optional uint64 start_time  = 9;
+optional uint64 last_modification_time = 10;
+}
+
 
 Review comment:
   I think `PulsarApi.proto` only for wire protocol, is it better to move 
TransactionMetadataEntry to a split proto file? @sijie please help confirm.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention

2019-11-14 Thread GitBox
codelipenghui commented on a change in pull request #5570: Transaction log 
implemention
URL: https://github.com/apache/pulsar/pull/5570#discussion_r346162797
 
 

 ##
 File path: 
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java
 ##
 @@ -52,6 +53,14 @@
  */
 List producedPartitions();
 
+/**
+ * Return the the list of subscriptions that this transaction send to.
+ *
+ * @return the list of subscriptions that this transaction produced to.
+ * the returned list is sorted by partition name.
+ */
+List txnSubscription();
 
 Review comment:
   please use `ackedPartitions()`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention

2019-11-14 Thread GitBox
codelipenghui commented on a change in pull request #5570: Transaction log 
implemention
URL: https://github.com/apache/pulsar/pull/5570#discussion_r346159426
 
 

 ##
 File path: 
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java
 ##
 @@ -61,41 +72,80 @@
 CompletableFuture newTransaction();
 
 /**
- * Add the produced partitions to transaction identified by txnid.
+ * Create a new transaction in the transaction metadata store.
  *
- * @param txnid transaction id
+ * @param timeOut the timeout time
+ * @return a future represents the result of creating a new transaction.
+ * it returns {@link TxnID} as the identifier for identifying the
+ * transaction.
+ */
+CompletableFuture newTransaction(long timeOut);
+
+/**
+ * Add the produced partitions to transaction identified by txnId.
+ *
+ * @param txnID {@link TxnID} for add produced partition to transaction
  * @param partitions the list of partitions that a transaction produces to
  * @return a future represents the result of this operation
  */
 CompletableFuture addProducedPartitionToTxn(
-TxnID txnid, List partitions);
+TxnID txnID, List partitions);
+
+/**
+ * Add the acked subscriptions to transaction identified by txnId.
+ *
+ * @param txnID {@link TxnID} for add acked subscription
+ * @param txnSubscriptions the list of subscriptions that a transaction 
ack to
+ * @return a future represents the result of this operation
+ */
+CompletableFuture addAckedSubscriptionToTxn(
+TxnID txnID, List txnSubscriptions);
 
 /**
- * Add the acked partitions to transaction identified by txnid.
+ * Add the acked partitions to transaction identified by txnId.
  *
- * @param txnid transaction id
+ * @param txnID {@link TxnID} for add acked partition
  * @param partitions the list of partitions that a transaction acknowledge 
to
  * @return a future represents the result of the operation
  */
 CompletableFuture addAckedPartitionToTxn(
-TxnID txnid, List partitions);
+TxnID txnID, List partitions);
 
 /**
  * Update the transaction from expectedStatus to 
newStatus.
  *
  * If the current transaction status is not expectedStatus, the
  * update will be failed.
  *
+ * @param txnID {@link TxnID} for update txn status
  * @param newStatus the new txn status that the transaction should be 
updated to
  * @param expectedStatus the expected status that the transaction should be
  * @return a future represents the result of the operation
  */
 CompletableFuture updateTxnStatus(
-TxnID txnid, TxnStatus newStatus, TxnStatus expectedStatus);
+TxnID txnID, TxnStatus newStatus, TxnStatus expectedStatus);
 
 /**
  * Close the transaction metadata store.
+ *
+ * @return a future represents the result of this operation
  */
 CompletableFuture closeAsync();
 
+/**
+ * Update the {@link State}.
+ *
+ * @param state the transaction metadata store state {@link State}
+ * @return a future represents the result of this operation
+ */
+CompletableFuture updateMetadataStoreState(State state);
+
+/**
+ * Set the txn sequenceId.
+ *
+ * @param sequenceId the transaction sequenceId for new transaction Id
+ * @return a future represents the result of this operation
+ */
+CompletableFuture setTxnSequenceId(long sequenceId);
+
 
 Review comment:
   Shall we need to expose these method in TransactionMetadataStore interface?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention

2019-11-14 Thread GitBox
codelipenghui commented on a change in pull request #5570: Transaction log 
implemention
URL: https://github.com/apache/pulsar/pull/5570#discussion_r346179273
 
 

 ##
 File path: 
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionMetadataStore.java
 ##
 @@ -0,0 +1,341 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+
+import org.apache.pulsar.common.api.proto.PulsarApi.Subscription;
+import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry;
+import 
org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry.TransactionMetadataOp;
+import org.apache.pulsar.common.api.proto.PulsarApi.TxnStatus;
+import org.apache.pulsar.common.util.FutureUtil;
+
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
+import org.apache.pulsar.transaction.coordinator.TxnMeta;
+import org.apache.pulsar.transaction.coordinator.TxnSubscription;
+import 
org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The provider that offers managed ledger implementation of {@link 
TransactionMetadataStore}.
+ */
+public class ManagedLedgerTransactionMetadataStore implements 
TransactionMetadataStore {
+
+private static final Logger LOGGER = 
LoggerFactory.getLogger(ManagedLedgerTransactionReaderImpl.class);
+
+private final TransactionCoordinatorID tcID;
+private AtomicLong sequenceId;
+private final ManagedLedgerTransactionReader reader;
+private final ManagedLedgerTransactionWriter writer;
+protected static final long TC_ID_NOT_USED = -1L;
+private volatile State state;
+
+public State getState() {
+return state;
+}
+
+public ManagedLedgerTransactionMetadataStore(TransactionCoordinatorID tcID,
+ ManagedLedgerFactory 
managedLedgerFactory) throws Exception {
+this.tcID = tcID;
+this.state = State.NONE;
+this.writer = new ManagedLedgerTransactionWriterImpl(tcID.toString(), 
managedLedgerFactory);
+this.reader = new ManagedLedgerTransactionReaderImpl(tcID.toString(), 
managedLedgerFactory);
+this.reader.init(this);
+}
+
+@Override
+public CompletableFuture getTxnStatus(TxnID txnID) {
+return CompletableFuture.completedFuture(reader.getTxnStatus(txnID));
+}
+
+@Override
+public CompletableFuture getTxnMeta(TxnID txnID) {
+return CompletableFuture.completedFuture(reader.getTxnMeta(txnID));
+}
+
+@Override
+public CompletableFuture newTransaction() {
+return FutureUtil.failedFuture(new UnsupportedOperationException());
+}
+
+@Override
+public CompletableFuture newTransaction(long timeOut) {
+checkArgument(state == State.READY, "Transaction metadata store " + 
state.name());
+long mostSigBits = tcID.getId();
+long leastSigBits = sequenceId.getAndIncrement();
+
+TxnID txnID = new TxnID(
+mostSigBits,
+leastSigBits
+);
+long currentTimeMillis = System.currentTimeMillis();
+TransactionMetadataEntry transactionMetadataEntry = 
TransactionMetadataEntry
+.newBuilder()
+.setTxnidMostBits(mostSigBits)
+.setTxnidLeastBits(leastSigBits)
+.setStartTime(currentTimeMillis)
+.setTimeoutMs(timeOut)
+.setMetadataOp(TransactionMetadataOp.NEW)
+.setLastModificationTime(currentTimeMillis)
+.build();
+CompletableFuture 

[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention

2019-11-14 Thread GitBox
codelipenghui commented on a change in pull request #5570: Transaction log 
implemention
URL: https://github.com/apache/pulsar/pull/5570#discussion_r346198793
 
 

 ##
 File path: 
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionReaderImpl.java
 ##
 @@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.ReadOnlyCursor;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.common.api.proto.PulsarApi.Subscription;
+import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry;
+import org.apache.pulsar.common.api.proto.PulsarApi.TxnStatus;
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
+import org.apache.pulsar.transaction.coordinator.TxnMeta;
+import org.apache.pulsar.transaction.coordinator.TxnSubscription;
+import 
org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class ManagedLedgerTransactionReaderImpl implements
+ManagedLedgerTransactionMetadataStore.ManagedLedgerTransactionReader {
+
+private static final Logger log = 
LoggerFactory.getLogger(ManagedLedgerTransactionReaderImpl.class);
+
+private ConcurrentMap txnMetaMap = new 
ConcurrentHashMap<>();
+
+private AtomicLong sequenceId = new 
AtomicLong(ManagedLedgerTransactionMetadataStore.TC_ID_NOT_USED);
+
+private final ReadOnlyCursor readOnlyCursor;
+
+public ManagedLedgerTransactionReaderImpl(String tcId, 
ManagedLedgerFactory managedLedgerFactory) throws Exception {
+this.readOnlyCursor = managedLedgerFactory
+.openReadOnlyCursor(tcId,
+PositionImpl.earliest, new ManagedLedgerConfig());
+}
+
+private static List 
subscriptionToTxnSubscription(List subscriptions) {
+List txnSubscriptions = new 
ArrayList<>(subscriptions.size());
+for (int i = 0; i < subscriptions.size(); i++) {
+txnSubscriptions
+.add(new TxnSubscription(subscriptions.get(i).getTopic(), 
subscriptions.get(i).getSubscription()));
+}
+return txnSubscriptions;
+}
+
+
+@Override
+public TxnMeta getTxnMeta(TxnID txnID) {
+return txnMetaMap.get(txnID);
+}
+
+@Override
+public Long readSequenceId() {
+return sequenceId.get();
+}
+
+@Override
+public void addNewTxn(TxnMeta txnMeta) {
+txnMetaMap.put(txnMeta.id(), txnMeta);
+}
+
+@Override
+public TxnStatus getTxnStatus(TxnID txnID) {
+return txnMetaMap.get(txnID).status();
+}
+
+@Override
+public CompletableFuture init(TransactionMetadataStore 
transactionMetadataStore) {
+CountDownLatch countDownLatch = new CountDownLatch(1);
+countDownLatch.countDown();
+CompletableFuture completableFuture = new CompletableFuture<>();
+
transactionMetadataStore.updateMetadataStoreState(TransactionMetadataStore.State.INITIALIZING);
+readOnlyCursor
+.asyncReadEntries(100,
+new ReaderReadEntriesCallback(countDownLatch, 
txnMetaMap, sequenceId,
+transactionMetadataStore, completableFuture), 
System.nanoTime());
+return completableFuture;
+}
+
+

[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention

2019-11-14 Thread GitBox
codelipenghui commented on a change in pull request #5570: Transaction log 
implemention
URL: https://github.com/apache/pulsar/pull/5570#discussion_r346183256
 
 

 ##
 File path: 
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionMetadataStore.java
 ##
 @@ -0,0 +1,341 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+
+import org.apache.pulsar.common.api.proto.PulsarApi.Subscription;
+import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry;
+import 
org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry.TransactionMetadataOp;
+import org.apache.pulsar.common.api.proto.PulsarApi.TxnStatus;
+import org.apache.pulsar.common.util.FutureUtil;
+
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
+import org.apache.pulsar.transaction.coordinator.TxnMeta;
+import org.apache.pulsar.transaction.coordinator.TxnSubscription;
+import 
org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The provider that offers managed ledger implementation of {@link 
TransactionMetadataStore}.
+ */
+public class ManagedLedgerTransactionMetadataStore implements 
TransactionMetadataStore {
+
+private static final Logger LOGGER = 
LoggerFactory.getLogger(ManagedLedgerTransactionReaderImpl.class);
+
+private final TransactionCoordinatorID tcID;
+private AtomicLong sequenceId;
+private final ManagedLedgerTransactionReader reader;
+private final ManagedLedgerTransactionWriter writer;
+protected static final long TC_ID_NOT_USED = -1L;
+private volatile State state;
+
+public State getState() {
+return state;
+}
+
+public ManagedLedgerTransactionMetadataStore(TransactionCoordinatorID tcID,
+ ManagedLedgerFactory 
managedLedgerFactory) throws Exception {
+this.tcID = tcID;
+this.state = State.NONE;
+this.writer = new ManagedLedgerTransactionWriterImpl(tcID.toString(), 
managedLedgerFactory);
+this.reader = new ManagedLedgerTransactionReaderImpl(tcID.toString(), 
managedLedgerFactory);
+this.reader.init(this);
+}
+
+@Override
+public CompletableFuture getTxnStatus(TxnID txnID) {
+return CompletableFuture.completedFuture(reader.getTxnStatus(txnID));
+}
+
+@Override
+public CompletableFuture getTxnMeta(TxnID txnID) {
+return CompletableFuture.completedFuture(reader.getTxnMeta(txnID));
+}
+
+@Override
+public CompletableFuture newTransaction() {
+return FutureUtil.failedFuture(new UnsupportedOperationException());
+}
+
+@Override
+public CompletableFuture newTransaction(long timeOut) {
+checkArgument(state == State.READY, "Transaction metadata store " + 
state.name());
+long mostSigBits = tcID.getId();
+long leastSigBits = sequenceId.getAndIncrement();
+
+TxnID txnID = new TxnID(
+mostSigBits,
+leastSigBits
+);
+long currentTimeMillis = System.currentTimeMillis();
+TransactionMetadataEntry transactionMetadataEntry = 
TransactionMetadataEntry
+.newBuilder()
+.setTxnidMostBits(mostSigBits)
+.setTxnidLeastBits(leastSigBits)
+.setStartTime(currentTimeMillis)
+.setTimeoutMs(timeOut)
+.setMetadataOp(TransactionMetadataOp.NEW)
+.setLastModificationTime(currentTimeMillis)
+.build();
+CompletableFuture 

[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention

2019-11-14 Thread GitBox
codelipenghui commented on a change in pull request #5570: Transaction log 
implemention
URL: https://github.com/apache/pulsar/pull/5570#discussion_r346174710
 
 

 ##
 File path: 
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionMetadataStore.java
 ##
 @@ -0,0 +1,341 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+
+import org.apache.pulsar.common.api.proto.PulsarApi.Subscription;
+import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry;
+import 
org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry.TransactionMetadataOp;
+import org.apache.pulsar.common.api.proto.PulsarApi.TxnStatus;
+import org.apache.pulsar.common.util.FutureUtil;
+
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
+import org.apache.pulsar.transaction.coordinator.TxnMeta;
+import org.apache.pulsar.transaction.coordinator.TxnSubscription;
+import 
org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The provider that offers managed ledger implementation of {@link 
TransactionMetadataStore}.
+ */
+public class ManagedLedgerTransactionMetadataStore implements 
TransactionMetadataStore {
+
+private static final Logger LOGGER = 
LoggerFactory.getLogger(ManagedLedgerTransactionReaderImpl.class);
+
+private final TransactionCoordinatorID tcID;
+private AtomicLong sequenceId;
+private final ManagedLedgerTransactionReader reader;
+private final ManagedLedgerTransactionWriter writer;
+protected static final long TC_ID_NOT_USED = -1L;
+private volatile State state;
+
+public State getState() {
+return state;
+}
+
+public ManagedLedgerTransactionMetadataStore(TransactionCoordinatorID tcID,
+ ManagedLedgerFactory 
managedLedgerFactory) throws Exception {
+this.tcID = tcID;
+this.state = State.NONE;
+this.writer = new ManagedLedgerTransactionWriterImpl(tcID.toString(), 
managedLedgerFactory);
+this.reader = new ManagedLedgerTransactionReaderImpl(tcID.toString(), 
managedLedgerFactory);
+this.reader.init(this);
+}
+
+@Override
+public CompletableFuture getTxnStatus(TxnID txnID) {
+return CompletableFuture.completedFuture(reader.getTxnStatus(txnID));
+}
+
+@Override
+public CompletableFuture getTxnMeta(TxnID txnID) {
+return CompletableFuture.completedFuture(reader.getTxnMeta(txnID));
+}
+
+@Override
+public CompletableFuture newTransaction() {
+return FutureUtil.failedFuture(new UnsupportedOperationException());
+}
+
+@Override
+public CompletableFuture newTransaction(long timeOut) {
+checkArgument(state == State.READY, "Transaction metadata store " + 
state.name());
+long mostSigBits = tcID.getId();
+long leastSigBits = sequenceId.getAndIncrement();
+
+TxnID txnID = new TxnID(
+mostSigBits,
+leastSigBits
+);
+long currentTimeMillis = System.currentTimeMillis();
+TransactionMetadataEntry transactionMetadataEntry = 
TransactionMetadataEntry
+.newBuilder()
+.setTxnidMostBits(mostSigBits)
+.setTxnidLeastBits(leastSigBits)
+.setStartTime(currentTimeMillis)
+.setTimeoutMs(timeOut)
+.setMetadataOp(TransactionMetadataOp.NEW)
+.setLastModificationTime(currentTimeMillis)
+.build();
+CompletableFuture 

[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention

2019-11-14 Thread GitBox
codelipenghui commented on a change in pull request #5570: Transaction log 
implemention
URL: https://github.com/apache/pulsar/pull/5570#discussion_r346193402
 
 

 ##
 File path: 
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionReaderImpl.java
 ##
 @@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.ReadOnlyCursor;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.common.api.proto.PulsarApi.Subscription;
+import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry;
+import org.apache.pulsar.common.api.proto.PulsarApi.TxnStatus;
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
+import org.apache.pulsar.transaction.coordinator.TxnMeta;
+import org.apache.pulsar.transaction.coordinator.TxnSubscription;
+import 
org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class ManagedLedgerTransactionReaderImpl implements
+ManagedLedgerTransactionMetadataStore.ManagedLedgerTransactionReader {
+
+private static final Logger log = 
LoggerFactory.getLogger(ManagedLedgerTransactionReaderImpl.class);
+
+private ConcurrentMap txnMetaMap = new 
ConcurrentHashMap<>();
+
+private AtomicLong sequenceId = new 
AtomicLong(ManagedLedgerTransactionMetadataStore.TC_ID_NOT_USED);
+
+private final ReadOnlyCursor readOnlyCursor;
+
+public ManagedLedgerTransactionReaderImpl(String tcId, 
ManagedLedgerFactory managedLedgerFactory) throws Exception {
+this.readOnlyCursor = managedLedgerFactory
+.openReadOnlyCursor(tcId,
+PositionImpl.earliest, new ManagedLedgerConfig());
+}
+
+private static List 
subscriptionToTxnSubscription(List subscriptions) {
+List txnSubscriptions = new 
ArrayList<>(subscriptions.size());
+for (int i = 0; i < subscriptions.size(); i++) {
+txnSubscriptions
+.add(new TxnSubscription(subscriptions.get(i).getTopic(), 
subscriptions.get(i).getSubscription()));
+}
+return txnSubscriptions;
+}
+
+
+@Override
+public TxnMeta getTxnMeta(TxnID txnID) {
+return txnMetaMap.get(txnID);
+}
+
+@Override
+public Long readSequenceId() {
+return sequenceId.get();
+}
+
+@Override
+public void addNewTxn(TxnMeta txnMeta) {
+txnMetaMap.put(txnMeta.id(), txnMeta);
+}
+
+@Override
+public TxnStatus getTxnStatus(TxnID txnID) {
+return txnMetaMap.get(txnID).status();
+}
+
+@Override
+public CompletableFuture init(TransactionMetadataStore 
transactionMetadataStore) {
+CountDownLatch countDownLatch = new CountDownLatch(1);
+countDownLatch.countDown();
+CompletableFuture completableFuture = new CompletableFuture<>();
+
transactionMetadataStore.updateMetadataStoreState(TransactionMetadataStore.State.INITIALIZING);
+readOnlyCursor
+.asyncReadEntries(100,
+new ReaderReadEntriesCallback(countDownLatch, 
txnMetaMap, sequenceId,
 
 Review comment:
   every 100 messages will create a ReaderReadEntriesCallback instance, i think 
here can refer to approach like consumer read 

[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention

2019-11-14 Thread GitBox
codelipenghui commented on a change in pull request #5570: Transaction log 
implemention
URL: https://github.com/apache/pulsar/pull/5570#discussion_r346200834
 
 

 ##
 File path: 
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionReaderImpl.java
 ##
 @@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.ReadOnlyCursor;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.common.api.proto.PulsarApi.Subscription;
+import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry;
+import org.apache.pulsar.common.api.proto.PulsarApi.TxnStatus;
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
+import org.apache.pulsar.transaction.coordinator.TxnMeta;
+import org.apache.pulsar.transaction.coordinator.TxnSubscription;
+import 
org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class ManagedLedgerTransactionReaderImpl implements
+ManagedLedgerTransactionMetadataStore.ManagedLedgerTransactionReader {
+
+private static final Logger log = 
LoggerFactory.getLogger(ManagedLedgerTransactionReaderImpl.class);
+
+private ConcurrentMap txnMetaMap = new 
ConcurrentHashMap<>();
+
+private AtomicLong sequenceId = new 
AtomicLong(ManagedLedgerTransactionMetadataStore.TC_ID_NOT_USED);
+
+private final ReadOnlyCursor readOnlyCursor;
+
+public ManagedLedgerTransactionReaderImpl(String tcId, 
ManagedLedgerFactory managedLedgerFactory) throws Exception {
+this.readOnlyCursor = managedLedgerFactory
+.openReadOnlyCursor(tcId,
+PositionImpl.earliest, new ManagedLedgerConfig());
+}
+
+private static List 
subscriptionToTxnSubscription(List subscriptions) {
+List txnSubscriptions = new 
ArrayList<>(subscriptions.size());
+for (int i = 0; i < subscriptions.size(); i++) {
+txnSubscriptions
+.add(new TxnSubscription(subscriptions.get(i).getTopic(), 
subscriptions.get(i).getSubscription()));
+}
+return txnSubscriptions;
+}
+
+
+@Override
+public TxnMeta getTxnMeta(TxnID txnID) {
+return txnMetaMap.get(txnID);
+}
+
+@Override
+public Long readSequenceId() {
+return sequenceId.get();
+}
+
+@Override
+public void addNewTxn(TxnMeta txnMeta) {
+txnMetaMap.put(txnMeta.id(), txnMeta);
+}
+
+@Override
+public TxnStatus getTxnStatus(TxnID txnID) {
+return txnMetaMap.get(txnID).status();
+}
+
+@Override
+public CompletableFuture init(TransactionMetadataStore 
transactionMetadataStore) {
+CountDownLatch countDownLatch = new CountDownLatch(1);
+countDownLatch.countDown();
+CompletableFuture completableFuture = new CompletableFuture<>();
+
transactionMetadataStore.updateMetadataStoreState(TransactionMetadataStore.State.INITIALIZING);
+readOnlyCursor
+.asyncReadEntries(100,
+new ReaderReadEntriesCallback(countDownLatch, 
txnMetaMap, sequenceId,
+transactionMetadataStore, completableFuture), 
System.nanoTime());
+return completableFuture;
+}
+
+

[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention

2019-11-14 Thread GitBox
codelipenghui commented on a change in pull request #5570: Transaction log 
implemention
URL: https://github.com/apache/pulsar/pull/5570#discussion_r346189342
 
 

 ##
 File path: 
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionReaderImpl.java
 ##
 @@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.ReadOnlyCursor;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.common.api.proto.PulsarApi.Subscription;
+import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry;
+import org.apache.pulsar.common.api.proto.PulsarApi.TxnStatus;
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
+import org.apache.pulsar.transaction.coordinator.TxnMeta;
+import org.apache.pulsar.transaction.coordinator.TxnSubscription;
+import 
org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class ManagedLedgerTransactionReaderImpl implements
+ManagedLedgerTransactionMetadataStore.ManagedLedgerTransactionReader {
+
+private static final Logger log = 
LoggerFactory.getLogger(ManagedLedgerTransactionReaderImpl.class);
+
+private ConcurrentMap txnMetaMap = new 
ConcurrentHashMap<>();
+
+private AtomicLong sequenceId = new 
AtomicLong(ManagedLedgerTransactionMetadataStore.TC_ID_NOT_USED);
+
+private final ReadOnlyCursor readOnlyCursor;
+
+public ManagedLedgerTransactionReaderImpl(String tcId, 
ManagedLedgerFactory managedLedgerFactory) throws Exception {
+this.readOnlyCursor = managedLedgerFactory
+.openReadOnlyCursor(tcId,
+PositionImpl.earliest, new ManagedLedgerConfig());
+}
+
+private static List 
subscriptionToTxnSubscription(List subscriptions) {
+List txnSubscriptions = new 
ArrayList<>(subscriptions.size());
 
 Review comment:
   Please considering reuse memory, this call lead more memory workload.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention

2019-11-14 Thread GitBox
codelipenghui commented on a change in pull request #5570: Transaction log 
implemention
URL: https://github.com/apache/pulsar/pull/5570#discussion_r346167345
 
 

 ##
 File path: 
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionMetadataStore.java
 ##
 @@ -0,0 +1,341 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+
+import org.apache.pulsar.common.api.proto.PulsarApi.Subscription;
+import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry;
+import 
org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry.TransactionMetadataOp;
+import org.apache.pulsar.common.api.proto.PulsarApi.TxnStatus;
+import org.apache.pulsar.common.util.FutureUtil;
+
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
+import org.apache.pulsar.transaction.coordinator.TxnMeta;
+import org.apache.pulsar.transaction.coordinator.TxnSubscription;
+import 
org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The provider that offers managed ledger implementation of {@link 
TransactionMetadataStore}.
+ */
+public class ManagedLedgerTransactionMetadataStore implements 
TransactionMetadataStore {
+
+private static final Logger LOGGER = 
LoggerFactory.getLogger(ManagedLedgerTransactionReaderImpl.class);
+
+private final TransactionCoordinatorID tcID;
+private AtomicLong sequenceId;
+private final ManagedLedgerTransactionReader reader;
+private final ManagedLedgerTransactionWriter writer;
+protected static final long TC_ID_NOT_USED = -1L;
 
 Review comment:
   Why need TC_ID_NOT_USED here,  TCID will set by constructor. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention

2019-11-14 Thread GitBox
codelipenghui commented on a change in pull request #5570: Transaction log 
implemention
URL: https://github.com/apache/pulsar/pull/5570#discussion_r346189206
 
 

 ##
 File path: 
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionMetadataStore.java
 ##
 @@ -0,0 +1,341 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+
+import org.apache.pulsar.common.api.proto.PulsarApi.Subscription;
+import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry;
+import 
org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry.TransactionMetadataOp;
+import org.apache.pulsar.common.api.proto.PulsarApi.TxnStatus;
+import org.apache.pulsar.common.util.FutureUtil;
+
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
+import org.apache.pulsar.transaction.coordinator.TxnMeta;
+import org.apache.pulsar.transaction.coordinator.TxnSubscription;
+import 
org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The provider that offers managed ledger implementation of {@link 
TransactionMetadataStore}.
+ */
+public class ManagedLedgerTransactionMetadataStore implements 
TransactionMetadataStore {
+
+private static final Logger LOGGER = 
LoggerFactory.getLogger(ManagedLedgerTransactionReaderImpl.class);
+
+private final TransactionCoordinatorID tcID;
+private AtomicLong sequenceId;
+private final ManagedLedgerTransactionReader reader;
+private final ManagedLedgerTransactionWriter writer;
+protected static final long TC_ID_NOT_USED = -1L;
+private volatile State state;
+
+public State getState() {
+return state;
+}
+
+public ManagedLedgerTransactionMetadataStore(TransactionCoordinatorID tcID,
+ ManagedLedgerFactory 
managedLedgerFactory) throws Exception {
+this.tcID = tcID;
+this.state = State.NONE;
+this.writer = new ManagedLedgerTransactionWriterImpl(tcID.toString(), 
managedLedgerFactory);
+this.reader = new ManagedLedgerTransactionReaderImpl(tcID.toString(), 
managedLedgerFactory);
+this.reader.init(this);
+}
+
+@Override
+public CompletableFuture getTxnStatus(TxnID txnID) {
+return CompletableFuture.completedFuture(reader.getTxnStatus(txnID));
+}
+
+@Override
+public CompletableFuture getTxnMeta(TxnID txnID) {
+return CompletableFuture.completedFuture(reader.getTxnMeta(txnID));
+}
+
+@Override
+public CompletableFuture newTransaction() {
+return FutureUtil.failedFuture(new UnsupportedOperationException());
+}
+
+@Override
+public CompletableFuture newTransaction(long timeOut) {
+checkArgument(state == State.READY, "Transaction metadata store " + 
state.name());
+long mostSigBits = tcID.getId();
+long leastSigBits = sequenceId.getAndIncrement();
+
+TxnID txnID = new TxnID(
+mostSigBits,
+leastSigBits
+);
+long currentTimeMillis = System.currentTimeMillis();
+TransactionMetadataEntry transactionMetadataEntry = 
TransactionMetadataEntry
+.newBuilder()
+.setTxnidMostBits(mostSigBits)
+.setTxnidLeastBits(leastSigBits)
+.setStartTime(currentTimeMillis)
+.setTimeoutMs(timeOut)
+.setMetadataOp(TransactionMetadataOp.NEW)
+.setLastModificationTime(currentTimeMillis)
+.build();
+CompletableFuture 

[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention

2019-11-14 Thread GitBox
codelipenghui commented on a change in pull request #5570: Transaction log 
implemention
URL: https://github.com/apache/pulsar/pull/5570#discussion_r346163715
 
 

 ##
 File path: 
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnSubscription.java
 ##
 @@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator;
+
+import lombok.Data;
+
+/**
+ * An class represents the subscription of a transaction in {@link 
TxnSubscription}.
+ */
+@Data
+public class TxnSubscription {
 
 Review comment:
   We already have 
   ```
   message Subscription {
   required string topic = 1;
   required string subscription = 2;
   }
   ```
   Why we need a new `TxnSubscription` class here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [pulsar] codelipenghui commented on a change in pull request #5570: Transaction log implemention

2019-11-14 Thread GitBox
codelipenghui commented on a change in pull request #5570: Transaction log 
implemention
URL: https://github.com/apache/pulsar/pull/5570#discussion_r346183113
 
 

 ##
 File path: 
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/ManagedLedgerTransactionMetadataStore.java
 ##
 @@ -0,0 +1,341 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+
+import org.apache.pulsar.common.api.proto.PulsarApi.Subscription;
+import org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry;
+import 
org.apache.pulsar.common.api.proto.PulsarApi.TransactionMetadataEntry.TransactionMetadataOp;
+import org.apache.pulsar.common.api.proto.PulsarApi.TxnStatus;
+import org.apache.pulsar.common.util.FutureUtil;
+
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
+import org.apache.pulsar.transaction.coordinator.TxnMeta;
+import org.apache.pulsar.transaction.coordinator.TxnSubscription;
+import 
org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The provider that offers managed ledger implementation of {@link 
TransactionMetadataStore}.
+ */
+public class ManagedLedgerTransactionMetadataStore implements 
TransactionMetadataStore {
+
+private static final Logger LOGGER = 
LoggerFactory.getLogger(ManagedLedgerTransactionReaderImpl.class);
+
+private final TransactionCoordinatorID tcID;
+private AtomicLong sequenceId;
+private final ManagedLedgerTransactionReader reader;
+private final ManagedLedgerTransactionWriter writer;
+protected static final long TC_ID_NOT_USED = -1L;
+private volatile State state;
+
+public State getState() {
+return state;
+}
+
+public ManagedLedgerTransactionMetadataStore(TransactionCoordinatorID tcID,
+ ManagedLedgerFactory 
managedLedgerFactory) throws Exception {
+this.tcID = tcID;
+this.state = State.NONE;
+this.writer = new ManagedLedgerTransactionWriterImpl(tcID.toString(), 
managedLedgerFactory);
+this.reader = new ManagedLedgerTransactionReaderImpl(tcID.toString(), 
managedLedgerFactory);
+this.reader.init(this);
+}
+
+@Override
+public CompletableFuture getTxnStatus(TxnID txnID) {
+return CompletableFuture.completedFuture(reader.getTxnStatus(txnID));
+}
+
+@Override
+public CompletableFuture getTxnMeta(TxnID txnID) {
+return CompletableFuture.completedFuture(reader.getTxnMeta(txnID));
+}
+
+@Override
+public CompletableFuture newTransaction() {
+return FutureUtil.failedFuture(new UnsupportedOperationException());
+}
+
+@Override
+public CompletableFuture newTransaction(long timeOut) {
+checkArgument(state == State.READY, "Transaction metadata store " + 
state.name());
+long mostSigBits = tcID.getId();
+long leastSigBits = sequenceId.getAndIncrement();
+
+TxnID txnID = new TxnID(
+mostSigBits,
+leastSigBits
+);
+long currentTimeMillis = System.currentTimeMillis();
+TransactionMetadataEntry transactionMetadataEntry = 
TransactionMetadataEntry
+.newBuilder()
+.setTxnidMostBits(mostSigBits)
+.setTxnidLeastBits(leastSigBits)
+.setStartTime(currentTimeMillis)
+.setTimeoutMs(timeOut)
+.setMetadataOp(TransactionMetadataOp.NEW)
+.setLastModificationTime(currentTimeMillis)
+.build();
+CompletableFuture