This is an automated email from the ASF dual-hosted git repository.

zfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-seata-go.git


The following commit(s) were added to refs/heads/master by this push:
     new a0d485fc support ipv6 (#786)
a0d485fc is described below

commit a0d485fc6a73457c08ddd56b63f5821e09688772
Author: lxfeng1997 <33981743+lxfeng1...@users.noreply.github.com>
AuthorDate: Sat May 10 19:03:04 2025 +0800

    support ipv6 (#786)
    
    * support ipv6
    
    * add license
    
    * support ipv6
    
    * support ipv6
    
    ---------
    
    Co-authored-by: JayLiu <38887641+luky...@users.noreply.github.com>
    Co-authored-by: FengZhang <zfc...@qq.com>
---
 pkg/discovery/etcd3.go                             |  19 ++-
 pkg/discovery/etcd3_test.go                        |  18 +++
 pkg/discovery/file.go                              |  13 +-
 pkg/discovery/file_test.go                         |  71 ++++++++++-
 pkg/remoting/getty/session_manager.go              |   3 +-
 pkg/remoting/loadbalance/xid_loadbalance.go        |  48 +++++---
 pkg/remoting/loadbalance/xid_loadbalance_test.go   |   6 +
 .../net/address_validator.go}                      |  59 +++++-----
 pkg/util/net/address_validator_test.go             | 131 +++++++++++++++++++++
 9 files changed, 299 insertions(+), 69 deletions(-)

diff --git a/pkg/discovery/etcd3.go b/pkg/discovery/etcd3.go
index 14ec3777..49ff35be 100644
--- a/pkg/discovery/etcd3.go
+++ b/pkg/discovery/etcd3.go
@@ -20,11 +20,13 @@ package discovery
 import (
        "context"
        "fmt"
-       etcd3 "go.etcd.io/etcd/client/v3"
-       "seata.apache.org/seata-go/pkg/util/log"
-       "strconv"
        "strings"
        "sync"
+
+       etcd3 "go.etcd.io/etcd/client/v3"
+
+       "seata.apache.org/seata-go/pkg/util/log"
+       "seata.apache.org/seata-go/pkg/util/net"
 )
 
 const (
@@ -189,12 +191,7 @@ func getClusterName(key []byte) (string, error) {
 
 func getServerInstance(value []byte) (*ServiceInstance, error) {
        stringValue := string(value)
-       valueSplit := strings.Split(stringValue, addressSplitChar)
-       if len(valueSplit) != 2 {
-               return nil, fmt.Errorf("etcd value has an incorrect format. 
value: %s", stringValue)
-       }
-       ip := valueSplit[0]
-       port, err := strconv.Atoi(valueSplit[1])
+       ip, port, err := net.SplitIPPortStr(stringValue)
        if err != nil {
                return nil, fmt.Errorf("etcd port has an incorrect format. err: 
%w", err)
        }
@@ -213,9 +210,7 @@ func getClusterAndAddress(key []byte) (string, string, int, 
error) {
                return "", "", 0, fmt.Errorf("etcd key has an incorrect format. 
key: %s", stringKey)
        }
        cluster := keySplit[2]
-       address := strings.Split(keySplit[3], addressSplitChar)
-       ip := address[0]
-       port, err := strconv.Atoi(address[1])
+       ip, port, err := net.SplitIPPortStr(keySplit[3])
        if err != nil {
                return "", "", 0, fmt.Errorf("etcd port has an incorrect 
format. err: %w", err)
        }
diff --git a/pkg/discovery/etcd3_test.go b/pkg/discovery/etcd3_test.go
index 7c9c59fb..81f87c6d 100644
--- a/pkg/discovery/etcd3_test.go
+++ b/pkg/discovery/etcd3_test.go
@@ -54,6 +54,24 @@ func TestEtcd3RegistryService_Lookup(t *testing.T) {
                                },
                        },
                },
+               {
+                       name: "host is ipv6",
+                       getResp: &clientv3.GetResponse{
+                               Kvs: []*mvccpb.KeyValue{
+                                       {
+                                               Key:   
[]byte("registry-seata-default-2000:0000:0000:0000:0001:2345:6789:abcd:8091"),
+                                               Value: 
[]byte("2000:0000:0000:0000:0001:2345:6789:abcd:8091"),
+                                       },
+                               },
+                       },
+                       watchResp: nil,
+                       want: []*ServiceInstance{
+                               {
+                                       Addr: 
"2000:0000:0000:0000:0001:2345:6789:abcd",
+                                       Port: 8091,
+                               },
+                       },
+               },
                {
                        name:    "use watch update ServiceInstances",
                        getResp: nil,
diff --git a/pkg/discovery/file.go b/pkg/discovery/file.go
index abc71a37..3be8b560 100644
--- a/pkg/discovery/file.go
+++ b/pkg/discovery/file.go
@@ -19,15 +19,14 @@ package discovery
 
 import (
        "fmt"
-       "strconv"
        "strings"
 
        "seata.apache.org/seata-go/pkg/util/log"
+       "seata.apache.org/seata-go/pkg/util/net"
 )
 
 const (
        endPointSplitChar = ";"
-       ipPortSplitChar   = ":"
 )
 
 type FileRegistryService struct {
@@ -66,17 +65,13 @@ func (s *FileRegistryService) Lookup(key string) 
([]*ServiceInstance, error) {
        addrs := strings.Split(addrStr, endPointSplitChar)
        instances := make([]*ServiceInstance, 0)
        for _, addr := range addrs {
-               ipPort := strings.Split(addr, ipPortSplitChar)
-               if len(ipPort) != 2 {
-                       return nil, fmt.Errorf("endpoint format should like 
ip:port. endpoint: %s", addr)
-               }
-               ip := ipPort[0]
-               port, err := strconv.Atoi(ipPort[1])
+               host, port, err := net.SplitIPPortStr(addr)
                if err != nil {
+                       log.Errorf("endpoint err. endpoint: %s", addr)
                        return nil, err
                }
                instances = append(instances, &ServiceInstance{
-                       Addr: ip,
+                       Addr: host,
                        Port: port,
                })
        }
diff --git a/pkg/discovery/file_test.go b/pkg/discovery/file_test.go
index 7b3bc7b5..9d653c13 100644
--- a/pkg/discovery/file_test.go
+++ b/pkg/discovery/file_test.go
@@ -136,7 +136,7 @@ func TestFileRegistryService_Lookup(t *testing.T) {
                        },
                        want:       nil,
                        wantErr:    true,
-                       wantErrMsg: "endpoint format should like ip:port. 
endpoint: 127.0.0.18091",
+                       wantErrMsg: "address 127.0.0.18091: missing port in 
address",
                },
                {
                        name: "port is not number",
@@ -157,6 +157,75 @@ func TestFileRegistryService_Lookup(t *testing.T) {
                        wantErr:    true,
                        wantErrMsg: "strconv.Atoi: parsing \"abc\": invalid 
syntax",
                },
+               {
+                       name: "endpoint is ipv6",
+                       args: args{
+                               key: "default_tx_group",
+                       },
+                       fields: fields{
+                               serviceConfig: &ServiceConfig{
+                                       VgroupMapping: map[string]string{
+                                               "default_tx_group": "default",
+                                       },
+                                       Grouplist: map[string]string{
+                                               "default": 
"[2000:0000:0000:0000:0001:2345:6789:abcd]:8080",
+                                       },
+                               },
+                       },
+                       want: []*ServiceInstance{
+                               {
+                                       Addr: 
"2000:0000:0000:0000:0001:2345:6789:abcd",
+                                       Port: 8080,
+                               },
+                       },
+                       wantErr: false,
+               },
+               {
+                       name: "endpoint is ipv6",
+                       args: args{
+                               key: "default_tx_group",
+                       },
+                       fields: fields{
+                               serviceConfig: &ServiceConfig{
+                                       VgroupMapping: map[string]string{
+                                               "default_tx_group": "default",
+                                       },
+                                       Grouplist: map[string]string{
+                                               "default": 
"[2000:0000:0000:0000:0001:2345:6789:abcd%10]:8080",
+                                       },
+                               },
+                       },
+                       want: []*ServiceInstance{
+                               {
+                                       Addr: 
"2000:0000:0000:0000:0001:2345:6789:abcd",
+                                       Port: 8080,
+                               },
+                       },
+                       wantErr: false,
+               },
+               {
+                       name: "endpoint is ipv6",
+                       args: args{
+                               key: "default_tx_group",
+                       },
+                       fields: fields{
+                               serviceConfig: &ServiceConfig{
+                                       VgroupMapping: map[string]string{
+                                               "default_tx_group": "default",
+                                       },
+                                       Grouplist: map[string]string{
+                                               "default": "[::]:8080",
+                                       },
+                               },
+                       },
+                       want: []*ServiceInstance{
+                               {
+                                       Addr: "::",
+                                       Port: 8080,
+                               },
+                       },
+                       wantErr: false,
+               },
        }
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
diff --git a/pkg/remoting/getty/session_manager.go 
b/pkg/remoting/getty/session_manager.go
index 602fffed..5cab5805 100644
--- a/pkg/remoting/getty/session_manager.go
+++ b/pkg/remoting/getty/session_manager.go
@@ -22,6 +22,7 @@ import (
        "fmt"
        "net"
        "reflect"
+       "strconv"
        "sync"
        "sync/atomic"
        "time"
@@ -76,7 +77,7 @@ func (g *SessionManager) init() {
        }
        for _, address := range addressList {
                gettyClient := getty.NewTCPClient(
-                       getty.WithServerAddress(fmt.Sprintf("%s:%d", 
address.Addr, address.Port)),
+                       getty.WithServerAddress(net.JoinHostPort(address.Addr, 
strconv.Itoa(address.Port))),
                        // todo if read c.gettyConf.ConnectionNum, will cause 
the connect to fail
                        getty.WithConnectionNumber(1),
                        
getty.WithReconnectInterval(g.gettyConf.ReconnectInterval),
diff --git a/pkg/remoting/loadbalance/xid_loadbalance.go 
b/pkg/remoting/loadbalance/xid_loadbalance.go
index 0d18fb8b..3900832f 100644
--- a/pkg/remoting/loadbalance/xid_loadbalance.go
+++ b/pkg/remoting/loadbalance/xid_loadbalance.go
@@ -18,34 +18,46 @@
 package loadbalance
 
 import (
+       "fmt"
        "strings"
        "sync"
 
        getty "github.com/apache/dubbo-getty"
+
+       "seata.apache.org/seata-go/pkg/util/log"
+       "seata.apache.org/seata-go/pkg/util/net"
 )
 
 func XidLoadBalance(sessions *sync.Map, xid string) getty.Session {
        var session getty.Session
+       const delimiter = ":"
+
+       if len(xid) > 0 && strings.Contains(xid, delimiter) {
+               // ip:port:transactionId -> ip:port
+               index := strings.LastIndex(xid, delimiter)
+               serverAddress := xid[:index]
 
-       // ip:port:transactionId
-       tmpSplits := strings.Split(xid, ":")
-       if len(tmpSplits) == 3 {
-               ip := tmpSplits[0]
-               port := tmpSplits[1]
-               ipPort := ip + ":" + port
-               sessions.Range(func(key, value interface{}) bool {
-                       tmpSession := key.(getty.Session)
-                       if tmpSession.IsClosed() {
-                               sessions.Delete(tmpSession)
+               // ip:port -> port
+               // ipv4/v6
+               ip, port, err := net.SplitIPPortStr(serverAddress)
+               if err != nil {
+                       log.Errorf("xid load balance err, xid:%s, %v , change 
use random load balance", xid, err)
+               } else {
+                       sessions.Range(func(key, value interface{}) bool {
+                               tmpSession := key.(getty.Session)
+                               if tmpSession.IsClosed() {
+                                       sessions.Delete(tmpSession)
+                                       return true
+                               }
+                               ipPort := fmt.Sprintf("%s:%d", ip, port)
+                               connectedIpPort := tmpSession.RemoteAddr()
+                               if ipPort == connectedIpPort {
+                                       session = tmpSession
+                                       return false
+                               }
                                return true
-                       }
-                       connectedIpPort := tmpSession.RemoteAddr()
-                       if ipPort == connectedIpPort {
-                               session = tmpSession
-                               return false
-                       }
-                       return true
-               })
+                       })
+               }
        }
 
        if session == nil {
diff --git a/pkg/remoting/loadbalance/xid_loadbalance_test.go 
b/pkg/remoting/loadbalance/xid_loadbalance_test.go
index db9d58bc..160e12e5 100644
--- a/pkg/remoting/loadbalance/xid_loadbalance_test.go
+++ b/pkg/remoting/loadbalance/xid_loadbalance_test.go
@@ -84,6 +84,12 @@ func TestXidLoadBalance(t *testing.T) {
                        xid:         "127.0.0.1:9000:111",
                        returnAddrs: []string{"127.0.0.1:8000", 
"127.0.0.1:8002"},
                },
+               {
+                       name:        "ip is ipv6",
+                       sessions:    sessions,
+                       xid:         
"2000:0000:0000:0000:0001:2345:6789:abcd:8002:111",
+                       returnAddrs: []string{"127.0.0.1:8000", 
"127.0.0.1:8002"},
+               },
        }
        for _, test := range testCases {
                session := XidLoadBalance(test.sessions, test.xid)
diff --git a/pkg/remoting/loadbalance/xid_loadbalance.go 
b/pkg/util/net/address_validator.go
similarity index 53%
copy from pkg/remoting/loadbalance/xid_loadbalance.go
copy to pkg/util/net/address_validator.go
index 0d18fb8b..03280cbd 100644
--- a/pkg/remoting/loadbalance/xid_loadbalance.go
+++ b/pkg/util/net/address_validator.go
@@ -15,42 +15,45 @@
  * limitations under the License.
  */
 
-package loadbalance
+package net
 
 import (
+       "fmt"
+       "regexp"
+       "strconv"
        "strings"
-       "sync"
+)
 
-       getty "github.com/apache/dubbo-getty"
+const (
+       addressSplitChar = ":"
 )
 
-func XidLoadBalance(sessions *sync.Map, xid string) getty.Session {
-       var session getty.Session
-
-       // ip:port:transactionId
-       tmpSplits := strings.Split(xid, ":")
-       if len(tmpSplits) == 3 {
-               ip := tmpSplits[0]
-               port := tmpSplits[1]
-               ipPort := ip + ":" + port
-               sessions.Range(func(key, value interface{}) bool {
-                       tmpSession := key.(getty.Session)
-                       if tmpSession.IsClosed() {
-                               sessions.Delete(tmpSession)
-                               return true
-                       }
-                       connectedIpPort := tmpSession.RemoteAddr()
-                       if ipPort == connectedIpPort {
-                               session = tmpSession
-                               return false
-                       }
-                       return true
-               })
+func SplitIPPortStr(addr string) (string, int, error) {
+       if addr == "" {
+               return "", 0, fmt.Errorf("split ip err: param addr must not 
empty")
+       }
+
+       if addr[0] == '[' {
+               reg := regexp.MustCompile("[\\[\\]]")
+               addr = reg.ReplaceAllString(addr, "")
        }
 
-       if session == nil {
-               return RandomLoadBalance(sessions, xid)
+       i := strings.LastIndex(addr, addressSplitChar)
+       if i < 0 {
+               return "", 0, fmt.Errorf("address %s: missing port in address", 
addr)
        }
 
-       return session
+       host := addr[:i]
+       port := addr[i+1:]
+
+       if strings.Contains(host, "%") {
+               reg := regexp.MustCompile("\\%[0-9]+")
+               host = reg.ReplaceAllString(host, "")
+       }
+
+       portInt, err := strconv.Atoi(port)
+       if err != nil {
+               return "", 0, err
+       }
+       return host, portInt, nil
 }
diff --git a/pkg/util/net/address_validator_test.go 
b/pkg/util/net/address_validator_test.go
new file mode 100644
index 00000000..fd3658eb
--- /dev/null
+++ b/pkg/util/net/address_validator_test.go
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package net
+
+import (
+       "reflect"
+       "testing"
+)
+
+func TestAddressValidator(t *testing.T) {
+       type IPAddr struct {
+               host string
+               port int
+       }
+       tests := []struct {
+               name       string
+               address    string
+               want       *IPAddr
+               wantErr    bool
+               wantErrMsg string
+       }{
+               {
+                       name:    "normal single endpoint.",
+                       address: "127.0.0.1:8091",
+                       want: &IPAddr{
+                               "127.0.0.1", 8091,
+                       },
+                       wantErr: false,
+               },
+               {
+                       name:       "addr is empty.",
+                       address:    "",
+                       want:       nil,
+                       wantErr:    true,
+                       wantErrMsg: "split ip err: param addr must not empty",
+               },
+               {
+                       name:       "format is not ip:port",
+                       address:    "127.0.0.18091",
+                       want:       nil,
+                       wantErr:    true,
+                       wantErrMsg: "address 127.0.0.18091: missing port in 
address",
+               },
+               {
+                       name:       "port is not number",
+                       address:    "127.0.0.1:abc",
+                       want:       nil,
+                       wantErr:    true,
+                       wantErrMsg: "strconv.Atoi: parsing \"abc\": invalid 
syntax",
+               },
+               {
+                       name:    "endpoint is ipv6",
+                       address: 
"[2000:0000:0000:0000:0001:2345:6789:abcd]:8080",
+                       want: &IPAddr{
+                               "2000:0000:0000:0000:0001:2345:6789:abcd", 8080,
+                       },
+                       wantErr: false,
+               },
+               {
+                       name:    "endpoint is ipv6",
+                       address: 
"[2000:0000:0000:0000:0001:2345:6789:abcd%10]:8080",
+                       want: &IPAddr{
+                               "2000:0000:0000:0000:0001:2345:6789:abcd", 8080,
+                       },
+                       wantErr: false,
+               },
+               {
+                       name:    "endpoint is ipv6",
+                       address: "2000:0000:0000:0000:0001:2345:6789:abcd:8080",
+                       want: &IPAddr{
+                               "2000:0000:0000:0000:0001:2345:6789:abcd", 8080,
+                       },
+                       wantErr: false,
+               },
+               {
+                       name:    "endpoint is ipv6",
+                       address: "[::]:8080",
+                       want: &IPAddr{
+                               "::", 8080,
+                       },
+                       wantErr: false,
+               },
+               {
+                       name:    "endpoint is ipv6",
+                       address: "::FFFF:192.168.1.2:8080",
+                       want: &IPAddr{
+                               "::FFFF:192.168.1.2", 8080,
+                       },
+                       wantErr: false,
+               },
+               {
+                       name:    "endpoint is ipv6",
+                       address: "[::FFFF:192.168.1.2]:8080",
+                       want: &IPAddr{
+                               "::FFFF:192.168.1.2", 8080,
+                       },
+                       wantErr: false,
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       host, port, err := SplitIPPortStr(tt.address)
+
+                       if (err != nil) != tt.wantErr {
+                               t.Errorf("SplitIPPortStr() error = %v, wantErr 
= %v", err, tt.wantErr)
+                       }
+                       if tt.wantErr && err.Error() != tt.wantErrMsg {
+                               t.Errorf("SplitIPPortStr() errMsg = %v, 
wantErrMsg = %v", err.Error(), tt.wantErrMsg)
+                       }
+                       got := &IPAddr{host, port}
+                       if !tt.wantErr && !reflect.DeepEqual(got, tt.want) {
+                               t.Errorf("SplitIPPortStr() got = %v, want = 
%v", got, tt.want)
+                       }
+               })
+       }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org
For additional commands, e-mail: notifications-h...@seata.apache.org

Reply via email to