This is an automated email from the ASF dual-hosted git repository.
jianbin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/incubator-seata.git
The following commit(s) were added to refs/heads/develop by this push:
new bdf755d88c optimize: optimize transaction metrics (#7001)
bdf755d88c is described below
commit bdf755d88ca4ab01ba41c35c58a88691be7c42c1
Author: jimin <[email protected]>
AuthorDate: Sun Nov 17 13:12:50 2024 +0800
optimize: optimize transaction metrics (#7001)
---
changes/en-us/develop.md | 1 +
changes/zh-cn/develop.md | 2 +
.../io/seata/server/metrics/MeterIdConstants.java | 2 +-
.../io/seata/server/metrics/MetricsSubscriber.java | 160 ++++++++++-----------
.../io/seata/server/session/GlobalSession.java | 2 +-
.../io/seata/server/session/SessionHelper.java | 5 +-
.../server/session/SessionStatusValidator.java | 5 +
7 files changed, 86 insertions(+), 91 deletions(-)
diff --git a/changes/en-us/develop.md b/changes/en-us/develop.md
index abe5dcc99a..d720bf2c26 100644
--- a/changes/en-us/develop.md
+++ b/changes/en-us/develop.md
@@ -23,6 +23,7 @@ Add changes here for all PR submitted to the develop branch.
- [[#6044](https://github.com/seata/seata/pull/6044)] optimize derivative
product check base on mysql
- [[#6361](https://github.com/seata/seata/pull/6361)] optimize 401 issues for
some links
- [[#6903](https://github.com/apache/incubator-seata/pull/6903)] optimize
`tableMeta` cache scheduled refresh issue
+- [[#7001](https://github.com/seata/seata/pull/7001)] optimize transaction
metrics
- [[#7002](https://github.com/apache/incubator-seata/pull/7002)] optimize lock
release logic in AT transaction mode
### security:
diff --git a/changes/zh-cn/develop.md b/changes/zh-cn/develop.md
index b18916ec8d..ea9eb7fa0f 100644
--- a/changes/zh-cn/develop.md
+++ b/changes/zh-cn/develop.md
@@ -23,8 +23,10 @@
- [[#6044](https://github.com/seata/seata/pull/6044)] 优化MySQL衍生数据库判断逻辑
- [[#6361](https://github.com/seata/seata/pull/6361)] 优化部分链接 401 的问题
- [[#6903](https://github.com/apache/incubator-seata/pull/6903)]
优化`tableMeta`缓存定时刷新问题
+- [[#7001](https://github.com/apache/incubator-seata/pull/7001)] 优化 metrics 指标
- [[#7002](https://github.com/apache/incubator-seata/pull/7002)] 优化 AT
事务模式锁释放逻辑
+
### security:
- [[#PR_NO](https://github.com/seata/seata/pull/PR_NO)] 准确简要的PR描述
diff --git a/server/src/main/java/io/seata/server/metrics/MeterIdConstants.java
b/server/src/main/java/io/seata/server/metrics/MeterIdConstants.java
index 29d55af24c..da19451550 100644
--- a/server/src/main/java/io/seata/server/metrics/MeterIdConstants.java
+++ b/server/src/main/java/io/seata/server/metrics/MeterIdConstants.java
@@ -85,7 +85,7 @@ public interface MeterIdConstants {
.withTag(IdConstants.METER_KEY, IdConstants.METER_VALUE_TIMER)
.withTag(IdConstants.STATUS_KEY, IdConstants.STATUS_VALUE_COMMITTED);
- Id TIMER_ROLLBACK = new Id(IdConstants.SEATA_TRANSACTION)
+ Id TIMER_ROLLBACKED = new Id(IdConstants.SEATA_TRANSACTION)
.withTag(IdConstants.ROLE_KEY, IdConstants.ROLE_VALUE_TC)
.withTag(IdConstants.METER_KEY, IdConstants.METER_VALUE_TIMER)
.withTag(IdConstants.STATUS_KEY, IdConstants.STATUS_VALUE_ROLLBACKED);
diff --git
a/server/src/main/java/io/seata/server/metrics/MetricsSubscriber.java
b/server/src/main/java/io/seata/server/metrics/MetricsSubscriber.java
index 4c48f802f1..9e477da596 100644
--- a/server/src/main/java/io/seata/server/metrics/MetricsSubscriber.java
+++ b/server/src/main/java/io/seata/server/metrics/MetricsSubscriber.java
@@ -23,6 +23,7 @@ import java.util.function.Consumer;
import com.google.common.eventbus.Subscribe;
import io.seata.core.event.GlobalTransactionEvent;
import io.seata.core.model.GlobalStatus;
+import io.seata.metrics.Id;
import io.seata.metrics.registry.Registry;
import io.seata.server.event.EventBusManager;
import org.slf4j.Logger;
@@ -47,21 +48,47 @@ public class MetricsSubscriber {
public MetricsSubscriber(Registry registry) {
this.registry = registry;
- consumers = new HashMap<>();
- consumers.put(GlobalStatus.Begin.name(),
this::processGlobalStatusBegin);
- consumers.put(GlobalStatus.Committed.name(),
this::processGlobalStatusCommitted);
- consumers.put(GlobalStatus.Rollbacked.name(),
this::processGlobalStatusRollbacked);
+ this.consumers = initializeConsumers();
+ }
+
+ private Map<String, Consumer<GlobalTransactionEvent>>
initializeConsumers() {
+ Map<String, Consumer<GlobalTransactionEvent>> consumerMap = new
HashMap<>();
+ consumerMap.put(GlobalStatus.Begin.name(),
this::processGlobalStatusBegin);
+ consumerMap.put(GlobalStatus.Committed.name(),
this::processGlobalStatusCommitted);
+ consumerMap.put(GlobalStatus.Rollbacked.name(),
this::processGlobalStatusRollbacked);
+
+ consumerMap.put(GlobalStatus.CommitFailed.name(),
this::processGlobalStatusCommitFailed);
+ consumerMap.put(GlobalStatus.RollbackFailed.name(),
this::processGlobalStatusRollbackFailed);
+ consumerMap.put(GlobalStatus.TimeoutRollbacked.name(),
this::processGlobalStatusTimeoutRollbacked);
+ consumerMap.put(GlobalStatus.TimeoutRollbackFailed.name(),
this::processGlobalStatusTimeoutRollbackFailed);
- consumers.put(GlobalStatus.CommitFailed.name(),
this::processGlobalStatusCommitFailed);
- consumers.put(GlobalStatus.RollbackFailed.name(),
this::processGlobalStatusRollbackFailed);
- consumers.put(GlobalStatus.TimeoutRollbacked.name(),
this::processGlobalStatusTimeoutRollbacked);
- consumers.put(GlobalStatus.TimeoutRollbackFailed.name(),
this::processGlobalStatusTimeoutRollbackFailed);
+ consumerMap.put(GlobalStatus.CommitRetryTimeout.name(),
this::processGlobalStatusCommitRetryTimeout);
+ consumerMap.put(GlobalStatus.RollbackRetryTimeout.name(),
this::processGlobalStatusTimeoutRollbackRetryTimeout);
- consumers.put(GlobalStatus.CommitRetryTimeout.name(),
this::processGlobalStatusCommitRetryTimeout);
- consumers.put(GlobalStatus.RollbackRetryTimeout.name(),
this::processGlobalStatusTimeoutRollbackRetryTimeout);
+ consumerMap.put(STATUS_VALUE_AFTER_COMMITTED_KEY,
this::processAfterGlobalCommitted);
+ consumerMap.put(STATUS_VALUE_AFTER_ROLLBACKED_KEY,
this::processAfterGlobalRollbacked);
+ return consumerMap;
+ }
+
+ private void increaseCounter(Id counterId, GlobalTransactionEvent event) {
+ registry.getCounter(
+ counterId.withTag(APP_ID_KEY,
event.getApplicationId()).withTag(GROUP_KEY, event.getGroup())).increase(1);
+ }
- consumers.put(STATUS_VALUE_AFTER_COMMITTED_KEY,
this::processAfterGlobalCommitted);
- consumers.put(STATUS_VALUE_AFTER_ROLLBACKED_KEY,
this::processAfterGlobalRollbacked);
+ private void decreaseCounter(Id counterId, GlobalTransactionEvent event) {
+ registry.getCounter(
+ counterId.withTag(APP_ID_KEY,
event.getApplicationId()).withTag(GROUP_KEY, event.getGroup())).decrease(1);
+ }
+
+ private void increaseSummary(Id summaryId, GlobalTransactionEvent event,
long value) {
+ registry.getSummary(
+ summaryId.withTag(APP_ID_KEY,
event.getApplicationId()).withTag(GROUP_KEY, event.getGroup())).increase(
+ value);
+ }
+
+ private void increaseTimer(Id timerId, GlobalTransactionEvent event) {
+ registry.getTimer(timerId.withTag(APP_ID_KEY,
event.getApplicationId()).withTag(GROUP_KEY, event.getGroup()))
+ .record(event.getEndTime() - event.getBeginTime(),
TimeUnit.MILLISECONDS);
}
private void processGlobalStatusBegin(GlobalTransactionEvent event) {
@@ -71,128 +98,86 @@ public class MetricsSubscriber {
LOGGER.debug("subscribe:{},threadName:{}", object.toString(),
Thread.currentThread().getName());
}
}
-
registry.getCounter(MeterIdConstants.COUNTER_ACTIVE.withTag(APP_ID_KEY,
event.getApplicationId())
- .withTag(GROUP_KEY, event.getGroup())).increase(1);
+ increaseCounter(MeterIdConstants.COUNTER_ACTIVE, event);
}
private void processGlobalStatusCommitted(GlobalTransactionEvent event) {
if (event.isRetryGlobal()) {
return;
}
- decreaseActive(event);
- registry.getCounter(MeterIdConstants.COUNTER_COMMITTED
- .withTag(APP_ID_KEY, event.getApplicationId())
- .withTag(GROUP_KEY, event.getGroup())).increase(1);
- registry.getSummary(MeterIdConstants.SUMMARY_COMMITTED
- .withTag(APP_ID_KEY, event.getApplicationId())
- .withTag(GROUP_KEY, event.getGroup())).increase(1);
- registry.getTimer(MeterIdConstants.TIMER_COMMITTED
- .withTag(APP_ID_KEY, event.getApplicationId())
- .withTag(GROUP_KEY, event.getGroup()))
- .record(event.getEndTime() - event.getBeginTime(),
TimeUnit.MILLISECONDS);
+ decreaseCounter(MeterIdConstants.COUNTER_ACTIVE, event);
+ increaseCounter(MeterIdConstants.COUNTER_COMMITTED, event);
+ increaseSummary(MeterIdConstants.SUMMARY_COMMITTED, event, 1);
+ increaseTimer(MeterIdConstants.TIMER_COMMITTED, event);
}
private void processGlobalStatusRollbacked(GlobalTransactionEvent event) {
if (event.isRetryGlobal()) {
return;
}
- decreaseActive(event);
- registry.getCounter(MeterIdConstants.COUNTER_ROLLBACKED
- .withTag(APP_ID_KEY, event.getApplicationId())
- .withTag(GROUP_KEY, event.getGroup())).increase(1);
- registry.getSummary(MeterIdConstants.SUMMARY_ROLLBACKED
- .withTag(APP_ID_KEY, event.getApplicationId())
- .withTag(GROUP_KEY, event.getGroup())).increase(1);
- registry.getTimer(MeterIdConstants.TIMER_ROLLBACK
- .withTag(APP_ID_KEY, event.getApplicationId())
- .withTag(GROUP_KEY, event.getGroup()))
- .record(event.getEndTime() - event.getBeginTime(),
TimeUnit.MILLISECONDS);
+ decreaseCounter(MeterIdConstants.COUNTER_ACTIVE, event);
+ increaseCounter(MeterIdConstants.COUNTER_ROLLBACKED, event);
+ increaseSummary(MeterIdConstants.SUMMARY_ROLLBACKED, event, 1);
+ increaseTimer(MeterIdConstants.TIMER_ROLLBACKED, event);
}
private void processAfterGlobalRollbacked(GlobalTransactionEvent event) {
if (event.isRetryGlobal() && event.isRetryBranch()) {
- decreaseActive(event);
+ decreaseCounter(MeterIdConstants.COUNTER_ACTIVE, event);
}
- registry.getCounter(MeterIdConstants.COUNTER_AFTER_ROLLBACKED
- .withTag(APP_ID_KEY, event.getApplicationId())
- .withTag(GROUP_KEY, event.getGroup())).increase(1);
- registry.getSummary(MeterIdConstants.SUMMARY_AFTER_ROLLBACKED
- .withTag(APP_ID_KEY, event.getApplicationId())
- .withTag(GROUP_KEY, event.getGroup())).increase(1);
- registry.getTimer(MeterIdConstants.TIMER_AFTER_ROLLBACKED
- .withTag(APP_ID_KEY, event.getApplicationId())
- .withTag(GROUP_KEY, event.getGroup()))
- .record(event.getEndTime() - event.getBeginTime(),
TimeUnit.MILLISECONDS);
+ increaseCounter(MeterIdConstants.COUNTER_AFTER_ROLLBACKED, event);
+ increaseSummary(MeterIdConstants.SUMMARY_AFTER_ROLLBACKED, event, 1);
+ increaseTimer(MeterIdConstants.TIMER_AFTER_ROLLBACKED, event);
}
private void processAfterGlobalCommitted(GlobalTransactionEvent event) {
if (event.isRetryGlobal() && event.isRetryBranch()) {
- decreaseActive(event);
+ decreaseCounter(MeterIdConstants.COUNTER_ACTIVE, event);
}
- registry.getCounter(MeterIdConstants.COUNTER_AFTER_COMMITTED
- .withTag(APP_ID_KEY, event.getApplicationId())
- .withTag(GROUP_KEY, event.getGroup())).increase(1);
- registry.getSummary(MeterIdConstants.SUMMARY_AFTER_COMMITTED
- .withTag(APP_ID_KEY, event.getApplicationId())
- .withTag(GROUP_KEY, event.getGroup())).increase(1);
- registry.getTimer(MeterIdConstants.TIMER_AFTER_COMMITTED
- .withTag(APP_ID_KEY, event.getApplicationId())
- .withTag(GROUP_KEY, event.getGroup()))
- .record(event.getEndTime() - event.getBeginTime(),
TimeUnit.MILLISECONDS);
+ increaseCounter(MeterIdConstants.COUNTER_AFTER_COMMITTED, event);
+ increaseSummary(MeterIdConstants.SUMMARY_AFTER_COMMITTED, event, 1);
+ increaseTimer(MeterIdConstants.TIMER_AFTER_COMMITTED, event);
}
private void processGlobalStatusCommitFailed(GlobalTransactionEvent event)
{
- decreaseActive(event);
+ decreaseCounter(MeterIdConstants.COUNTER_ACTIVE, event);
reportFailed(event);
}
private void processGlobalStatusRollbackFailed(GlobalTransactionEvent
event) {
- decreaseActive(event);
+ decreaseCounter(MeterIdConstants.COUNTER_ACTIVE, event);
reportFailed(event);
}
private void processGlobalStatusTimeoutRollbacked(GlobalTransactionEvent
event) {
- decreaseActive(event);
+ decreaseCounter(MeterIdConstants.COUNTER_ACTIVE, event);
}
private void
processGlobalStatusTimeoutRollbackFailed(GlobalTransactionEvent event) {
- decreaseActive(event);
- reportTwoPhaseTimeout(event);
+ decreaseCounter(MeterIdConstants.COUNTER_ACTIVE, event);
+ increaseSummary(MeterIdConstants.SUMMARY_TWO_PHASE_TIMEOUT, event, 1);
+ reportFailed(event);
}
private void processGlobalStatusCommitRetryTimeout(GlobalTransactionEvent
event) {
- decreaseActive(event);
- reportTwoPhaseTimeout(event);
+ decreaseCounter(MeterIdConstants.COUNTER_ACTIVE, event);
+ increaseSummary(MeterIdConstants.SUMMARY_TWO_PHASE_TIMEOUT, event, 1);
+ //The phase 2 retry timeout state should be considered a transaction
failed
+ reportFailed(event);
}
private void
processGlobalStatusTimeoutRollbackRetryTimeout(GlobalTransactionEvent event) {
- decreaseActive(event);
- }
-
- private void decreaseActive(GlobalTransactionEvent event) {
- registry.getCounter(MeterIdConstants.COUNTER_ACTIVE
- .withTag(APP_ID_KEY, event.getApplicationId())
- .withTag(GROUP_KEY, event.getGroup())).decrease(1);
+ decreaseCounter(MeterIdConstants.COUNTER_ACTIVE, event);
+ increaseSummary(MeterIdConstants.SUMMARY_TWO_PHASE_TIMEOUT, event, 1);
+ //The phase 2 retry timeout state should be considered a transaction
failed
+ reportFailed(event);
}
private void reportFailed(GlobalTransactionEvent event) {
- registry.getSummary(MeterIdConstants.SUMMARY_FAILED
- .withTag(APP_ID_KEY, event.getApplicationId())
- .withTag(GROUP_KEY, event.getGroup())).increase(1);
- registry.getTimer(MeterIdConstants.TIMER_FAILED
- .withTag(APP_ID_KEY, event.getApplicationId())
- .withTag(GROUP_KEY, event.getGroup()))
- .record(event.getEndTime() - event.getBeginTime(),
TimeUnit.MILLISECONDS);
- }
-
- private void reportTwoPhaseTimeout(GlobalTransactionEvent event) {
- registry.getSummary(MeterIdConstants.SUMMARY_TWO_PHASE_TIMEOUT
- .withTag(APP_ID_KEY, event.getApplicationId())
- .withTag(GROUP_KEY, event.getGroup())).increase(1);
+ increaseSummary(MeterIdConstants.SUMMARY_FAILED, event, 1);
+ increaseTimer(MeterIdConstants.TIMER_FAILED, event);
}
-
-
@Subscribe
public void recordGlobalTransactionEventForMetrics(GlobalTransactionEvent
event) {
if (registry != null && consumers.containsKey(event.getStatus())) {
@@ -208,6 +193,7 @@ public class MetricsSubscriber {
/**
* PMD check
* SuppressWarnings("checkstyle:EqualsHashCode")
+ *
* @return the hash code
*/
@Override
diff --git a/server/src/main/java/io/seata/server/session/GlobalSession.java
b/server/src/main/java/io/seata/server/session/GlobalSession.java
index 29b71e8083..df2805515e 100644
--- a/server/src/main/java/io/seata/server/session/GlobalSession.java
+++ b/server/src/main/java/io/seata/server/session/GlobalSession.java
@@ -760,7 +760,7 @@ public class GlobalSession implements SessionLifecycle,
SessionStorable {
public void queueToRetryRollback() throws TransactionException {
this.addSessionLifecycleListener(SessionHolder.getRetryRollbackingSessionManager());
GlobalStatus currentStatus = this.getStatus();
- if (SessionStatusValidator.isTimeoutGlobalStatus(currentStatus)) {
+ if (GlobalStatus.TimeoutRollbacking == currentStatus) {
this.setStatus(GlobalStatus.TimeoutRollbackRetrying);
} else {
this.setStatus(GlobalStatus.RollbackRetrying);
diff --git a/server/src/main/java/io/seata/server/session/SessionHelper.java
b/server/src/main/java/io/seata/server/session/SessionHelper.java
index cf4d91bf63..d3d49464c8 100644
--- a/server/src/main/java/io/seata/server/session/SessionHelper.java
+++ b/server/src/main/java/io/seata/server/session/SessionHelper.java
@@ -193,7 +193,8 @@ public class SessionHelper {
}
boolean retryBranch =
currentStatus == GlobalStatus.TimeoutRollbackRetrying ||
currentStatus == GlobalStatus.RollbackRetrying;
- if (SessionStatusValidator.isTimeoutGlobalStatus(currentStatus)) {
+ if (!currentStatus.equals(GlobalStatus.TimeoutRollbacked)
+ && SessionStatusValidator.isTimeoutRollbacking(currentStatus))
{
globalSession.changeGlobalStatus(GlobalStatus.TimeoutRollbacked);
} else {
globalSession.changeGlobalStatus(GlobalStatus.Rollbacked);
@@ -236,7 +237,7 @@ public class SessionHelper {
GlobalStatus currentStatus = globalSession.getStatus();
if (isRetryTimeout) {
globalSession.changeGlobalStatus(GlobalStatus.RollbackRetryTimeout);
- } else if
(SessionStatusValidator.isTimeoutGlobalStatus(currentStatus)) {
+ } else if (SessionStatusValidator.isTimeoutRollbacking(currentStatus))
{
globalSession.changeGlobalStatus(GlobalStatus.TimeoutRollbackFailed);
} else {
globalSession.changeGlobalStatus(GlobalStatus.RollbackFailed);
diff --git
a/server/src/main/java/io/seata/server/session/SessionStatusValidator.java
b/server/src/main/java/io/seata/server/session/SessionStatusValidator.java
index 20323a7e7b..860f313af1 100644
--- a/server/src/main/java/io/seata/server/session/SessionStatusValidator.java
+++ b/server/src/main/java/io/seata/server/session/SessionStatusValidator.java
@@ -36,6 +36,11 @@ public class SessionStatusValidator {
|| status == GlobalStatus.TimeoutRollbackRetrying;
}
+ public static boolean isTimeoutRollbacking(GlobalStatus status) {
+ return status == GlobalStatus.TimeoutRollbacking
+ || status == GlobalStatus.TimeoutRollbackRetrying;
+ }
+
/**
* is rollback global status
*
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]