Alanxtl commented on code in PR #674: URL: https://github.com/apache/dubbo-go-pixiu/pull/674#discussion_r2083387841
##########
pkg/common/http/manager_test.go:
##########
@@ -278,3 +282,221 @@ func NewTestServerWithURL(URL string, handler http.Handler) (*httptest.Server, e
     ts.Start()
     return ts, nil
 }
+
+// StreamHTTPRecorder Used to capture and test streaming HTTP responses over channels
+type StreamHTTPRecorder struct {
+    http.ResponseWriter
+    receivedBuf []string
+    headers http.Header
+    status int
+    flushCount int
+}
+
+func NewStreamHTTPRecorder() *StreamHTTPRecorder {
+    return &StreamHTTPRecorder{
+        receivedBuf: make([]string, 0),
+        headers: make(http.Header),
+        flushCount: 0,
+    }
+}
+
+func (r *StreamHTTPRecorder) Header() http.Header {
+    return r.headers
+}
+
+func (r *StreamHTTPRecorder) WriteHeader(statusCode int) {
+    r.status = statusCode
+}
+
+func (r *StreamHTTPRecorder) Write(data []byte) (int, error) {
+    eventCh <- string(data)
+    r.receivedBuf = append(r.receivedBuf, string(data))
+    return len(data), nil
+}
+
+func (r *StreamHTTPRecorder) Flush() {
+    r.flushCount++
+}
+
+// Test a variety of common streaming HTTP response types
+func TestStreamableHTTPResponse(t *testing.T) {
+    // define the type of content you want to test
+    contentTypes := []string{
+        constant.HeaderValueTextPlain,
+        constant.HeaderValueApplicationJson,
+        constant.HeaderValueApplicationOctetStream,
+        constant.HeaderValueApplicationNDJson,
+    }
+
+    for _, contentType := range contentTypes {
+        t.Run(fmt.Sprintf("ContentType_%s", contentType), func(t *testing.T) {
+            testStreamableResponse(t, contentType)
+        })
+    }
+}
+
+func testStreamableResponse(t *testing.T, contentType string) {
+    hcmc := model.HttpConnectionManagerConfig{
+        RouteConfig: model.RouteConfiguration{
+            RouteTrie: trie.NewTrieWithDefault("GET/api/stream", model.RouteAction{
+                Cluster: "mock_stream_cluster",
+            }),
+        },
+        HTTPFilters: []*model.HTTPFilter{
+            {
+                Name: commonmock.Kind,
+            },
+        },
+    }
+
+    ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
+    defer cancel()
+
+    // Clear any data that may have been left over from the previous test
+    for len(eventCh) > 0 {
+        <-eventCh
+    }
+
+    // mock server
+    upstreamServer, _ := NewTestServerWithURL("localhost:8080", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+        w.Header().Set(constant.HeaderKeyContextType, contentType)
+        flusher := w.(http.Flusher)
+
+        // Generate appropriate test data based on content type
+        var data []byte
+        for i := 1; i <= 5; i++ {
+            select {
+            case <-ctx.Done():
+                return
+            default:
+                time.Sleep(10 * time.Millisecond)
+
+                switch contentType {
+                case constant.HeaderValueApplicationJson:
+                    data = []byte(fmt.Sprintf(`{"id": %d, "message": "test chunk %d"}\n`, i, i))
+                case constant.HeaderValueApplicationNDJson:
+                    data = []byte(fmt.Sprintf(`{"id": %d, "message": "test chunk %d"}\n`, i, i))
+                case constant.HeaderValueApplicationOctetStream:
+                    data = []byte(fmt.Sprintf("CHUNK-%d", i))
+                default: // text/plain
+                    data = []byte(fmt.Sprintf("Chunk %d\n", i))
+                }
+
+                _, _ = w.Write(data)
+                flusher.Flush()
+                logger.Info("Upstream sent chunk ", i)
+            }
+        }
+    }))
+    defer upstreamServer.Close()
+
+    req := httptest.NewRequest("GET", "http://localhost:8080/api/stream", nil).WithContext(ctx)
+    done := make(chan struct{})
+
+    httpCtx := &contexthttp.HttpContext{
+        Request: req,
+        Writer: NewStreamHTTPRecorder(),
+        Ctx: ctx,
+    }
+
+    go func() {
+        defer close(done)
+
+        hcm := CreateHttpConnectionManager(&hcmc)
+
+        if err := hcm.Handle(httpCtx); err != nil {
+            t.Errorf("Handle failed: %v", err)
+        }
+
+        // verify that targetResp exists
+        if httpCtx.TargetResp == nil {
+            t.Error("TargetResp is nil")
+            return
+        }
+    }()
+
+    // collect and validate responses
+    receivedChunks := 0
+    for {
+        receivedEvents := httpCtx.Writer.(*StreamHTTPRecorder).receivedBuf
+        select {
+        case event := <-eventCh:
+            logger.Info("Received chunk: %s", event)
+            receivedChunks++
+        case <-done:
+            assert.Equal(t, 5, len(receivedEvents), "Should receive 5 chunks")
+            return
+        case <-time.After(5 * time.Second):
+            t.Fatal("Test timeout")
+            return
+        }
+    }
+}
+
+// TestIsStreamableResponse Test whether it is a function that can be streamed and responded
+func TestIsStreamableResponse(t *testing.T) {
+    tests := []struct {
+        name string
+        headers map[string]string
+        expected bool
+    }{
+        {
+            name: "sseResponse",
+            headers: map[string]string{
+                constant.HeaderKeyContextType: constant.HeaderValueTextEventStream,
+            },
+            expected: true,
+        },
+        {
+            name: "chunkedEncodingResponses",
+            headers: map[string]string{
+                constant.HeaderKeyContextType: constant.HeaderValueApplicationJson,
+                constant.HeaderKeyTransferEncoding: constant.HeaderValueChunked,
+            },
+            expected: true,
+        },
+        {
+            name: "JsonResponseWithoutContent-Length",
+            headers: map[string]string{
+                constant.HeaderKeyContextType: constant.HeaderValueApplicationJson,
+            },
+            expected: true,
+        },
+        {
+            name: "The text response is large Content-Length",
+            headers: map[string]string{
+                constant.HeaderKeyContextType: constant.HeaderValueTextPlain,
+                constant.HeaderKeyContentLength: "2097152", // 2MB
+            },
+            expected: true,
+        },
+        {
+            name: "JSON response Content-Length",
+            headers: map[string]string{
+                constant.HeaderKeyContextType: constant.HeaderValueApplicationJson,
+                constant.HeaderKeyContentLength: "1024", // 1KB
+            },
+            expected: false,
+        },
+        {
+            name: "no stream Content-Type",
+            headers: map[string]string{
+                constant.HeaderKeyContextType: "image/jpeg",

Review Comment:
"image/jpeg"也不要硬编码

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@dubbo.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@dubbo.apache.org
For additional commands, e-mail: notifications-h...@dubbo.apache.org