This is an automated email from the ASF dual-hosted git repository. zfeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-seata-go.git
The following commit(s) were added to refs/heads/master by this push: new a0d485fc support ipv6 (#786) a0d485fc is described below commit a0d485fc6a73457c08ddd56b63f5821e09688772 Author: lxfeng1997 <33981743+lxfeng1...@users.noreply.github.com> AuthorDate: Sat May 10 19:03:04 2025 +0800 support ipv6 (#786) * support ipv6 * add license * support ipv6 * support ipv6 --------- Co-authored-by: JayLiu <38887641+luky...@users.noreply.github.com> Co-authored-by: FengZhang <zfc...@qq.com> --- pkg/discovery/etcd3.go | 19 ++- pkg/discovery/etcd3_test.go | 18 +++ pkg/discovery/file.go | 13 +- pkg/discovery/file_test.go | 71 ++++++++++- pkg/remoting/getty/session_manager.go | 3 +- pkg/remoting/loadbalance/xid_loadbalance.go | 48 +++++--- pkg/remoting/loadbalance/xid_loadbalance_test.go | 6 + .../net/address_validator.go} | 59 +++++----- pkg/util/net/address_validator_test.go | 131 +++++++++++++++++++++ 9 files changed, 299 insertions(+), 69 deletions(-) diff --git a/pkg/discovery/etcd3.go b/pkg/discovery/etcd3.go index 14ec3777..49ff35be 100644 --- a/pkg/discovery/etcd3.go +++ b/pkg/discovery/etcd3.go @@ -20,11 +20,13 @@ package discovery import ( "context" "fmt" - etcd3 "go.etcd.io/etcd/client/v3" - "seata.apache.org/seata-go/pkg/util/log" - "strconv" "strings" "sync" + + etcd3 "go.etcd.io/etcd/client/v3" + + "seata.apache.org/seata-go/pkg/util/log" + "seata.apache.org/seata-go/pkg/util/net" ) const ( @@ -189,12 +191,7 @@ func getClusterName(key []byte) (string, error) { func getServerInstance(value []byte) (*ServiceInstance, error) { stringValue := string(value) - valueSplit := strings.Split(stringValue, addressSplitChar) - if len(valueSplit) != 2 { - return nil, fmt.Errorf("etcd value has an incorrect format. value: %s", stringValue) - } - ip := valueSplit[0] - port, err := strconv.Atoi(valueSplit[1]) + ip, port, err := net.SplitIPPortStr(stringValue) if err != nil { return nil, fmt.Errorf("etcd port has an incorrect format. err: %w", err) } @@ -213,9 +210,7 @@ func getClusterAndAddress(key []byte) (string, string, int, error) { return "", "", 0, fmt.Errorf("etcd key has an incorrect format. key: %s", stringKey) } cluster := keySplit[2] - address := strings.Split(keySplit[3], addressSplitChar) - ip := address[0] - port, err := strconv.Atoi(address[1]) + ip, port, err := net.SplitIPPortStr(keySplit[3]) if err != nil { return "", "", 0, fmt.Errorf("etcd port has an incorrect format. err: %w", err) } diff --git a/pkg/discovery/etcd3_test.go b/pkg/discovery/etcd3_test.go index 7c9c59fb..81f87c6d 100644 --- a/pkg/discovery/etcd3_test.go +++ b/pkg/discovery/etcd3_test.go @@ -54,6 +54,24 @@ func TestEtcd3RegistryService_Lookup(t *testing.T) { }, }, }, + { + name: "host is ipv6", + getResp: &clientv3.GetResponse{ + Kvs: []*mvccpb.KeyValue{ + { + Key: []byte("registry-seata-default-2000:0000:0000:0000:0001:2345:6789:abcd:8091"), + Value: []byte("2000:0000:0000:0000:0001:2345:6789:abcd:8091"), + }, + }, + }, + watchResp: nil, + want: []*ServiceInstance{ + { + Addr: "2000:0000:0000:0000:0001:2345:6789:abcd", + Port: 8091, + }, + }, + }, { name: "use watch update ServiceInstances", getResp: nil, diff --git a/pkg/discovery/file.go b/pkg/discovery/file.go index abc71a37..3be8b560 100644 --- a/pkg/discovery/file.go +++ b/pkg/discovery/file.go @@ -19,15 +19,14 @@ package discovery import ( "fmt" - "strconv" "strings" "seata.apache.org/seata-go/pkg/util/log" + "seata.apache.org/seata-go/pkg/util/net" ) const ( endPointSplitChar = ";" - ipPortSplitChar = ":" ) type FileRegistryService struct { @@ -66,17 +65,13 @@ func (s *FileRegistryService) Lookup(key string) ([]*ServiceInstance, error) { addrs := strings.Split(addrStr, endPointSplitChar) instances := make([]*ServiceInstance, 0) for _, addr := range addrs { - ipPort := strings.Split(addr, ipPortSplitChar) - if len(ipPort) != 2 { - return nil, fmt.Errorf("endpoint format should like ip:port. endpoint: %s", addr) - } - ip := ipPort[0] - port, err := strconv.Atoi(ipPort[1]) + host, port, err := net.SplitIPPortStr(addr) if err != nil { + log.Errorf("endpoint err. endpoint: %s", addr) return nil, err } instances = append(instances, &ServiceInstance{ - Addr: ip, + Addr: host, Port: port, }) } diff --git a/pkg/discovery/file_test.go b/pkg/discovery/file_test.go index 7b3bc7b5..9d653c13 100644 --- a/pkg/discovery/file_test.go +++ b/pkg/discovery/file_test.go @@ -136,7 +136,7 @@ func TestFileRegistryService_Lookup(t *testing.T) { }, want: nil, wantErr: true, - wantErrMsg: "endpoint format should like ip:port. endpoint: 127.0.0.18091", + wantErrMsg: "address 127.0.0.18091: missing port in address", }, { name: "port is not number", @@ -157,6 +157,75 @@ func TestFileRegistryService_Lookup(t *testing.T) { wantErr: true, wantErrMsg: "strconv.Atoi: parsing \"abc\": invalid syntax", }, + { + name: "endpoint is ipv6", + args: args{ + key: "default_tx_group", + }, + fields: fields{ + serviceConfig: &ServiceConfig{ + VgroupMapping: map[string]string{ + "default_tx_group": "default", + }, + Grouplist: map[string]string{ + "default": "[2000:0000:0000:0000:0001:2345:6789:abcd]:8080", + }, + }, + }, + want: []*ServiceInstance{ + { + Addr: "2000:0000:0000:0000:0001:2345:6789:abcd", + Port: 8080, + }, + }, + wantErr: false, + }, + { + name: "endpoint is ipv6", + args: args{ + key: "default_tx_group", + }, + fields: fields{ + serviceConfig: &ServiceConfig{ + VgroupMapping: map[string]string{ + "default_tx_group": "default", + }, + Grouplist: map[string]string{ + "default": "[2000:0000:0000:0000:0001:2345:6789:abcd%10]:8080", + }, + }, + }, + want: []*ServiceInstance{ + { + Addr: "2000:0000:0000:0000:0001:2345:6789:abcd", + Port: 8080, + }, + }, + wantErr: false, + }, + { + name: "endpoint is ipv6", + args: args{ + key: "default_tx_group", + }, + fields: fields{ + serviceConfig: &ServiceConfig{ + VgroupMapping: map[string]string{ + "default_tx_group": "default", + }, + Grouplist: map[string]string{ + "default": "[::]:8080", + }, + }, + }, + want: []*ServiceInstance{ + { + Addr: "::", + Port: 8080, + }, + }, + wantErr: false, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/pkg/remoting/getty/session_manager.go b/pkg/remoting/getty/session_manager.go index 602fffed..5cab5805 100644 --- a/pkg/remoting/getty/session_manager.go +++ b/pkg/remoting/getty/session_manager.go @@ -22,6 +22,7 @@ import ( "fmt" "net" "reflect" + "strconv" "sync" "sync/atomic" "time" @@ -76,7 +77,7 @@ func (g *SessionManager) init() { } for _, address := range addressList { gettyClient := getty.NewTCPClient( - getty.WithServerAddress(fmt.Sprintf("%s:%d", address.Addr, address.Port)), + getty.WithServerAddress(net.JoinHostPort(address.Addr, strconv.Itoa(address.Port))), // todo if read c.gettyConf.ConnectionNum, will cause the connect to fail getty.WithConnectionNumber(1), getty.WithReconnectInterval(g.gettyConf.ReconnectInterval), diff --git a/pkg/remoting/loadbalance/xid_loadbalance.go b/pkg/remoting/loadbalance/xid_loadbalance.go index 0d18fb8b..3900832f 100644 --- a/pkg/remoting/loadbalance/xid_loadbalance.go +++ b/pkg/remoting/loadbalance/xid_loadbalance.go @@ -18,34 +18,46 @@ package loadbalance import ( + "fmt" "strings" "sync" getty "github.com/apache/dubbo-getty" + + "seata.apache.org/seata-go/pkg/util/log" + "seata.apache.org/seata-go/pkg/util/net" ) func XidLoadBalance(sessions *sync.Map, xid string) getty.Session { var session getty.Session + const delimiter = ":" + + if len(xid) > 0 && strings.Contains(xid, delimiter) { + // ip:port:transactionId -> ip:port + index := strings.LastIndex(xid, delimiter) + serverAddress := xid[:index] - // ip:port:transactionId - tmpSplits := strings.Split(xid, ":") - if len(tmpSplits) == 3 { - ip := tmpSplits[0] - port := tmpSplits[1] - ipPort := ip + ":" + port - sessions.Range(func(key, value interface{}) bool { - tmpSession := key.(getty.Session) - if tmpSession.IsClosed() { - sessions.Delete(tmpSession) + // ip:port -> port + // ipv4/v6 + ip, port, err := net.SplitIPPortStr(serverAddress) + if err != nil { + log.Errorf("xid load balance err, xid:%s, %v , change use random load balance", xid, err) + } else { + sessions.Range(func(key, value interface{}) bool { + tmpSession := key.(getty.Session) + if tmpSession.IsClosed() { + sessions.Delete(tmpSession) + return true + } + ipPort := fmt.Sprintf("%s:%d", ip, port) + connectedIpPort := tmpSession.RemoteAddr() + if ipPort == connectedIpPort { + session = tmpSession + return false + } return true - } - connectedIpPort := tmpSession.RemoteAddr() - if ipPort == connectedIpPort { - session = tmpSession - return false - } - return true - }) + }) + } } if session == nil { diff --git a/pkg/remoting/loadbalance/xid_loadbalance_test.go b/pkg/remoting/loadbalance/xid_loadbalance_test.go index db9d58bc..160e12e5 100644 --- a/pkg/remoting/loadbalance/xid_loadbalance_test.go +++ b/pkg/remoting/loadbalance/xid_loadbalance_test.go @@ -84,6 +84,12 @@ func TestXidLoadBalance(t *testing.T) { xid: "127.0.0.1:9000:111", returnAddrs: []string{"127.0.0.1:8000", "127.0.0.1:8002"}, }, + { + name: "ip is ipv6", + sessions: sessions, + xid: "2000:0000:0000:0000:0001:2345:6789:abcd:8002:111", + returnAddrs: []string{"127.0.0.1:8000", "127.0.0.1:8002"}, + }, } for _, test := range testCases { session := XidLoadBalance(test.sessions, test.xid) diff --git a/pkg/remoting/loadbalance/xid_loadbalance.go b/pkg/util/net/address_validator.go similarity index 53% copy from pkg/remoting/loadbalance/xid_loadbalance.go copy to pkg/util/net/address_validator.go index 0d18fb8b..03280cbd 100644 --- a/pkg/remoting/loadbalance/xid_loadbalance.go +++ b/pkg/util/net/address_validator.go @@ -15,42 +15,45 @@ * limitations under the License. */ -package loadbalance +package net import ( + "fmt" + "regexp" + "strconv" "strings" - "sync" +) - getty "github.com/apache/dubbo-getty" +const ( + addressSplitChar = ":" ) -func XidLoadBalance(sessions *sync.Map, xid string) getty.Session { - var session getty.Session - - // ip:port:transactionId - tmpSplits := strings.Split(xid, ":") - if len(tmpSplits) == 3 { - ip := tmpSplits[0] - port := tmpSplits[1] - ipPort := ip + ":" + port - sessions.Range(func(key, value interface{}) bool { - tmpSession := key.(getty.Session) - if tmpSession.IsClosed() { - sessions.Delete(tmpSession) - return true - } - connectedIpPort := tmpSession.RemoteAddr() - if ipPort == connectedIpPort { - session = tmpSession - return false - } - return true - }) +func SplitIPPortStr(addr string) (string, int, error) { + if addr == "" { + return "", 0, fmt.Errorf("split ip err: param addr must not empty") + } + + if addr[0] == '[' { + reg := regexp.MustCompile("[\\[\\]]") + addr = reg.ReplaceAllString(addr, "") } - if session == nil { - return RandomLoadBalance(sessions, xid) + i := strings.LastIndex(addr, addressSplitChar) + if i < 0 { + return "", 0, fmt.Errorf("address %s: missing port in address", addr) } - return session + host := addr[:i] + port := addr[i+1:] + + if strings.Contains(host, "%") { + reg := regexp.MustCompile("\\%[0-9]+") + host = reg.ReplaceAllString(host, "") + } + + portInt, err := strconv.Atoi(port) + if err != nil { + return "", 0, err + } + return host, portInt, nil } diff --git a/pkg/util/net/address_validator_test.go b/pkg/util/net/address_validator_test.go new file mode 100644 index 00000000..fd3658eb --- /dev/null +++ b/pkg/util/net/address_validator_test.go @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package net + +import ( + "reflect" + "testing" +) + +func TestAddressValidator(t *testing.T) { + type IPAddr struct { + host string + port int + } + tests := []struct { + name string + address string + want *IPAddr + wantErr bool + wantErrMsg string + }{ + { + name: "normal single endpoint.", + address: "127.0.0.1:8091", + want: &IPAddr{ + "127.0.0.1", 8091, + }, + wantErr: false, + }, + { + name: "addr is empty.", + address: "", + want: nil, + wantErr: true, + wantErrMsg: "split ip err: param addr must not empty", + }, + { + name: "format is not ip:port", + address: "127.0.0.18091", + want: nil, + wantErr: true, + wantErrMsg: "address 127.0.0.18091: missing port in address", + }, + { + name: "port is not number", + address: "127.0.0.1:abc", + want: nil, + wantErr: true, + wantErrMsg: "strconv.Atoi: parsing \"abc\": invalid syntax", + }, + { + name: "endpoint is ipv6", + address: "[2000:0000:0000:0000:0001:2345:6789:abcd]:8080", + want: &IPAddr{ + "2000:0000:0000:0000:0001:2345:6789:abcd", 8080, + }, + wantErr: false, + }, + { + name: "endpoint is ipv6", + address: "[2000:0000:0000:0000:0001:2345:6789:abcd%10]:8080", + want: &IPAddr{ + "2000:0000:0000:0000:0001:2345:6789:abcd", 8080, + }, + wantErr: false, + }, + { + name: "endpoint is ipv6", + address: "2000:0000:0000:0000:0001:2345:6789:abcd:8080", + want: &IPAddr{ + "2000:0000:0000:0000:0001:2345:6789:abcd", 8080, + }, + wantErr: false, + }, + { + name: "endpoint is ipv6", + address: "[::]:8080", + want: &IPAddr{ + "::", 8080, + }, + wantErr: false, + }, + { + name: "endpoint is ipv6", + address: "::FFFF:192.168.1.2:8080", + want: &IPAddr{ + "::FFFF:192.168.1.2", 8080, + }, + wantErr: false, + }, + { + name: "endpoint is ipv6", + address: "[::FFFF:192.168.1.2]:8080", + want: &IPAddr{ + "::FFFF:192.168.1.2", 8080, + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + host, port, err := SplitIPPortStr(tt.address) + + if (err != nil) != tt.wantErr { + t.Errorf("SplitIPPortStr() error = %v, wantErr = %v", err, tt.wantErr) + } + if tt.wantErr && err.Error() != tt.wantErrMsg { + t.Errorf("SplitIPPortStr() errMsg = %v, wantErrMsg = %v", err.Error(), tt.wantErrMsg) + } + got := &IPAddr{host, port} + if !tt.wantErr && !reflect.DeepEqual(got, tt.want) { + t.Errorf("SplitIPPortStr() got = %v, want = %v", got, tt.want) + } + }) + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org For additional commands, e-mail: notifications-h...@seata.apache.org