This is an automated email from the ASF dual-hosted git repository.

zfeng pushed a commit to branch develop-tmp
in repository https://gitbox.apache.org/repos/asf/incubator-seata-go.git


The following commit(s) were added to refs/heads/develop-tmp by this push:
     new 08cdcf21 the ability to automatically run unit tests after creating a pull request.  (#764)
08cdcf21 is described below

commit 08cdcf2166bc198c4824ab37793c4c4fe595640b
Author: Aster Zephyr <2046084...@qq.com>
AuthorDate: Sat Jun 21 18:57:08 2025 +0800

    the ability to automatically run unit tests after creating a pull request.  (#764)
    
    * feat: add unit test workflow
    
    * feat:the ability to automatically run unit tests after creating a pull request.

    * feat:the ability to automatically run unit tests after creating a pull request.

    * feat:the ability to automatically run unit tests after creating a pull request.

    * feat:the ability to automatically run unit tests after creating a pull request.

    * feat:the ability to automatically run unit tests after creating a pull request.

    * feat:the ability to automatically run unit tests after creating a pull request.
    
    * Optimize/at build lock key performance  (#837)
    
    * Refer to buildlockkey2 optimization #829
    
    * Time complexity O(NM) -> O(NK) for buildLockKey and buildLockKey2; increased readability #829
    
    * update import sort #829
    
    * update Encapsulation into util packages #829
    
    * Support Update join (#761)
    
    * duplicate image row for update join
    
    * update join condition placeholder param error
    
    * update join bugfix
    
    * Open test annotations
    
    * recover update executor
    
    * recover update test
    
    * recover update test
    
    * modified version param
    
    ---------
    
    Co-authored-by: JayLiu <38887641+luky...@users.noreply.github.com>
    Co-authored-by: FengZhang <zfc...@qq.com>
    
    ---------
    
    Co-authored-by: jimin <sliev...@163.com>
    Co-authored-by: JayLiu <38887641+luky...@users.noreply.github.com>
    Co-authored-by: FengZhang <zfc...@qq.com>
    Co-authored-by: Wiggins <125641755+minat...@users.noreply.github.com>
    Co-authored-by: lxfeng1997 <33981743+lxfeng1...@users.noreply.github.com>
---
 .github/workflows/build.yml                        |   8 +-
 .github/workflows/unit-test.yml                    |  78 +++++
 pkg/datasource/sql/exec/at/base_executor.go        | 125 ++++++--
 .../at/base_executor_test.go}                      | 104 +++---
 pkg/datasource/sql/exec/at/update_executor.go      |   8 +-
 pkg/datasource/sql/exec/at/update_join_executor.go | 348 +++++++++++++++++++++
 .../sql/exec/at/update_join_executor_test.go       | 231 ++++++++++++++
 .../sql/undo/builder/basic_undo_log_builder.go     |  46 +--
 .../undo/builder/basic_undo_log_builder_test.go    |  19 ++
 pkg/datasource/sql/util/lockkey.go                 |  75 +++++
 10 files changed, 922 insertions(+), 120 deletions(-)

diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index d0f7b1f9..55b56d39 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -17,7 +17,7 @@
 
 # This is a workflow to help you test the unit case and show codecov
 
-name: "build and codecov"
+name: "build"
 
 on:
   push:
@@ -60,9 +60,3 @@ jobs:
 
       - name: "run go build"
         run: go build -v ./...
-
-      - name: "run go test and out codecov"
-        run: go test -v ./... -race -coverprofile=coverage.out -covermode=atomic
-
-      - name: "upload coverage"
-        uses: codecov/codecov-action@v3
diff --git a/.github/workflows/unit-test.yml b/.github/workflows/unit-test.yml
new file mode 100644
index 00000000..388e1cf3
--- /dev/null
+++ b/.github/workflows/unit-test.yml
@@ -0,0 +1,78 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+name: "Unit Test"
+
+on:
+  push:
+    branches: [ master ]
+  pull_request:
+    branches: [ "*" ]
+    types: [opened, synchronize, reopened]
+
+permissions:
+  contents: read
+
+jobs:
+  unit-test:
+    name: Unit Test
+    runs-on: ubuntu-latest
+    timeout-minutes: 10
+    strategy:
+      matrix:
+        golang:
+          - 1.18
+
+    steps:
+    - name: "Set up Go"
+      uses: actions/setup-go@v3
+      with:
+        go-version: ${{ matrix.golang }}
+
+    - name: "Checkout code"
+      uses: actions/checkout@v3
+      with:
+        submodules: true
+
+    - name: "Cache dependencies"
+      uses: actions/cache@v3
+      with:
+        path: ~/go/pkg/mod
+        key: "${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}"
+        restore-keys: |
+          "${{ runner.os }}-go-"
+
+    - name: Shutdown default mysql
+      run: sudo service mysql stop
+
+
+    - name: "Run Unit Tests"
+      run: |
+        echo "=== Starting Unit Tests ==="
+        go test -v ./... -race -coverprofile=coverage.txt -covermode=atomic -timeout 10m
+        if [ $? -ne 0 ]; then
+          echo "❌ Unit tests failed"
+          exit 1
+        fi
+        echo "✅ Unit tests completed successfully"
+
+    - name: "Archive test results"
+      uses: actions/upload-artifact@v3
+      with:
+        name: test-results
+        path: coverage.txt
+        retention-days: 7
diff --git a/pkg/datasource/sql/exec/at/base_executor.go b/pkg/datasource/sql/exec/at/base_executor.go
index 05f44057..76f3e424 100644
--- a/pkg/datasource/sql/exec/at/base_executor.go
+++ b/pkg/datasource/sql/exec/at/base_executor.go
@@ -18,20 +18,21 @@
 package at
 
 import (
-       "bytes"
        "context"
        "database/sql"
        "database/sql/driver"
        "fmt"
-       "seata.apache.org/seata-go/pkg/datasource/sql/undo"
        "strings"
 
        "github.com/arana-db/parser/ast"
+       "github.com/arana-db/parser/model"
        "github.com/arana-db/parser/test_driver"
        gxsort "github.com/dubbogo/gost/sort"
+       "github.com/pkg/errors"
 
        "seata.apache.org/seata-go/pkg/datasource/sql/exec"
        "seata.apache.org/seata-go/pkg/datasource/sql/types"
+       "seata.apache.org/seata-go/pkg/datasource/sql/undo"
        "seata.apache.org/seata-go/pkg/datasource/sql/util"
        "seata.apache.org/seata-go/pkg/util/reflectx"
 )
@@ -98,7 +99,13 @@ func (b *baseExecutor) buildSelectArgs(stmt *ast.SelectStmt, args []driver.Named
                selectArgs       = make([]driver.NamedValue, 0)
        )
 
+       b.traversalArgs(stmt.From.TableRefs, &selectArgsIndexs)
        b.traversalArgs(stmt.Where, &selectArgsIndexs)
+       if stmt.GroupBy != nil {
+               for _, item := range stmt.GroupBy.Items {
+                       b.traversalArgs(item, &selectArgsIndexs)
+               }
+       }
        if stmt.OrderBy != nil {
                for _, item := range stmt.OrderBy.Items {
                        b.traversalArgs(item, &selectArgsIndexs)
@@ -143,6 +150,16 @@ func (b *baseExecutor) traversalArgs(node ast.Node, argsIndex *[]int32) {
                        b.traversalArgs(exprs[i], argsIndex)
                }
                break
+       case *ast.Join:
+               exprs := node.(*ast.Join)
+               b.traversalArgs(exprs.Left, argsIndex)
+               if exprs.Right != nil {
+                       b.traversalArgs(exprs.Right, argsIndex)
+               }
+               if exprs.On != nil {
+                       b.traversalArgs(exprs.On.Expr, argsIndex)
+               }
+               break
        case *test_driver.ParamMarkerExpr:
                *argsIndex = append(*argsIndex, int32(node.(*test_driver.ParamMarkerExpr).Order))
                break
@@ -230,6 +247,64 @@ func (b *baseExecutor) containsPKByName(meta *types.TableMeta, columns []string)
        return matchCounter == len(pkColumnNameList)
 }
 
+func (u *baseExecutor) buildSelectFields(ctx context.Context, tableMeta *types.TableMeta, tableAliases string, inUseFields []*ast.Assignment) ([]*ast.SelectField, error) {
+       fields := make([]*ast.SelectField, 0, len(inUseFields))
+
+       tableName := tableAliases
+       if tableAliases == "" {
+               tableName = tableMeta.TableName
+       }
+       if undo.UndoConfig.OnlyCareUpdateColumns {
+               for _, column := range inUseFields {
+                       tn := column.Column.Table.O
+                       if tn != "" && tn != tableName {
+                               continue
+                       }
+
+                       fields = append(fields, &ast.SelectField{
+                               Expr: &ast.ColumnNameExpr{
+                                       Name: column.Column,
+                               },
+                       })
+               }
+
+               if len(fields) == 0 {
+                       return fields, nil
+               }
+
+               // select indexes columns
+               for _, columnName := range tableMeta.GetPrimaryKeyOnlyName() {
+                       fields = append(fields, &ast.SelectField{
+                               Expr: &ast.ColumnNameExpr{
+                                       Name: &ast.ColumnName{
+                                               Table: model.CIStr{
+                                                       O: tableName,
+                                                       L: tableName,
+                                               },
+                                               Name: model.CIStr{
+                                                       O: columnName,
+                                                       L: columnName,
+                                               },
+                                       },
+                               },
+                       })
+               }
+       } else {
+               fields = append(fields, &ast.SelectField{
+                       Expr: &ast.ColumnNameExpr{
+                               Name: &ast.ColumnName{
+                                       Name: model.CIStr{
+                                               O: "*",
+                                               L: "*",
+                                       },
+                               },
+                       },
+               })
+       }
+
+       return fields, nil
+}
+
 func getSqlNullValue(value interface{}) interface{} {
        if value == nil {
                return nil
@@ -359,37 +434,25 @@ func (b *baseExecutor) buildPKParams(rows []types.RowImage, pkNameList []string)
 
 // the string as local key. the local key example(multi pk): "t_user:1_a,2_b"
 func (b *baseExecutor) buildLockKey(records *types.RecordImage, meta types.TableMeta) string {
-       var (
-               lockKeys      bytes.Buffer
-               filedSequence int
-       )
-       lockKeys.WriteString(meta.TableName)
-       lockKeys.WriteString(":")
+       return util.BuildLockKey(records, meta)
+}
 
-       keys := meta.GetPrimaryKeyOnlyName()
+func (b *baseExecutor) rowsPrepare(ctx context.Context, conn driver.Conn, selectSQL string, selectArgs []driver.NamedValue) (driver.Rows, error) {
+       var queryer driver.Queryer
 
-       for _, row := range records.Rows {
-               if filedSequence > 0 {
-                       lockKeys.WriteString(",")
-               }
-               pkSplitIndex := 0
-               for _, column := range row.Columns {
-                       var hasKeyColumn bool
-                       for _, key := range keys {
-                               if column.ColumnName == key {
-                                       hasKeyColumn = true
-                                       if pkSplitIndex > 0 {
-                                               lockKeys.WriteString("_")
-                                       }
-                       lockKeys.WriteString(fmt.Sprintf("%v", column.Value))
-                                       pkSplitIndex++
-                               }
-                       }
-                       if hasKeyColumn {
-                               filedSequence++
-                       }
-               }
+       queryerContext, ok := conn.(driver.QueryerContext)
+       if !ok {
+               queryer, ok = conn.(driver.Queryer)
        }
+       if ok {
+               var err error
+               rows, err = util.CtxDriverQuery(ctx, queryerContext, queryer, selectSQL, selectArgs)
 
-       return lockKeys.String()
+               if err != nil {
+                       return nil, err
+               }
+       } else {
+               return nil, errors.New("target conn should been driver.QueryerContext or driver.Queryer")
+       }
+       return rows, nil
 }
diff --git a/pkg/datasource/sql/undo/builder/basic_undo_log_builder_test.go b/pkg/datasource/sql/exec/at/base_executor_test.go
similarity index 63%
copy from pkg/datasource/sql/undo/builder/basic_undo_log_builder_test.go
copy to pkg/datasource/sql/exec/at/base_executor_test.go
index 465bf516..0caffe4e 100644
--- a/pkg/datasource/sql/undo/builder/basic_undo_log_builder_test.go
+++ b/pkg/datasource/sql/exec/at/base_executor_test.go
@@ -15,42 +15,16 @@
  * limitations under the License.
  */
 
-package builder
+package at
 
 import (
-       "testing"
-
        "github.com/stretchr/testify/assert"
-
        "seata.apache.org/seata-go/pkg/datasource/sql/types"
+       "testing"
 )
 
-func TestBuildWhereConditionByPKs(t *testing.T) {
-       builder := BasicUndoLogBuilder{}
-       tests := []struct {
-               name       string
-               pkNameList []string
-               rowSize    int
-               maxInSize  int
-               expectSQL  string
-       }{
-               {"test1", []string{"id", "name"}, 1, 1, "(`id`,`name`) IN ((?,?))"},
-               {"test1", []string{"id", "name"}, 3, 2, "(`id`,`name`) IN ((?,?),(?,?)) OR (`id`,`name`) IN ((?,?))"},
-               {"test1", []string{"id", "name"}, 3, 1, "(`id`,`name`) IN ((?,?)) OR (`id`,`name`) IN ((?,?)) OR (`id`,`name`) IN ((?,?))"},
-               {"test1", []string{"id", "name"}, 4, 2, "(`id`,`name`) IN ((?,?),(?,?)) OR (`id`,`name`) IN ((?,?),(?,?))"},
-       }
-
-       for _, test := range tests {
-               t.Run(test.name, func(t *testing.T) {
-                       // todo add dbType param
-                       sql := builder.buildWhereConditionByPKs(test.pkNameList, test.rowSize, "", test.maxInSize)
-                       assert.Equal(t, test.expectSQL, sql)
-               })
-       }
-}
-
-func TestBuildLockKey(t *testing.T) {
-       var builder BasicUndoLogBuilder
+func TestBaseExecBuildLockKey(t *testing.T) {
+       var exec baseExecutor
 
        columnID := types.ColumnMeta{
                ColumnName: "id",
@@ -69,6 +43,7 @@ func TestBuildLockKey(t *testing.T) {
        }
 
        columnsTwoPk := []types.ColumnMeta{columnID, columnUserId}
+       columnsThreePk := []types.ColumnMeta{columnID, columnUserId, columnAge}
        columnsMixPk := []types.ColumnMeta{columnName, columnAge}
 
        getColumnImage := func(columnName string, value interface{}) types.ColumnImage {
@@ -92,11 +67,29 @@ func TestBuildLockKey(t *testing.T) {
                        types.RecordImage{
                                TableName: "test_name",
                                Rows: []types.RowImage{
-                                       {[]types.ColumnImage{getColumnImage("id", 1), getColumnImage("userId", "one")}},
-                                       {[]types.ColumnImage{getColumnImage("id", 2), getColumnImage("userId", "two")}},
+                                       {[]types.ColumnImage{getColumnImage("id", 1), getColumnImage("userId", "user1")}},
+                                       {[]types.ColumnImage{getColumnImage("id", 2), getColumnImage("userId", "user2")}},
+                               },
+                       },
+                       "test_name:1_user1,2_user2",
+               },
+               {
+                       "Three Primary Keys",
+                       types.TableMeta{
+                               TableName: "test2_name",
+                               Indexs: map[string]types.IndexMeta{
+                                       "PRIMARY_KEY": {IType: types.IndexTypePrimaryKey, Columns: columnsThreePk},
+                               },
+                       },
+                       types.RecordImage{
+                               TableName: "test2_name",
+                               Rows: []types.RowImage{
+                                       {[]types.ColumnImage{getColumnImage("id", 1), getColumnImage("userId", "one"), getColumnImage("age", "11")}},
+                                       {[]types.ColumnImage{getColumnImage("id", 2), getColumnImage("userId", "two"), getColumnImage("age", "22")}},
+                                       {[]types.ColumnImage{getColumnImage("id", 3), getColumnImage("userId", "three"), getColumnImage("age", "33")}},
                                },
                        },
-                       "test_name:1_one,2_two",
+                       "test2_name:1_one_11,2_two_22,3_three_33",
                },
                {
                        name: "Single Primary Key",
@@ -125,10 +118,10 @@ func TestBuildLockKey(t *testing.T) {
                        records: types.RecordImage{
                                TableName: "mixed_key",
                                Rows: []types.RowImage{
-                                       {Columns: []types.ColumnImage{getColumnImage("name", "Alice"), getColumnImage("age", 25)}},
+                                       {Columns: []types.ColumnImage{getColumnImage("name", "mike"), getColumnImage("age", 25)}},
                                },
                        },
-                       expected: "mixed_key:Alice_25",
+                       expected: "mixed_key:mike_25",
                },
                {
                        name: "Empty Records",
@@ -152,10 +145,10 @@ func TestBuildLockKey(t *testing.T) {
                        records: types.RecordImage{
                                TableName: "special",
                                Rows: []types.RowImage{
-                                       {Columns: []types.ColumnImage{getColumnImage("id", "a,b_c")}},
+                                       {Columns: []types.ColumnImage{getColumnImage("id", "A,b_c")}},
                                },
                        },
-                       expected: "special:a,b_c",
+                       expected: "special:A,b_c",
                },
                {
                        name: "Non-existent Key Name",
@@ -173,11 +166,46 @@ func TestBuildLockKey(t *testing.T) {
                        },
                        expected: "error_key:",
                },
+               {
+                       name: "Multiple Rows With Nil PK Value",
+                       metaData: types.TableMeta{
+                               TableName: "nil_pk",
+                               Indexs: map[string]types.IndexMeta{
+                                       "PRIMARY_KEY": {IType: types.IndexTypePrimaryKey, Columns: []types.ColumnMeta{columnID}},
+                               },
+                       },
+                       records: types.RecordImage{
+                               TableName: "nil_pk",
+                               Rows: []types.RowImage{
+                                       {Columns: []types.ColumnImage{getColumnImage("id", nil)}},
+                                       {Columns: []types.ColumnImage{getColumnImage("id", 123)}},
+                                       {Columns: []types.ColumnImage{getColumnImage("id", nil)}},
+                               },
+                       },
+                       expected: "nil_pk:,123,",
+               },
+               {
+                       name: "PK As Bool And Float",
+                       metaData: types.TableMeta{
+                               TableName: "type_pk",
+                               Indexs: map[string]types.IndexMeta{
+                                       "PRIMARY_KEY": {IType: types.IndexTypePrimaryKey, Columns: []types.ColumnMeta{columnName, columnAge}},
+                               },
+                       },
+                       records: types.RecordImage{
+                               TableName: "type_pk",
+                               Rows: []types.RowImage{
+                                       {Columns: []types.ColumnImage{getColumnImage("name", true), getColumnImage("age", 3.14)}},
+                                       {Columns: []types.ColumnImage{getColumnImage("name", false), getColumnImage("age", 0.0)}},
+                               },
+                       },
+                       expected: "type_pk:true_3.14,false_0",
+               },
        }
 
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
-                       lockKeys := builder.buildLockKey2(&tt.records, tt.metaData)
+                       lockKeys := exec.buildLockKey(&tt.records, tt.metaData)
                        assert.Equal(t, tt.expected, lockKeys)
                })
        }
diff --git a/pkg/datasource/sql/exec/at/update_executor.go b/pkg/datasource/sql/exec/at/update_executor.go
index 0ac9b049..0f14e97b 100644
--- a/pkg/datasource/sql/exec/at/update_executor.go
+++ b/pkg/datasource/sql/exec/at/update_executor.go
@@ -21,17 +21,17 @@ import (
        "context"
        "database/sql/driver"
        "fmt"
+       "github.com/arana-db/parser/model"
+       "seata.apache.org/seata-go/pkg/datasource/sql/util"
        "strings"
 
        "github.com/arana-db/parser/ast"
        "github.com/arana-db/parser/format"
-       "github.com/arana-db/parser/model"
 
        "seata.apache.org/seata-go/pkg/datasource/sql/datasource"
        "seata.apache.org/seata-go/pkg/datasource/sql/exec"
        "seata.apache.org/seata-go/pkg/datasource/sql/types"
        "seata.apache.org/seata-go/pkg/datasource/sql/undo"
-       "seata.apache.org/seata-go/pkg/datasource/sql/util"
        "seata.apache.org/seata-go/pkg/util/bytes"
        "seata.apache.org/seata-go/pkg/util/log"
 )
@@ -49,6 +49,10 @@ type updateExecutor struct {
 
 // NewUpdateExecutor get update executor
 func NewUpdateExecutor(parserCtx *types.ParseContext, execContent *types.ExecContext, hooks []exec.SQLHook) executor {
+       // Because update join cannot be clearly identified when SQL cannot be parsed
+       if parserCtx.UpdateStmt.TableRefs.TableRefs.Right != nil {
+               return NewUpdateJoinExecutor(parserCtx, execContent, hooks)
+       }
        return &updateExecutor{parserCtx: parserCtx, execContext: execContent, baseExecutor: baseExecutor{hooks: hooks}}
 }
 
diff --git a/pkg/datasource/sql/exec/at/update_join_executor.go b/pkg/datasource/sql/exec/at/update_join_executor.go
new file mode 100644
index 00000000..d61a0ac3
--- /dev/null
+++ b/pkg/datasource/sql/exec/at/update_join_executor.go
@@ -0,0 +1,348 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package at
+
+import (
+       "context"
+       "database/sql/driver"
+       "errors"
+       "io"
+       "reflect"
+       "strings"
+
+       "github.com/arana-db/parser/ast"
+       "github.com/arana-db/parser/format"
+       "github.com/arana-db/parser/model"
+
+       "seata.apache.org/seata-go/pkg/datasource/sql/datasource"
+       "seata.apache.org/seata-go/pkg/datasource/sql/exec"
+       "seata.apache.org/seata-go/pkg/datasource/sql/types"
+       "seata.apache.org/seata-go/pkg/datasource/sql/util"
+       "seata.apache.org/seata-go/pkg/util/bytes"
+       "seata.apache.org/seata-go/pkg/util/log"
+)
+
+const (
+       LowerSupportGroupByPksVersion = "5.7.5"
+)
+
+// updateJoinExecutor execute update SQL
+type updateJoinExecutor struct {
+       baseExecutor
+       parserCtx                       *types.ParseContext
+       execContext                     *types.ExecContext
+       isLowerSupportGroupByPksVersion bool
+       sqlMode                         string
+       tableAliasesMap                 map[string]string
+}
+
+// NewUpdateJoinExecutor get executor
+func NewUpdateJoinExecutor(parserCtx *types.ParseContext, execContent *types.ExecContext, hooks []exec.SQLHook) executor {
+       minimumVersion, _ := util.ConvertDbVersion(LowerSupportGroupByPksVersion)
+       currentVersion, _ := util.ConvertDbVersion(execContent.DbVersion)
+       return &updateJoinExecutor{
+               parserCtx:                       parserCtx,
+               execContext:                     execContent,
+               baseExecutor:                    baseExecutor{hooks: hooks},
+               isLowerSupportGroupByPksVersion: currentVersion < minimumVersion,
+               tableAliasesMap:                 make(map[string]string, 0),
+       }
+}
+
+// ExecContext exec SQL, and generate before image and after image
+func (u *updateJoinExecutor) ExecContext(ctx context.Context, f exec.CallbackWithNamedValue) (types.ExecResult, error) {
+       u.beforeHooks(ctx, u.execContext)
+       defer func() {
+               u.afterHooks(ctx, u.execContext)
+       }()
+
+       if u.isAstStmtValid() {
+               u.tableAliasesMap = u.parseTableName(u.parserCtx.UpdateStmt.TableRefs.TableRefs)
+       }
+
+       beforeImages, err := u.beforeImage(ctx)
+       if err != nil {
+               return nil, err
+       }
+
+       res, err := f(ctx, u.execContext.Query, u.execContext.NamedValues)
+       if err != nil {
+               return nil, err
+       }
+
+       afterImages, err := u.afterImage(ctx, beforeImages)
+       if err != nil {
+               return nil, err
+       }
+
+       if len(afterImages) != len(beforeImages) {
+               return nil, errors.New("Before image size is not equaled to after image size, probably because you updated the primary keys.")
+       }
+
+       u.execContext.TxCtx.RoundImages.AppendBeofreImages(beforeImages)
+       u.execContext.TxCtx.RoundImages.AppendAfterImages(afterImages)
+
+       return res, nil
+}
+
+func (u *updateJoinExecutor) isAstStmtValid() bool {
+       return u.parserCtx != nil && u.parserCtx.UpdateStmt != nil && u.parserCtx.UpdateStmt.TableRefs.TableRefs.Right != nil
+}
+
+func (u *updateJoinExecutor) beforeImage(ctx context.Context) ([]*types.RecordImage, error) {
+       if !u.isAstStmtValid() {
+               return nil, nil
+       }
+
+       var recordImages []*types.RecordImage
+
+       for tbName, tableAliases := range u.tableAliasesMap {
+               metaData, err := datasource.GetTableCache(types.DBTypeMySQL).GetTableMeta(ctx, u.execContext.DBName, tbName)
+               if err != nil {
+                       return nil, err
+               }
+               selectSQL, selectArgs, err := u.buildBeforeImageSQL(ctx, metaData, tableAliases, u.execContext.NamedValues)
+               if err != nil {
+                       return nil, err
+               }
+               if selectSQL == "" {
+                       log.Debugf("Skip unused table [{%s}] when build select sql by update sourceQuery", tbName)
+                       continue
+               }
+
+               var image *types.RecordImage
+               rowsi, err := u.rowsPrepare(ctx, u.execContext.Conn, selectSQL, selectArgs)
+               if err == nil {
+                       image, err = u.buildRecordImages(rowsi, metaData, types.SQLTypeUpdate)
+               }
+               if rowsi != nil {
+                       if rowerr := rowsi.Close(); rowerr != nil {
+                               log.Errorf("rows close fail, err:%v", rowerr)
+                               return nil, rowerr
+                       }
+               }
+               if err != nil {
+                       // If one fail, all fails
+                       return nil, err
+               }
+
+               lockKey := u.buildLockKey(image, *metaData)
+               u.execContext.TxCtx.LockKeys[lockKey] = struct{}{}
+               image.SQLType = u.parserCtx.SQLType
+
+               recordImages = append(recordImages, image)
+       }
+
+       return recordImages, nil
+}
+
+func (u *updateJoinExecutor) afterImage(ctx context.Context, beforeImages []*types.RecordImage) ([]*types.RecordImage, error) {
+       if !u.isAstStmtValid() {
+               return nil, nil
+       }
+
+       if len(beforeImages) == 0 {
+               return nil, errors.New("empty beforeImages")
+       }
+
+       var recordImages []*types.RecordImage
+       for _, beforeImage := range beforeImages {
+               metaData, err := datasource.GetTableCache(types.DBTypeMySQL).GetTableMeta(ctx, u.execContext.DBName, beforeImage.TableName)
+               if err != nil {
+                       return nil, err
+               }
+
+               selectSQL, selectArgs, err := u.buildAfterImageSQL(ctx, *beforeImage, metaData, u.tableAliasesMap[beforeImage.TableName])
+               if err != nil {
+                       return nil, err
+               }
+
+               var image *types.RecordImage
+               rowsi, err := u.rowsPrepare(ctx, u.execContext.Conn, selectSQL, selectArgs)
+               if err == nil {
+                       image, err = u.buildRecordImages(rowsi, metaData, types.SQLTypeUpdate)
+               }
+               if rowsi != nil {
+                       if rowerr := rowsi.Close(); rowerr != nil {
+                               log.Errorf("rows close fail, err:%v", rowerr)
+                               return nil, rowerr
+                       }
+               }
+               if err != nil {
+                       // If one fail, all fails
+                       return nil, err
+               }
+
+               image.SQLType = u.parserCtx.SQLType
+               recordImages = append(recordImages, image)
+       }
+
+       return recordImages, nil
+}
+
+// buildBeforeImageSQL builds the SQL used to query the before-image data
+func (u *updateJoinExecutor) buildBeforeImageSQL(ctx context.Context, tableMeta *types.TableMeta, tableAliases string, args []driver.NamedValue) (string, []driver.NamedValue, error) {
+       updateStmt := u.parserCtx.UpdateStmt
+       fields, err := u.buildSelectFields(ctx, tableMeta, tableAliases, updateStmt.List)
+       if err != nil {
+               return "", nil, err
+       }
+       if len(fields) == 0 {
+               return "", nil, err
+       }
+
+       selStmt := ast.SelectStmt{
+               SelectStmtOpts: &ast.SelectStmtOpts{},
+               From:           updateStmt.TableRefs,
+               Where:          updateStmt.Where,
+               Fields:         &ast.FieldList{Fields: fields},
+               OrderBy:        updateStmt.Order,
+               Limit:          updateStmt.Limit,
+               TableHints:     updateStmt.TableHints,
+               // maybe duplicate row for select join sql.remove duplicate row by 'group by' condition
+               GroupBy: &ast.GroupByClause{
+                       Items: u.buildGroupByClause(ctx, tableMeta.TableName, tableAliases, tableMeta.GetPrimaryKeyOnlyName(), fields),
+               },
+               LockInfo: &ast.SelectLockInfo{
+                       LockType: ast.SelectLockForUpdate,
+               },
+       }
+
+       b := bytes.NewByteBuffer([]byte{})
+       _ = selStmt.Restore(format.NewRestoreCtx(format.RestoreKeyWordUppercase, b))
+       sql := string(b.Bytes())
+       log.Infof("build select sql by update sourceQuery, sql {%s}", sql)
+
+       return sql, u.buildSelectArgs(&selStmt, args), nil
+}
+
+func (u *updateJoinExecutor) buildAfterImageSQL(ctx context.Context, beforeImage types.RecordImage, meta *types.TableMeta, tableAliases string) (string, []driver.NamedValue, error) {
+       if len(beforeImage.Rows) == 0 {
+               return "", nil, nil
+       }
+
+       fields, err := u.buildSelectFields(ctx, meta, tableAliases, u.parserCtx.UpdateStmt.List)
+       if err != nil {
+               return "", nil, err
+       }
+       if len(fields) == 0 {
+               return "", nil, err
+       }
+
+       updateStmt := u.parserCtx.UpdateStmt
+       selStmt := ast.SelectStmt{
+               SelectStmtOpts: &ast.SelectStmtOpts{},
+               From:           updateStmt.TableRefs,
+               Where:          updateStmt.Where,
+               Fields:         &ast.FieldList{Fields: fields},
+               OrderBy:        updateStmt.Order,
+               Limit:          updateStmt.Limit,
+               TableHints:     updateStmt.TableHints,
+               // maybe duplicate row for select join sql.remove duplicate row by 'group by' condition
+               GroupBy: &ast.GroupByClause{
+                       Items: u.buildGroupByClause(ctx, meta.TableName, tableAliases, meta.GetPrimaryKeyOnlyName(), fields),
+               },
+       }
+
+       b := bytes.NewByteBuffer([]byte{})
+       _ = selStmt.Restore(format.NewRestoreCtx(format.RestoreKeyWordUppercase, b))
+       sql := string(b.Bytes())
+       log.Infof("build select sql by update sourceQuery, sql {%s}", sql)
+
+       return sql, u.buildPKParams(beforeImage.Rows, meta.GetPrimaryKeyOnlyName()), nil
+}
+
+func (u *updateJoinExecutor) parseTableName(joinMate *ast.Join) map[string]string {
+       tableNames := make(map[string]string, 0)
+       if item, ok := joinMate.Left.(*ast.Join); ok {
+               tableNames = u.parseTableName(item)
+       } else {
+               leftTableSource := joinMate.Left.(*ast.TableSource)
+               leftName := leftTableSource.Source.(*ast.TableName)
+               tableNames[leftName.Name.O] = leftTableSource.AsName.O
+       }
+
+       rightTableSource := joinMate.Right.(*ast.TableSource)
+       rightName := rightTableSource.Source.(*ast.TableName)
+       tableNames[rightName.Name.O] = rightTableSource.AsName.O
+       return tableNames
+}
+
+// build group by condition which used for removing duplicate row in select join sql
+func (u *updateJoinExecutor) buildGroupByClause(ctx context.Context, tableName string, tableAliases string, pkColumns []string, allSelectColumns []*ast.SelectField) []*ast.ByItem {
+       var groupByPks = true
+       if tableAliases != "" {
+               tableName = tableAliases
+       }
+       //only pks group by is valid when db version >= 5.7.5
+       if u.isLowerSupportGroupByPksVersion {
+               if u.sqlMode == "" {
+                       rowsi, err := u.rowsPrepare(ctx, u.execContext.Conn, "SELECT @@SQL_MODE", nil)
+                       defer func() {
+                               if rowsi != nil {
+                                       if rowerr := rowsi.Close(); rowerr != nil {
+                                               log.Errorf("rows close fail, err:%v", rowerr)
+                                       }
+                               }
+                       }()
+                       if err != nil {
+                               groupByPks = false
+                               log.Warnf("determine group by pks or all columns error:%s", err)
+                       } else {
+                               // getString("@@SQL_MODE")
+                               mode := make([]driver.Value, 1)
+                               if err = rowsi.Next(mode); err != nil {
+                                       if err != io.EOF && len(mode) == 1 {
+                                               u.sqlMode = reflect.ValueOf(mode[0]).String()
+                                       }
+                               }
+                       }
+               }
+
+               if strings.Contains(u.sqlMode, "ONLY_FULL_GROUP_BY") {
+                       groupByPks = false
+               }
+       }
+
+       groupByColumns := make([]*ast.ByItem, 0)
+       if groupByPks {
+               for _, column := range pkColumns {
+                       groupByColumns = append(groupByColumns, &ast.ByItem{
+                               Expr: &ast.ColumnNameExpr{
+                                       Name: &ast.ColumnName{
+                                               Table: model.CIStr{
+                                                       O: tableName,
+                                                       L: strings.ToLower(tableName),
+                                               },
+                                               Name: model.CIStr{
+                                                       O: column,
+                                                       L: strings.ToLower(column),
+                                               },
+                                       },
+                               },
+                       })
+               }
+       } else {
+               for _, column := range allSelectColumns {
+                       groupByColumns = append(groupByColumns, &ast.ByItem{
+                               Expr: column.Expr,
+                       })
+               }
+       }
+       return groupByColumns
+}
diff --git a/pkg/datasource/sql/exec/at/update_join_executor_test.go b/pkg/datasource/sql/exec/at/update_join_executor_test.go
new file mode 100644
index 00000000..0ef30da3
--- /dev/null
+++ b/pkg/datasource/sql/exec/at/update_join_executor_test.go
@@ -0,0 +1,231 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package at
+
+import (
+       "context"
+       "database/sql/driver"
+       "seata.apache.org/seata-go/pkg/datasource/sql/undo"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+
+       "seata.apache.org/seata-go/pkg/datasource/sql/exec"
+       "seata.apache.org/seata-go/pkg/datasource/sql/parser"
+       "seata.apache.org/seata-go/pkg/datasource/sql/types"
+       "seata.apache.org/seata-go/pkg/datasource/sql/util"
+       _ "seata.apache.org/seata-go/pkg/util/log"
+)
+
+func TestBuildSelectSQLByUpdateJoin(t *testing.T) {
+       MetaDataMap := map[string]*types.TableMeta{
+               "table1": {
+                       TableName: "table1",
+                       Indexs: map[string]types.IndexMeta{
+                               "id": {
+                                       IType: types.IndexTypePrimaryKey,
+                                       Columns: []types.ColumnMeta{
+                                               {ColumnName: "id"},
+                                       },
+                               },
+                       },
+                       Columns: map[string]types.ColumnMeta{
+                               "id": {
+                                       ColumnDef:  nil,
+                                       ColumnName: "id",
+                               },
+                               "name": {
+                                       ColumnDef:  nil,
+                                       ColumnName: "name",
+                               },
+                               "age": {
+                                       ColumnDef:  nil,
+                                       ColumnName: "age",
+                               },
+                       },
+                       ColumnNames: []string{"id", "name", "age"},
+               },
+               "table2": {
+                       TableName: "table2",
+                       Indexs: map[string]types.IndexMeta{
+                               "id": {
+                                       IType: types.IndexTypePrimaryKey,
+                                       Columns: []types.ColumnMeta{
+                                               {ColumnName: "id"},
+                                       },
+                               },
+                       },
+                       Columns: map[string]types.ColumnMeta{
+                               "id": {
+                                       ColumnDef:  nil,
+                                       ColumnName: "id",
+                               },
+                               "name": {
+                                       ColumnDef:  nil,
+                                       ColumnName: "name",
+                               },
+                               "age": {
+                                       ColumnDef:  nil,
+                                       ColumnName: "age",
+                               },
+                               "kk": {
+                                       ColumnDef:  nil,
+                                       ColumnName: "kk",
+                               },
+                               "addr": {
+                                       ColumnDef:  nil,
+                                       ColumnName: "addr",
+                               },
+                       },
+                       ColumnNames: []string{"id", "name", "age", "kk", "addr"},
+               },
+               "table3": {
+                       TableName: "table3",
+                       Indexs: map[string]types.IndexMeta{
+                               "id": {
+                                       IType: types.IndexTypePrimaryKey,
+                                       Columns: []types.ColumnMeta{
+                                               {ColumnName: "id"},
+                                       },
+                               },
+                       },
+                       Columns: map[string]types.ColumnMeta{
+                               "id": {
+                                       ColumnDef:  nil,
+                                       ColumnName: "id",
+                               },
+                               "age": {
+                                       ColumnDef:  nil,
+                                       ColumnName: "age",
+                               },
+                       },
+                       ColumnNames: []string{"id", "age"},
+               },
+               "table4": {
+                       TableName: "table4",
+                       Indexs: map[string]types.IndexMeta{
+                               "id": {
+                                       IType: types.IndexTypePrimaryKey,
+                                       Columns: []types.ColumnMeta{
+                                               {ColumnName: "id"},
+                                       },
+                               },
+                       },
+                       Columns: map[string]types.ColumnMeta{
+                               "id": {
+                                       ColumnDef:  nil,
+                                       ColumnName: "id",
+                               },
+                               "age": {
+                                       ColumnDef:  nil,
+                                       ColumnName: "age",
+                               },
+                       },
+                       ColumnNames: []string{"id", "age"},
+               },
+       }
+
+       undo.InitUndoConfig(undo.Config{OnlyCareUpdateColumns: true})
+
+       tests := []struct {
+               name            string
+               sourceQuery     string
+               sourceQueryArgs []driver.Value
+               expectQuery     map[string]string
+               expectQueryArgs []driver.Value
+       }{
+               {
+                       sourceQuery:     "update table1 t1 left join table2 t2 on t1.id = t2.id and t1.age=? set t1.name = 'WILL',t2.name = ?",
+                       sourceQueryArgs: []driver.Value{18, "Jack"},
+                       expectQuery: map[string]string{
+                               "table1": "SELECT SQL_NO_CACHE t1.name,t1.id FROM table1 AS t1 LEFT JOIN table2 AS t2 ON t1.id=t2.id AND t1.age=? GROUP BY t1.name,t1.id FOR UPDATE",
+                               "table2": "SELECT SQL_NO_CACHE t2.name,t2.id FROM table1 AS t1 LEFT JOIN table2 AS t2 ON t1.id=t2.id AND t1.age=? GROUP BY t2.name,t2.id FOR UPDATE",
+                       },
+                       expectQueryArgs: []driver.Value{18},
+               },
+               {
+                       sourceQuery:     "update table1 AS t1 inner join table2 AS t2 on t1.id = t2.id set t1.name = 'WILL',t2.name = 'WILL' where t1.id=?",
+                       sourceQueryArgs: []driver.Value{1},
+                       expectQuery: map[string]string{
+                               "table1": "SELECT SQL_NO_CACHE t1.name,t1.id FROM table1 AS t1 JOIN table2 AS t2 ON t1.id=t2.id WHERE t1.id=? GROUP BY t1.name,t1.id FOR UPDATE",
+                               "table2": "SELECT SQL_NO_CACHE t2.name,t2.id FROM table1 AS t1 JOIN table2 AS t2 ON t1.id=t2.id WHERE t1.id=? GROUP BY t2.name,t2.id FOR UPDATE",
+                       },
+                       expectQueryArgs: []driver.Value{1},
+               },
+               {
+                       sourceQuery:     "update table1 AS t1 right join table2 AS t2 on t1.id = t2.id set t1.name = 'WILL',t2.name = 'WILL' where t1.id=?",
+                       sourceQueryArgs: []driver.Value{1},
+                       expectQuery: map[string]string{
+                               "table1": "SELECT SQL_NO_CACHE t1.name,t1.id FROM table1 AS t1 RIGHT JOIN table2 AS t2 ON t1.id=t2.id WHERE t1.id=? GROUP BY t1.name,t1.id FOR UPDATE",
+                               "table2": "SELECT SQL_NO_CACHE t2.name,t2.id FROM table1 AS t1 RIGHT JOIN table2 AS t2 ON t1.id=t2.id WHERE t1.id=? GROUP BY t2.name,t2.id FOR UPDATE",
+                       },
+                       expectQueryArgs: []driver.Value{1},
+               },
+               {
+                       sourceQuery:     "update table1 t1 inner join table2 t2 on t1.id = t2.id set t1.name = ?, t1.age = ? where t1.id = ? and t1.name = ? and t2.age between ? and ?",
+                       sourceQueryArgs: []driver.Value{"newJack", 38, 1, "Jack", 18, 28},
+                       expectQuery: map[string]string{
+                               "table1": "SELECT SQL_NO_CACHE t1.name,t1.age,t1.id FROM table1 AS t1 JOIN table2 AS t2 ON t1.id=t2.id WHERE t1.id=? AND t1.name=? AND t2.age BETWEEN ? AND ? GROUP BY t1.name,t1.age,t1.id FOR UPDATE",
+                       },
+                       expectQueryArgs: []driver.Value{1, "Jack", 18, 28},
+               },
+               {
+                       sourceQuery:     "update table1 t1 left join table2 t2 on t1.id = t2.id set t1.name = ?, t1.age = ? where t1.id=? and t2.id is null and t1.age IN (?,?)",
+                       sourceQueryArgs: []driver.Value{"newJack", 38, 1, 18, 28},
+                       expectQuery: map[string]string{
+                               "table1": "SELECT SQL_NO_CACHE t1.name,t1.age,t1.id FROM table1 AS t1 LEFT JOIN table2 AS t2 ON t1.id=t2.id WHERE t1.id=? AND t2.id IS NULL AND t1.age IN (?,?) GROUP BY t1.name,t1.age,t1.id FOR UPDATE",
+                       },
+                       expectQueryArgs: []driver.Value{1, 18, 28},
+               },
+               {
+                       sourceQuery:     "update table1 t1 inner join table2 t2 on t1.id = t2.id set t1.name = ?, t2.age = ? where t2.kk between ? and ? and t2.addr in(?,?) and t2.age > ? order by t1.name desc limit ?",
+                       sourceQueryArgs: []driver.Value{"Jack", 18, 10, 20, "Beijing", "Guangzhou", 18, 2},
+                       expectQuery: map[string]string{
+                               "table1": "SELECT SQL_NO_CACHE t1.name,t1.id FROM table1 AS t1 JOIN table2 AS t2 ON t1.id=t2.id WHERE t2.kk BETWEEN ? AND ? AND t2.addr IN (?,?) AND t2.age>? GROUP BY t1.name,t1.id ORDER BY t1.name DESC LIMIT ? FOR UPDATE",
+                               "table2": "SELECT SQL_NO_CACHE t2.age,t2.id FROM table1 AS t1 JOIN table2 AS t2 ON t1.id=t2.id WHERE t2.kk BETWEEN ? AND ? AND t2.addr IN (?,?) AND t2.age>? GROUP BY t2.age,t2.id ORDER BY t1.name DESC LIMIT ? FOR UPDATE",
+                       },
+                       expectQueryArgs: []driver.Value{10, 20, "Beijing", "Guangzhou", 18, 2},
+               },
+               {
+                       sourceQuery:     "update table1 t1 left join table2 t2 on t1.id = t2.id inner join table3 t3 on t3.id = t2.id right join table4 t4 on t4.id = t2.id set t1.name = ?,t2.name = ? where t1.id=? and t3.age=? and t4.age>30",
+                       sourceQueryArgs: []driver.Value{"Jack", "WILL", 1, 10},
+                       expectQuery: map[string]string{
+                               "table1": "SELECT SQL_NO_CACHE t1.name,t1.id FROM ((table1 AS t1 LEFT JOIN table2 AS t2 ON t1.id=t2.id) JOIN table3 AS t3 ON t3.id=t2.id) RIGHT JOIN table4 AS t4 ON t4.id=t2.id WHERE t1.id=? AND t3.age=? AND t4.age>30 GROUP BY t1.name,t1.id FOR UPDATE",
+                               "table2": "SELECT SQL_NO_CACHE t2.name,t2.id FROM ((table1 AS t1 LEFT JOIN table2 AS t2 ON t1.id=t2.id) JOIN table3 AS t3 ON t3.id=t2.id) RIGHT JOIN table4 AS t4 ON t4.id=t2.id WHERE t1.id=? AND t3.age=? AND t4.age>30 GROUP BY t2.name,t2.id FOR UPDATE",
+                       },
+                       expectQueryArgs: []driver.Value{1, 10},
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       c, err := parser.DoParser(tt.sourceQuery)
+                       assert.Nil(t, err)
+                       executor := NewUpdateJoinExecutor(c, &types.ExecContext{Values: tt.sourceQueryArgs, NamedValues: util.ValueToNamedValue(tt.sourceQueryArgs)}, []exec.SQLHook{})
+                       tableNames := executor.(*updateJoinExecutor).parseTableName(c.UpdateStmt.TableRefs.TableRefs)
+                       for tbName, tableAliases := range tableNames {
+                               query, args, err := executor.(*updateJoinExecutor).buildBeforeImageSQL(context.Background(), MetaDataMap[tbName], tableAliases, util.ValueToNamedValue(tt.sourceQueryArgs))
+                               assert.Nil(t, err)
+                               if query == "" {
+                                       continue
+                               }
+                               assert.Equal(t, tt.expectQuery[tbName], query)
+                               assert.Equal(t, tt.expectQueryArgs, util.NamedValueToValue(args))
+                       }
+               })
+       }
+}
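
For orientation, a minimal test-style sketch (not part of this commit) of the routing that NewUpdateExecutor gains in the update_executor.go hunk above: an UPDATE whose table refs contain a join is handed to the update-join executor. It assumes package at and the parser, types, exec and assert imports used in the test file above; the SQL text and test name are illustrative only.

func TestUpdateExecutorRoutesJoinSketch(t *testing.T) {
	parseCtx, err := parser.DoParser("update table1 t1 inner join table2 t2 on t1.id = t2.id set t1.name = 'WILL' where t1.id = ?")
	assert.Nil(t, err)
	// TableRefs.TableRefs.Right is non-nil for a join, so NewUpdateExecutor
	// returns the update-join executor instead of the plain update executor.
	e := NewUpdateExecutor(parseCtx, &types.ExecContext{}, []exec.SQLHook{})
	_, isJoinExecutor := e.(*updateJoinExecutor)
	assert.True(t, isJoinExecutor)
}
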
diff --git a/pkg/datasource/sql/undo/builder/basic_undo_log_builder.go b/pkg/datasource/sql/undo/builder/basic_undo_log_builder.go
index 8c5f20f2..d02604b8 100644
--- a/pkg/datasource/sql/undo/builder/basic_undo_log_builder.go
+++ b/pkg/datasource/sql/undo/builder/basic_undo_log_builder.go
@@ -22,12 +22,12 @@ import (
        "database/sql"
        "database/sql/driver"
        "fmt"
-       "io"
-       "strings"
-
        "github.com/arana-db/parser/ast"
        "github.com/arana-db/parser/test_driver"
        gxsort "github.com/dubbogo/gost/sort"
+       "io"
+       "seata.apache.org/seata-go/pkg/datasource/sql/util"
+       "strings"
 
        "seata.apache.org/seata-go/pkg/datasource/sql/types"
 )
@@ -276,43 +276,5 @@ func (b *BasicUndoLogBuilder) buildLockKey(rows driver.Rows, meta types.TableMet
 
 // the string as local key. the local key example(multi pk): "t_user:1_a,2_b"
 func (b *BasicUndoLogBuilder) buildLockKey2(records *types.RecordImage, meta types.TableMeta) string {
-       var lockKeys bytes.Buffer
-       lockKeys.WriteString(meta.TableName)
-       lockKeys.WriteString(":")
-
-       keys := meta.GetPrimaryKeyOnlyName()
-       keyIndexMap := make(map[string]int, len(keys))
-
-       for idx, columnName := range keys {
-               keyIndexMap[columnName] = idx
-       }
-
-       primaryKeyRows := make([][]interface{}, len(records.Rows))
-
-       for i, row := range records.Rows {
-               primaryKeyValues := make([]interface{}, len(keys))
-               for _, column := range row.Columns {
-                       if idx, exist := keyIndexMap[column.ColumnName]; exist {
-                               primaryKeyValues[idx] = column.Value
-                       }
-               }
-               primaryKeyRows[i] = primaryKeyValues
-       }
-
-       for i, primaryKeyValues := range primaryKeyRows {
-               if i > 0 {
-                       lockKeys.WriteString(",")
-               }
-               for j, pkVal := range primaryKeyValues {
-                       if j > 0 {
-                               lockKeys.WriteString("_")
-                       }
-                       if pkVal == nil {
-                               continue
-                       }
-                       lockKeys.WriteString(fmt.Sprintf("%v", pkVal))
-               }
-       }
-
-       return lockKeys.String()
+       return util.BuildLockKey(records, meta)
 }
diff --git a/pkg/datasource/sql/undo/builder/basic_undo_log_builder_test.go b/pkg/datasource/sql/undo/builder/basic_undo_log_builder_test.go
index 465bf516..744f725f 100644
--- a/pkg/datasource/sql/undo/builder/basic_undo_log_builder_test.go
+++ b/pkg/datasource/sql/undo/builder/basic_undo_log_builder_test.go
@@ -69,6 +69,7 @@ func TestBuildLockKey(t *testing.T) {
        }
 
        columnsTwoPk := []types.ColumnMeta{columnID, columnUserId}
+       columnsThreePk := []types.ColumnMeta{columnID, columnUserId, columnAge}
        columnsMixPk := []types.ColumnMeta{columnName, columnAge}
 
        getColumnImage := func(columnName string, value interface{}) types.ColumnImage {
@@ -98,6 +99,24 @@ func TestBuildLockKey(t *testing.T) {
                        },
                        "test_name:1_one,2_two",
                },
+               {
+                       "Three Primary Keys",
+                       types.TableMeta{
+                               TableName: "test2_name",
+                               Indexs: map[string]types.IndexMeta{
+                                       "PRIMARY_KEY": {IType: types.IndexTypePrimaryKey, Columns: columnsThreePk},
+                               },
+                       },
+                       types.RecordImage{
+                               TableName: "test2_name",
+                               Rows: []types.RowImage{
+                                       {[]types.ColumnImage{getColumnImage("id", 1), getColumnImage("userId", "one"), getColumnImage("age", "11")}},
+                                       {[]types.ColumnImage{getColumnImage("id", 2), getColumnImage("userId", "two"), getColumnImage("age", "22")}},
+                                       {[]types.ColumnImage{getColumnImage("id", 3), getColumnImage("userId", "three"), getColumnImage("age", "33")}},
+                               },
+                       },
+                       "test2_name:1_one_11,2_two_22,3_three_33",
+               },
                {
                        name: "Single Primary Key",
                        metaData: types.TableMeta{
diff --git a/pkg/datasource/sql/util/lockkey.go b/pkg/datasource/sql/util/lockkey.go
new file mode 100644
index 00000000..02992982
--- /dev/null
+++ b/pkg/datasource/sql/util/lockkey.go
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Copyright 2016 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package util
+
+import (
+       "fmt"
+       "seata.apache.org/seata-go/pkg/datasource/sql/types"
+       "strings"
+)
+
+func BuildLockKey(records *types.RecordImage, meta types.TableMeta) string {
+       var lockKeys strings.Builder
+       type ColMapItem struct {
+               pkIndex  int
+               colIndex int
+       }
+
+       lockKeys.WriteString(meta.TableName)
+       lockKeys.WriteString(":")
+
+       keys := meta.GetPrimaryKeyOnlyName()
+       keyIndexMap := make(map[string]int, len(keys))
+       for idx, columnName := range keys {
+               keyIndexMap[columnName] = idx
+       }
+
+       columns := make([]ColMapItem, 0, len(keys))
+       if len(records.Rows) > 0 {
+               for colIdx, column := range records.Rows[0].Columns {
+                       if pkIdx, ok := keyIndexMap[column.ColumnName]; ok {
+                               columns = append(columns, ColMapItem{pkIndex: pkIdx, colIndex: colIdx})
+                       }
+               }
+               for i, row := range records.Rows {
+                       if i > 0 {
+                               lockKeys.WriteString(",")
+                       }
+                       primaryKeyValues := make([]interface{}, len(keys))
+                       for _, mp := range columns {
+                               if mp.colIndex < len(row.Columns) {
+                                       primaryKeyValues[mp.pkIndex] = row.Columns[mp.colIndex].Value
+                               }
+                       }
+                       for j, pkVal := range primaryKeyValues {
+                               if j > 0 {
+                                       lockKeys.WriteString("_")
+                               }
+                               if pkVal == nil {
+                                       continue
+                               }
+                               lockKeys.WriteString(fmt.Sprintf("%v", pkVal))
+                       }
+               }
+       }
+       return lockKeys.String()
+}
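
For context, a minimal sketch (not part of this commit) of how the consolidated util.BuildLockKey helper is called and what it produces for a composite primary key, matching the "t_user:1_a,2_b" example in the comments above. It assumes the types and util packages from the diff; the table name and values are illustrative only.

func buildLockKeySketch() {
	meta := types.TableMeta{
		TableName: "t_user",
		Indexs: map[string]types.IndexMeta{
			"PRIMARY_KEY": {
				IType:   types.IndexTypePrimaryKey,
				Columns: []types.ColumnMeta{{ColumnName: "id"}, {ColumnName: "name"}},
			},
		},
	}
	records := types.RecordImage{
		TableName: "t_user",
		Rows: []types.RowImage{
			{Columns: []types.ColumnImage{{ColumnName: "id", Value: 1}, {ColumnName: "name", Value: "a"}}},
			{Columns: []types.ColumnImage{{ColumnName: "id", Value: 2}, {ColumnName: "name", Value: "b"}}},
		},
	}
	// Rows are joined with "," and primary-key values within a row with "_".
	fmt.Println(util.BuildLockKey(&records, meta)) // t_user:1_a,2_b
}
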


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org
For additional commands, e-mail: notifications-h...@seata.apache.org

