No-SilverBullet commented on code in PR #839:
URL: https://github.com/apache/dubbo-go-pixiu/pull/839#discussion_r2613183954
##########
controllers/internal/controller/gateway_controller.go:
##########
@@ -267,6 +286,72 @@ func (r *GatewayReconciler) listGatewaysForHTTPRoute(ctx
context.Context, obj cl
return recs
}
+func (r *GatewayReconciler) listGatewaysForFilterPolicy(ctx context.Context,
obj client.Object) []reconcile.Request {
Review Comment:
Just a suggestion: some errors in the controller are only logged and
otherwise ignored. Consider surfacing the critical information in the status field.
##########
controllers/internal/controller/gateway_controller.go:
##########
@@ -684,6 +860,44 @@ func (r *GatewayReconciler) updateGatewayAddresses(ctx
context.Context, gateway
return nil
}
+func hashString(input string) string {
+ sum := sha256.Sum256([]byte(input))
+ return hex.EncodeToString(sum[:])
+}
+
+func normalizeHTTPMethods(cfg *converter.PixiuConfig) {
+ for i := range cfg.StaticResources.Listeners {
+ l := cfg.StaticResources.Listeners[i]
+ for fi := range l.FilterChain.Filters {
+ f := l.FilterChain.Filters[fi]
+ if f.Name != "dgp.filter.httpconnectionmanager" {
+ continue
+ }
+ switch c := f.Config.(type) {
Review Comment:
Just a suggestion: a default branch could be added here to log the
unsupported filter config type.
##########
controllers/internal/controller/gateway_controller.go:
##########
@@ -397,13 +482,91 @@ func (r *GatewayReconciler) ensureGatewayConfigMap(ctx
context.Context, gateway
pixiuConfig, err := conv.ConvertIRToPixiuConfig(xds, allClusters)
if err != nil {
- return fmt.Errorf("failed to convert IR to Pixiu config: %w",
err)
+ return "", fmt.Errorf("failed to convert IR to Pixiu config:
%w", err)
}
+ policyLoader := NewPolicyLoader(r.Client, r.Log)
+ gatewayPolicy, err := policyLoader.LoadGatewayPolicy(ctx, gateway)
+ if err != nil {
+ r.Log.Error(err, "failed to load gateway policy", "gateway",
gateway.Name)
+ } else if gatewayPolicy != nil {
+ converter.ApplyGatewayPolicy(pixiuConfig, gatewayPolicy)
+ }
+
+ clusterPolicies, err := policyLoader.LoadAllClusterPolicies(ctx,
gateway.Namespace)
+ if err != nil {
+ r.Log.Error(err, "failed to load cluster policies")
+ } else {
+ clusterConfigMap := make(map[string]*v1alpha1.ClusterConfig)
+ for i := range clusterPolicies {
+ policy := &clusterPolicies[i]
+ if policy.Spec.TargetRef.Kind != "" &&
policy.Spec.TargetRef.Kind == "Gateway" && string(policy.Spec.TargetRef.Name)
== gateway.Name {
+ for j := range policy.Spec.ClusterRef {
+ clusterConfig :=
&policy.Spec.ClusterRef[j]
+ clusterConfigMap[clusterConfig.Name] =
clusterConfig
+ }
+ }
+ }
+
+ serviceConfigMap :=
make(map[string]*v1alpha1.ServiceClusterConfig)
+ for i := range clusterPolicies {
+ policy := &clusterPolicies[i]
+ if policy.Spec.TargetRef.Kind == "" ||
policy.Spec.TargetRef.Kind != "Gateway" || string(policy.Spec.TargetRef.Name)
!= gateway.Name {
+ for j := range policy.Spec.ServiceRef {
+ serviceConfig :=
&policy.Spec.ServiceRef[j]
+ serviceConfigMap[serviceConfig.Name] =
serviceConfig
+ }
+ }
+ }
+
+ for i := range pixiuConfig.StaticResources.Clusters {
+ cluster := pixiuConfig.StaticResources.Clusters[i]
+ if clusterConfig, found :=
clusterConfigMap[cluster.Name]; found {
+ if err := r.resolveClusterEndpoints(ctx,
gateway.Namespace, cluster, clusterConfig); err != nil {
+ r.Log.Error(err, "failed to resolve
cluster endpoints", "cluster", cluster.Name)
+ }
+ converter.ApplyClusterConfig(cluster,
clusterConfig)
+ } else if serviceConfig, found :=
serviceConfigMap[cluster.Name]; found {
+ converter.ApplyClusterPolicy(cluster,
serviceConfig)
+ } else {
+ parts := splitClusterName(cluster.Name)
+ if len(parts) == 2 {
Review Comment:
I think it would be clearer to put the logic for determining whether the
string split is valid inside the splitClusterName function.
##########
controllers/internal/controller/gateway_controller.go:
##########
@@ -735,3 +949,203 @@ func (r *GatewayReconciler) listGatewaysForService(ctx
context.Context, obj clie
return nil
}
+
+func (r *GatewayReconciler) resolveClusterEndpoints(ctx context.Context,
namespace string, cluster *converter.Cluster, clusterConfig
*v1alpha1.ClusterConfig) error {
+ if len(clusterConfig.Endpoints) == 0 {
+ return nil
+ }
+
+ resolvedEndpoints := []*converter.Endpoint{}
+ for _, epConfig := range clusterConfig.Endpoints {
+ if !isIPAddress(epConfig.Address) {
+ serviceName, serviceNamespace :=
parseServiceAddress(epConfig.Address, namespace)
+
+ clusterIP, svcPort, err :=
r.resolveServiceClusterIP(ctx, serviceNamespace, serviceName, epConfig.Port)
+ if err == nil && clusterIP != "" {
+ resolvedEndpoints = append(resolvedEndpoints,
&converter.Endpoint{
+ ID: func() int {
+ if epConfig.ID != nil {
+ return int(*epConfig.ID)
+ }
+ return len(resolvedEndpoints) +
1
+ }(),
+ SocketAddress: converter.SocketAddress{
+ Address: clusterIP,
+ Port: int(svcPort),
+ },
+ })
+ continue
+ }
+
+ resolvedEndpoints = append(resolvedEndpoints,
&converter.Endpoint{
+ ID: func() int {
+ if epConfig.ID != nil {
+ return int(*epConfig.ID)
+ }
+ return len(resolvedEndpoints) + 1
+ }(),
+ SocketAddress: converter.SocketAddress{
+ Address: epConfig.Address,
+ Port: int(epConfig.Port),
+ },
+ })
+ continue
+ }
+
+ resolvedEndpoints = append(resolvedEndpoints,
&converter.Endpoint{
+ ID: func() int {
+ if epConfig.ID != nil {
+ return int(*epConfig.ID)
+ }
+ return len(resolvedEndpoints) + 1
+ }(),
+ SocketAddress: converter.SocketAddress{
+ Address: epConfig.Address,
+ Port: int(epConfig.Port),
+ },
+ })
+ }
+
+ if len(resolvedEndpoints) > 0 {
+ cluster.Endpoints = resolvedEndpoints
+ }
+ return nil
+}
+
+func (r *GatewayReconciler) resolveServiceEndpoints(ctx context.Context,
namespace, serviceName string, port int32) ([]*ir.Endpoint, error) {
+ var service corev1.Service
+ if err := r.Client.Get(ctx, client.ObjectKey{
+ Namespace: namespace,
+ Name: serviceName,
+ }, &service); err != nil {
+ return nil, fmt.Errorf("failed to get service %s/%s: %w",
namespace, serviceName, err)
+ }
+
+ var endpointSliceList discoveryv1.EndpointSliceList
+ if err := r.Client.List(ctx, &endpointSliceList, client.MatchingLabels{
+ discoveryv1.LabelServiceName: serviceName,
+ }, client.InNamespace(namespace)); err != nil {
+ return nil, fmt.Errorf("failed to list endpoint slices: %w",
err)
+ }
+
+ endpoints := []*ir.Endpoint{}
+ for _, endpointSlice := range endpointSliceList.Items {
+ for _, endpoint := range endpointSlice.Endpoints {
+ if endpoint.Conditions.Ready != nil &&
!*endpoint.Conditions.Ready {
+ continue
+ }
+ for _, address := range endpoint.Addresses {
+ targetPort := port
+ if len(endpointSlice.Ports) > 0 {
+ for _, endpointPort := range
endpointSlice.Ports {
+ if endpointPort.Port != nil &&
endpointPort.Name != nil {
+ targetPort =
int32(*endpointPort.Port)
+ break
+ } else if endpointPort.Port !=
nil {
+ targetPort =
int32(*endpointPort.Port)
+ break
Review Comment:
Only the first port will be retrieved here.
##########
controllers/internal/controller/gateway_controller.go:
##########
@@ -397,13 +482,91 @@ func (r *GatewayReconciler) ensureGatewayConfigMap(ctx
context.Context, gateway
pixiuConfig, err := conv.ConvertIRToPixiuConfig(xds, allClusters)
if err != nil {
- return fmt.Errorf("failed to convert IR to Pixiu config: %w",
err)
+ return "", fmt.Errorf("failed to convert IR to Pixiu config:
%w", err)
}
+ policyLoader := NewPolicyLoader(r.Client, r.Log)
+ gatewayPolicy, err := policyLoader.LoadGatewayPolicy(ctx, gateway)
+ if err != nil {
+ r.Log.Error(err, "failed to load gateway policy", "gateway",
gateway.Name)
+ } else if gatewayPolicy != nil {
+ converter.ApplyGatewayPolicy(pixiuConfig, gatewayPolicy)
+ }
+
+ clusterPolicies, err := policyLoader.LoadAllClusterPolicies(ctx,
gateway.Namespace)
+ if err != nil {
+ r.Log.Error(err, "failed to load cluster policies")
+ } else {
+ clusterConfigMap := make(map[string]*v1alpha1.ClusterConfig)
+ for i := range clusterPolicies {
+ policy := &clusterPolicies[i]
+ if policy.Spec.TargetRef.Kind != "" &&
policy.Spec.TargetRef.Kind == "Gateway" && string(policy.Spec.TargetRef.Name)
== gateway.Name {
+ for j := range policy.Spec.ClusterRef {
+ clusterConfig :=
&policy.Spec.ClusterRef[j]
+ clusterConfigMap[clusterConfig.Name] =
clusterConfig
+ }
+ }
+ }
+
+ serviceConfigMap :=
make(map[string]*v1alpha1.ServiceClusterConfig)
+ for i := range clusterPolicies {
+ policy := &clusterPolicies[i]
+ if policy.Spec.TargetRef.Kind == "" ||
policy.Spec.TargetRef.Kind != "Gateway" || string(policy.Spec.TargetRef.Name)
!= gateway.Name {
+ for j := range policy.Spec.ServiceRef {
+ serviceConfig :=
&policy.Spec.ServiceRef[j]
+ serviceConfigMap[serviceConfig.Name] =
serviceConfig
+ }
+ }
+ }
Review Comment:
Just a suggestion: these two loops can be merged into one.
##########
controllers/internal/controller/gateway_controller.go:
##########
@@ -495,12 +658,16 @@ func (r *GatewayReconciler) ensureDataPlane(ctx
context.Context, gateway *gatewa
"app.kubernetes.io/name":
"pixiu-gateway",
"gateway.networking.k8s.io/gateway-name": gateway.GetName(),
},
+ Annotations: map[string]string{
+ "pixiu.apache.org/config-hash":
configHash,
+ },
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
- Name: "pixiu",
- Image:
"mfordjody/pixiugateway:debug",
+ Name:
"pixiu",
+ Image:
"mfordjody/pixiugateway:debug",
+ ImagePullPolicy:
"Always",
Review Comment:
Just a suggestion: hardcoding it to Always feels a bit clunky; using
IfNotPresent or reading it from the configuration would be better.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]