This is an automated email from the ASF dual-hosted git repository.

luky116 pushed a commit to branch feature/saga
in repository https://gitbox.apache.org/repos/asf/incubator-seata-go.git


The following commit(s) were added to refs/heads/feature/saga by this push:
     new bb1c1262 feat: HttpServiceTaskState Support (#769)
bb1c1262 is described below

commit bb1c1262d442caa54ec07e87d1f592f01540f51b
Author: FengZhang <[email protected]>
AuthorDate: Sat Mar 15 20:21:29 2025 +0800

    feat: HttpServiceTaskState Support (#769)
    
    * support http invoker for sage engine
    
    * fix comment language
---
 .../statemachine/engine/invoker/http_invoker.go    | 220 +++++++++++++++++++++
 .../engine/invoker/http_invoker_test.go            | 176 +++++++++++++++++
 2 files changed, 396 insertions(+)

diff --git a/pkg/saga/statemachine/engine/invoker/http_invoker.go 
b/pkg/saga/statemachine/engine/invoker/http_invoker.go
new file mode 100644
index 00000000..c23531ad
--- /dev/null
+++ b/pkg/saga/statemachine/engine/invoker/http_invoker.go
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package invoker
+
+import (
+       "bytes"
+       "context"
+       "encoding/json"
+       "errors"
+       "fmt"
+       "io"
+       "net/http"
+       "reflect"
+       "strings"
+       "sync"
+       "time"
+
+       "github.com/seata/seata-go/pkg/saga/statemachine/statelang/state"
+       "github.com/seata/seata-go/pkg/util/log"
+)
+
+const errHttpCode = 400
+
+type HTTPInvoker struct {
+       clientsMapLock sync.Mutex
+       clients        map[string]HTTPClient
+}
+
+func NewHTTPInvoker() *HTTPInvoker {
+       return &HTTPInvoker{
+               clients: make(map[string]HTTPClient),
+       }
+}
+
+func (h *HTTPInvoker) RegisterClient(serviceName string, client HTTPClient) {
+       h.clientsMapLock.Lock()
+       defer h.clientsMapLock.Unlock()
+       h.clients[serviceName] = client
+}
+
+func (h *HTTPInvoker) GetClient(serviceName string) HTTPClient {
+       h.clientsMapLock.Lock()
+       defer h.clientsMapLock.Unlock()
+       return h.clients[serviceName]
+}
+
+func (h *HTTPInvoker) Invoke(ctx context.Context, input []any, service 
state.ServiceTaskState) (output []reflect.Value, err error) {
+       serviceTaskStateImpl := service.(*state.ServiceTaskStateImpl)
+       client := h.GetClient(serviceTaskStateImpl.ServiceName())
+       if client == nil {
+               return nil, fmt.Errorf("no http client %s for service task 
state", serviceTaskStateImpl.ServiceName())
+       }
+
+       if serviceTaskStateImpl.IsAsync() {
+               go func() {
+                       _, err := client.Call(ctx, serviceTaskStateImpl, input)
+                       if err != nil {
+                               log.Errorf("invoke Service[%s].%s failed, err 
is %s", serviceTaskStateImpl.ServiceName(),
+                                       serviceTaskStateImpl.ServiceMethod(), 
err)
+                       }
+               }()
+               return nil, nil
+       }
+
+       return client.Call(ctx, serviceTaskStateImpl, input)
+}
+
+func (h *HTTPInvoker) Close(ctx context.Context) error {
+       return nil
+}
+
+type HTTPClient interface {
+       Call(ctx context.Context, serviceTaskStateImpl 
*state.ServiceTaskStateImpl, input []any) ([]reflect.Value, error)
+}
+
+type HTTPClientImpl struct {
+       serviceName string
+       baseURL     string
+       client      *http.Client
+}
+
+func NewHTTPClient(serviceName string, baseURL string, client *http.Client) 
*HTTPClientImpl {
+       if client == nil {
+               client = &http.Client{
+                       Timeout: time.Second * 30,
+               }
+       }
+       return &HTTPClientImpl{
+               serviceName: serviceName,
+               baseURL:     baseURL,
+               client:      client,
+       }
+}
+
+func (h *HTTPClientImpl) Call(ctx context.Context, serviceTaskStateImpl 
*state.ServiceTaskStateImpl, input []any) ([]reflect.Value, error) {
+       retryCountMap := make(map[state.Retry]int)
+       for {
+               res, err, shouldRetry := func() (res []reflect.Value, resErr 
error, shouldRetry bool) {
+                       defer func() {
+                               if r := recover(); r != nil {
+                                       errStr := fmt.Sprintf("%v", r)
+                                       retry := 
h.matchRetry(serviceTaskStateImpl, errStr)
+                                       resErr = errors.New(errStr)
+                                       if retry != nil {
+                                               shouldRetry = 
h.needRetry(serviceTaskStateImpl, retryCountMap, retry, resErr)
+                                       }
+                               }
+                       }()
+
+                       reqBody, err := json.Marshal(input)
+                       if err != nil {
+                               return nil, err, false
+                       }
+
+                       req, err := http.NewRequestWithContext(ctx,
+                               serviceTaskStateImpl.ServiceMethod(),
+                               h.baseURL+serviceTaskStateImpl.Name(),
+                               bytes.NewBuffer(reqBody))
+                       if err != nil {
+                               return nil, err, false
+                       }
+
+                       req.Header.Set("Content-Type", "application/json")
+
+                       resp, err := h.client.Do(req)
+                       if err != nil {
+                               retry := h.matchRetry(serviceTaskStateImpl, 
err.Error())
+                               if retry != nil {
+                                       return nil, err, 
h.needRetry(serviceTaskStateImpl, retryCountMap, retry, err)
+                               }
+                               return nil, err, false
+                       }
+                       defer resp.Body.Close()
+
+                       body, err := io.ReadAll(resp.Body)
+                       if err != nil {
+                               return nil, err, false
+                       }
+
+                       if resp.StatusCode >= errHttpCode {
+                               errStr := fmt.Sprintf("HTTP error: %d - %s", 
resp.StatusCode, string(body))
+                               retry := h.matchRetry(serviceTaskStateImpl, 
errStr)
+                               if retry != nil {
+                                       return nil, errors.New(errStr), 
h.needRetry(serviceTaskStateImpl, retryCountMap, retry, err)
+                               }
+                               return nil, errors.New(errStr), false
+                       }
+
+                       return []reflect.Value{
+                               reflect.ValueOf(string(body)),
+                               
reflect.Zero(reflect.TypeOf((*error)(nil)).Elem()),
+                       }, nil, false
+               }()
+
+               if !shouldRetry {
+                       if err != nil {
+                               return nil, fmt.Errorf("invoke Service[%s] 
failed, not satisfy retry config, the last err is %s",
+                                       serviceTaskStateImpl.ServiceName(), err)
+                       }
+                       return res, nil
+               }
+       }
+}
+
+func (h *HTTPClientImpl) matchRetry(impl *state.ServiceTaskStateImpl, str 
string) state.Retry {
+       if impl.Retry() != nil {
+               for _, retry := range impl.Retry() {
+                       if retry.Exceptions() != nil {
+                               for _, exception := range retry.Exceptions() {
+                                       if strings.Contains(str, exception) {
+                                               return retry
+                                       }
+                               }
+                       }
+               }
+       }
+       return nil
+}
+
+func (h *HTTPClientImpl) needRetry(impl *state.ServiceTaskStateImpl, countMap 
map[state.Retry]int, retry state.Retry, err error) bool {
+       attempt, exist := countMap[retry]
+       if !exist {
+               countMap[retry] = 0
+       }
+
+       if attempt >= retry.MaxAttempt() {
+               return false
+       }
+
+       intervalSecond := retry.IntervalSecond()
+       backoffRate := retry.BackoffRate()
+       var currentInterval int64
+       if attempt == 0 {
+               currentInterval = int64(intervalSecond * 1000)
+       } else {
+               currentInterval = int64(intervalSecond * backoffRate * 
float64(attempt) * 1000)
+       }
+
+       log.Warnf("invoke service[%s.%s] failed, will retry after %s millis, 
current retry count: %s, current err: %s",
+               impl.ServiceName(), impl.ServiceMethod(), currentInterval, 
attempt, err)
+
+       time.Sleep(time.Duration(currentInterval) * time.Millisecond)
+       countMap[retry] = attempt + 1
+       return true
+}
diff --git a/pkg/saga/statemachine/engine/invoker/http_invoker_test.go 
b/pkg/saga/statemachine/engine/invoker/http_invoker_test.go
new file mode 100644
index 00000000..062aac03
--- /dev/null
+++ b/pkg/saga/statemachine/engine/invoker/http_invoker_test.go
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package invoker
+
+import (
+       "context"
+       "encoding/json"
+       "net/http"
+       "net/http/httptest"
+       "testing"
+       "time"
+
+       "github.com/seata/seata-go/pkg/saga/statemachine/statelang/state"
+       "github.com/stretchr/testify/assert"
+)
+
+func TestHTTPInvokerInvokeSucceedWithOutRetry(t *testing.T) {
+       // create test server
+       server := httptest.NewServer(http.HandlerFunc(func(w 
http.ResponseWriter, r *http.Request) {
+               var input []interface{}
+               err := json.NewDecoder(r.Body).Decode(&input)
+               if err != nil {
+                       http.Error(w, err.Error(), http.StatusBadRequest)
+                       return
+               }
+               w.WriteHeader(http.StatusOK)
+               w.Write([]byte(input[0].(string)))
+       }))
+       defer server.Close()
+
+       // create HTTP Invoker
+       invoker := NewHTTPInvoker()
+       client := NewHTTPClient("test", server.URL+"/", &http.Client{})
+       invoker.RegisterClient("test", client)
+
+       // invoke
+       ctx := context.Background()
+       values, err := invoker.Invoke(ctx, []any{"hello"}, 
newHTTPServiceTaskState())
+
+       // verify
+       assert.NoError(t, err)
+       assert.NotNil(t, values)
+       assert.Equal(t, "hello", values[0].Interface())
+}
+
+func TestHTTPInvokerInvokeWithRetry(t *testing.T) {
+       attemptCount := 0
+       // create test server
+       server := httptest.NewServer(http.HandlerFunc(func(w 
http.ResponseWriter, r *http.Request) {
+               attemptCount++
+               if attemptCount < 2 {
+                       w.WriteHeader(http.StatusInternalServerError)
+                       w.Write([]byte("fail"))
+                       return
+               }
+               var input []interface{}
+               json.NewDecoder(r.Body).Decode(&input)
+               w.WriteHeader(http.StatusOK)
+               w.Write([]byte(input[0].(string)))
+       }))
+       defer server.Close()
+
+       // create HTTP Invoker
+       invoker := NewHTTPInvoker()
+       client := NewHTTPClient("test", server.URL+"/", &http.Client{})
+       invoker.RegisterClient("test", client)
+
+       // invoker
+       ctx := context.Background()
+       values, err := invoker.Invoke(ctx, []any{"hello"}, 
newHTTPServiceTaskStateWithRetry())
+
+       // verify
+       assert.NoError(t, err)
+       assert.NotNil(t, values)
+       assert.Equal(t, "hello", values[0].Interface())
+       assert.Equal(t, 2, attemptCount)
+}
+
+func TestHTTPInvokerInvokeFailedInRetry(t *testing.T) {
+       // create test server
+       server := httptest.NewServer(http.HandlerFunc(func(w 
http.ResponseWriter, r *http.Request) {
+               w.WriteHeader(http.StatusInternalServerError)
+               w.Write([]byte("fail"))
+       }))
+       defer server.Close()
+
+       // create HTTP Invoker
+       invoker := NewHTTPInvoker()
+       client := NewHTTPClient("test", server.URL+"/", &http.Client{})
+       invoker.RegisterClient("test", client)
+
+       // invoker
+       ctx := context.Background()
+       _, err := invoker.Invoke(ctx, []any{"hello"}, 
newHTTPServiceTaskStateWithRetry())
+
+       // verify
+       assert.Error(t, err)
+}
+
+func TestHTTPInvokerAsyncInvoke(t *testing.T) {
+       called := false
+       // create test server
+       server := httptest.NewServer(http.HandlerFunc(func(w 
http.ResponseWriter, r *http.Request) {
+               called = true
+               w.WriteHeader(http.StatusOK)
+               w.Write([]byte("success"))
+       }))
+       defer server.Close()
+
+       // create HTTP Invoker
+       invoker := NewHTTPInvoker()
+       client := NewHTTPClient("test", server.URL+"/", &http.Client{})
+       invoker.RegisterClient("test", client)
+
+       // async invoke
+       ctx := context.Background()
+       taskState := newHTTPServiceTaskStateWithAsync()
+       _, err := invoker.Invoke(ctx, []any{"hello"}, taskState)
+
+       // verify
+       assert.NoError(t, err)
+       time.Sleep(100 * time.Millisecond)
+       assert.True(t, called)
+}
+
+func newHTTPServiceTaskState() state.ServiceTaskState {
+       serviceTaskStateImpl := state.NewServiceTaskStateImpl()
+       serviceTaskStateImpl.SetName("test")
+       serviceTaskStateImpl.SetIsAsync(false)
+       serviceTaskStateImpl.SetServiceName("test")
+       serviceTaskStateImpl.SetServiceType("HTTP")
+       serviceTaskStateImpl.SetServiceMethod("POST")
+       return serviceTaskStateImpl
+}
+
+func newHTTPServiceTaskStateWithAsync() state.ServiceTaskState {
+       serviceTaskStateImpl := state.NewServiceTaskStateImpl()
+       serviceTaskStateImpl.SetName("test")
+       serviceTaskStateImpl.SetIsAsync(true)
+       serviceTaskStateImpl.SetServiceName("test")
+       serviceTaskStateImpl.SetServiceType("HTTP")
+       serviceTaskStateImpl.SetServiceMethod("POST")
+       return serviceTaskStateImpl
+}
+
+func newHTTPServiceTaskStateWithRetry() state.ServiceTaskState {
+       serviceTaskStateImpl := state.NewServiceTaskStateImpl()
+       serviceTaskStateImpl.SetName("test")
+       serviceTaskStateImpl.SetIsAsync(false)
+       serviceTaskStateImpl.SetServiceName("test")
+       serviceTaskStateImpl.SetServiceType("HTTP")
+       serviceTaskStateImpl.SetServiceMethod("POST")
+
+       retryImpl := &state.RetryImpl{}
+       retryImpl.SetExceptions([]string{"fail"})
+       retryImpl.SetIntervalSecond(1)
+       retryImpl.SetMaxAttempt(3)
+       retryImpl.SetBackoffRate(0.9)
+       serviceTaskStateImpl.SetRetry([]state.Retry{retryImpl})
+       return serviceTaskStateImpl
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to