This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 56522d1ce7b49f13a30507536069a16db56b2cff Author: Willem Jiang <jiangni...@huawei.com> AuthorDate: Wed Aug 29 16:47:10 2018 +0800 SCB-868 Added Segment to the Alpha Server --- .../servicecomb/saga/alpha/core/TxConsistentService.java | 7 ++++++- .../saga/alpha/server/GrpcTxEventEndpointImpl.java | 2 +- .../saga/alpha/server/SpringCommandRepository.java | 8 ++++++++ .../saga/alpha/server/SpringTxEventRepository.java | 11 +++++++++++ .../saga/alpha/server/SpringTxTimeoutRepository.java | 9 ++++++++- 5 files changed, 34 insertions(+), 3 deletions(-) diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java index 9a7f82b..ad08440 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java @@ -28,6 +28,10 @@ import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import kamon.annotation.EnableKamon; +import kamon.annotation.Segment; + +@EnableKamon public class TxConsistentService { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -38,7 +42,7 @@ public class TxConsistentService { public TxConsistentService(TxEventRepository eventRepository) { this.eventRepository = eventRepository; } - + @Segment(name = "handleTxEvent", category = "application", library = "kamon") public boolean handle(TxEvent event) { if (types.contains(event.type()) && isGlobalTxAborted(event)) { LOG.info("Transaction event {} rejected, because its parent with globalTxId {} was already aborted", @@ -51,6 +55,7 @@ public class TxConsistentService { return true; } + @Segment(name = "isGlobalTxAborted", category = "application", library = "kamon") private boolean isGlobalTxAborted(TxEvent event) { return 
!eventRepository.findTransactions(event.globalTxId(), TxAbortedEvent.name()).isEmpty(); } diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java index 04e017d..677fa87 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java @@ -79,7 +79,7 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase { } @Override - @Trace("ontransactionEvent") + @Trace("onTransactionEvent") public void onTxEvent(GrpcTxEvent message, StreamObserver<GrpcAck> responseObserver) { boolean ok = txConsistentService.handle(new TxEvent( message.getServiceName(), diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java index 4aa30d1..f7078c2 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java @@ -34,6 +34,10 @@ import org.apache.servicecomb.saga.alpha.core.TxEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import kamon.annotation.EnableKamon; +import kamon.annotation.Segment; + +@EnableKamon public class SpringCommandRepository implements CommandRepository { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -46,6 +50,7 @@ public class SpringCommandRepository implements CommandRepository { } @Override + @Segment(name = "saveCompensationCommands", category = "application", library = "kamon") public void saveCompensationCommands(String globalTxId) { List<TxEvent> 
events = eventRepository .findStartedEventsWithMatchingEndedButNotCompensatedEvents(globalTxId); @@ -68,17 +73,20 @@ public class SpringCommandRepository implements CommandRepository { } @Override + @Segment(name = "markCommandAsDone", category = "application", library = "kamon") public void markCommandAsDone(String globalTxId, String localTxId) { commandRepository.updateStatusByGlobalTxIdAndLocalTxId(DONE.name(), globalTxId, localTxId); } @Override + @Segment(name = "findUncompletedCommands", category = "application", library = "kamon") public List<Command> findUncompletedCommands(String globalTxId) { return commandRepository.findByGlobalTxIdAndStatus(globalTxId, NEW.name()); } @Transactional @Override + @Segment(name = "findFirstCommandToCompensate", category = "application", library = "kamon") public List<Command> findFirstCommandToCompensate() { List<Command> commands = commandRepository .findFirstGroupByGlobalTxIdWithoutPendingOrderByIdDesc(); diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java index 951e533..94ace76 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java @@ -26,6 +26,10 @@ import org.apache.servicecomb.saga.alpha.core.TxEvent; import org.apache.servicecomb.saga.alpha.core.TxEventRepository; import org.springframework.data.domain.PageRequest; +import kamon.annotation.EnableKamon; +import kamon.annotation.Segment; + +@EnableKamon class SpringTxEventRepository implements TxEventRepository { private static final PageRequest SINGLE_TX_EVENT_REQUEST = new PageRequest(0, 1); private final TxEventEnvelopeRepository eventRepo; @@ -35,36 +39,43 @@ class SpringTxEventRepository implements TxEventRepository { } @Override + 
@Segment(name = "TxEventSave", category = "application", library = "kamon") public void save(TxEvent event) { eventRepo.save(event); } @Override + @Segment(name = "findFirstAbortedGloableTransaction", category = "application", library = "kamon") public Optional<List<TxEvent>> findFirstAbortedGlobalTransaction() { return eventRepo.findFirstAbortedGlobalTxByType(); } @Override + @Segment(name = "findTimeoutEvents", category = "application", library = "kamon") public List<TxEvent> findTimeoutEvents() { return eventRepo.findTimeoutEvents(SINGLE_TX_EVENT_REQUEST); } @Override + @Segment(name = "findTxStartedEvent", category = "application", library = "kamon") public Optional<TxEvent> findTxStartedEvent(String globalTxId, String localTxId) { return eventRepo.findFirstStartedEventByGlobalTxIdAndLocalTxId(globalTxId, localTxId); } @Override + @Segment(name = "findTransactions", category = "application", library = "kamon") public List<TxEvent> findTransactions(String globalTxId, String type) { return eventRepo.findByEventGlobalTxIdAndEventType(globalTxId, type); } @Override + @Segment(name = "findFirstUncompensatedEventByIdGreaterThan", category = "application", library = "kamon") public List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, String type) { return eventRepo.findFirstByTypeAndSurrogateIdGreaterThan(type, id, SINGLE_TX_EVENT_REQUEST); } @Override + @Segment(name = "findFirstCompensatedEventByIdGreaterThan", category = "application", library = "kamon") public Optional<TxEvent> findFirstCompensatedEventByIdGreaterThan(long id) { return eventRepo.findFirstByTypeAndSurrogateIdGreaterThan(TxCompensatedEvent.name(), id); } diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java index 53b4443..6b756b5 100644 --- 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java @@ -30,6 +30,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.data.domain.PageRequest; +import kamon.annotation.EnableKamon; +import kamon.annotation.Segment; + +@EnableKamon public class SpringTxTimeoutRepository implements TxTimeoutRepository { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -40,6 +44,7 @@ public class SpringTxTimeoutRepository implements TxTimeoutRepository { } @Override + @Segment(name = "TxTimeoutEventSave", category = "application", library = "kamon") public void save(TxTimeout timeout) { try { timeoutRepo.save(timeout); @@ -49,16 +54,18 @@ public class SpringTxTimeoutRepository implements TxTimeoutRepository { } @Override + @Segment(name = "markTimeoutAsDone", category = "application", library = "kamon") public void markTimeoutAsDone() { timeoutRepo.updateStatusOfFinishedTx(); } @Transactional @Override + @Segment(name = "findFirstTimeout", category = "application", library = "kamon") public List<TxTimeout> findFirstTimeout() { List<TxTimeout> timeoutEvents = timeoutRepo.findFirstTimeoutTxOrderByExpireTimeAsc(new PageRequest(0, 1)); timeoutEvents.forEach(event -> timeoutRepo .updateStatusByGlobalTxIdAndLocalTxId(PENDING.name(), event.globalTxId(), event.localTxId())); return timeoutEvents; } -} \ No newline at end of file +}