This is an automated email from the ASF dual-hosted git repository.
tokers pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git
The following commit(s) were added to refs/heads/master by this push:
new e34e12b chore: apisix http client (#147)
e34e12b is described below
commit e34e12b438cd8bf6af8bc31f885e23f0651b256d
Author: Alex Zhang <[email protected]>
AuthorDate: Fri Jan 8 16:34:45 2021 +0800
chore: apisix http client (#147)
---
Makefile | 2 +-
cmd/ingress/ingress_test.go | 16 +-
go.mod | 2 +
go.sum | 7 +
pkg/apisix/apisix.go | 144 ++++++++++++
pkg/apisix/cluster.go | 259 +++++++++++++++++++++
pkg/apisix/cluster_test.go | 92 ++++++++
pkg/apisix/nonexistentclient.go | 132 +++++++++++
pkg/apisix/resource.go | 203 ++++++++++++++++
.../service_test.go => apisix/resource_test.go} | 58 +++--
pkg/apisix/route.go | 136 +++++++++++
pkg/apisix/route_test.go | 235 +++++++++++++++++++
pkg/apisix/service.go | 128 ++++++++++
pkg/apisix/service_test.go | 210 +++++++++++++++++
pkg/apisix/ssl.go | 125 ++++++++++
pkg/apisix/ssl_test.go | 200 ++++++++++++++++
pkg/apisix/upstream.go | 186 +++++++++++++++
pkg/apisix/upstream_test.go | 228 ++++++++++++++++++
pkg/ingress/apisix/tls_test.go | 15 +-
pkg/ingress/controller/controller.go | 13 +-
pkg/ingress/controller/endpoint.go | 4 +-
pkg/ingress/controller/watch.go | 8 +-
pkg/seven/apisix/route.go | 202 +---------------
pkg/seven/apisix/route_test.go | 104 ---------
pkg/seven/apisix/service.go | 166 +------------
pkg/seven/apisix/ssl.go | 139 -----------
pkg/seven/apisix/ssl_test.go | 80 -------
pkg/seven/apisix/upstream.go | 225 ++----------------
pkg/seven/apisix/upstream_test.go | 84 -------
pkg/seven/conf/conf.go | 32 ++-
pkg/seven/state/builder.go | 58 +++--
pkg/seven/state/service_worker.go | 26 ++-
pkg/seven/state/solver.go | 26 ++-
test/e2e/go.mod | 2 +
test/e2e/go.sum | 9 +
test/e2e/ingress/route.go | 8 +-
test/e2e/scaffold/apisix.go | 2 +-
test/e2e/scaffold/crd.go | 19 +-
38 files changed, 2497 insertions(+), 1088 deletions(-)
diff --git a/Makefile b/Makefile
index 180f171..679cf14 100644
--- a/Makefile
+++ b/Makefile
@@ -60,7 +60,7 @@ e2e-test:
export
APISIX_UPSTREAM_DEF=$(PWD)/samples/deploy/crd/v1beta1/ApisixUpstream.yaml && \
export
APISIX_SERVICE_DEF=$(PWD)/samples/deploy/crd/v1beta1/ApisixService.yaml && \
export APISIX_TLS_DEF=$(PWD)/samples/deploy/crd/v1beta1/ApisixTls.yaml
&& \
- cd test/e2e && ginkgo -cover -coverprofile=coverage.txt -r
--randomizeSuites --randomizeAllSpecs --trace
+ cd test/e2e && ginkgo -cover -coverprofile=coverage.txt -r
--randomizeSuites --randomizeAllSpecs --trace -p --nodes=1
### license-check: Do Apache License Header check
license-check:
diff --git a/cmd/ingress/ingress_test.go b/cmd/ingress/ingress_test.go
index 108fee1..47494c5 100644
--- a/cmd/ingress/ingress_test.go
+++ b/cmd/ingress/ingress_test.go
@@ -29,7 +29,6 @@ import (
"github.com/api7/ingress-controller/pkg/config"
"github.com/api7/ingress-controller/pkg/log"
- "github.com/api7/ingress-controller/pkg/seven/conf"
"github.com/api7/ingress-controller/pkg/types"
)
@@ -61,9 +60,19 @@ func (fws *fakeWriteSyncer) bytes() (p []byte) {
func TestSignalHandler(t *testing.T) {
cmd := NewIngressCommand()
+ cmd.SetArgs([]string{
+ "--log-level", "debug",
+ "--log-output", "./test.log",
+ "--http-listen", "127.0.0.1:16780",
+ "--enable-profiling",
+ "--kubeconfig", "/foo/bar/baz",
+ "--resync-interval", "24h",
+ "--apisix-base-url",
"http://apisixgw.default.cluster.local/apisix",
+ "--apisix-admin-key", "0x123",
+ })
waitCh := make(chan struct{})
go func() {
- cmd.Run(cmd, nil)
+ cmd.Execute()
close(waitCh)
}()
@@ -131,9 +140,6 @@ func TestNewIngressCommandEffectiveLog(t *testing.T) {
assert.Equal(t, cfg.Kubernetes.ResyncInterval, types.TimeDuration{24 *
time.Hour})
assert.Equal(t, cfg.APISIX.AdminKey, "0x123")
assert.Equal(t, cfg.APISIX.BaseURL,
"http://apisixgw.default.cluster.local/apisix")
-
- // Test the conf.BaseUrl is really set.
- assert.Equal(t, cfg.APISIX.BaseURL, conf.BaseUrl)
}
func parseLog(t *testing.T, r *bufio.Reader) *fields {
diff --git a/go.mod b/go.mod
index 821dab1..6cfe209 100644
--- a/go.mod
+++ b/go.mod
@@ -20,7 +20,9 @@ require (
github.com/yudai/gojsondiff v1.0.0
github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82 // indirect
github.com/yudai/pp v2.0.1+incompatible // indirect
+ go.uber.org/multierr v1.3.0
go.uber.org/zap v1.13.0
+ golang.org/x/net v0.0.0-20201224014010-6772e930b67b
gopkg.in/resty.v1 v1.12.0
gopkg.in/yaml.v2 v2.2.8
k8s.io/api v0.0.0-20190819141258-3544db3b9e44
diff --git a/go.sum b/go.sum
index 41f38a7..a4e3b84 100644
--- a/go.sum
+++ b/go.sum
@@ -319,6 +319,8 @@ golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod
h1:HSz+uSET+XFnRR8LxR
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190812203447-cdfb69ac37fc
h1:gkKoSkUmnU6bpS/VhkuO27bzQeSA51uaEfbOW5dNb68=
golang.org/x/net v0.0.0-20190812203447-cdfb69ac37fc/go.mod
h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20201224014010-6772e930b67b
h1:iFwSg7t5GZmB/Q5TjiEAsdoLDrdJRC1RiF2WhuV29Qw=
+golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod
h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod
h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod
h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190402181905-9f3314589c9a/go.mod
h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -346,11 +348,16 @@ golang.org/x/sys
v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42
h1:vEOn+mP2zCOVzKckCZy6YsCtDblrpj/w7B9nxGNELpg=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20201119102817-f84b799fce68
h1:nxC68pudNYkKU6jWhgrqdreuFiOQWj1Fs7T3VrH4Pjw=
+golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod
h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod
h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20181227161524-e6919f6577db/go.mod
h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
+golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
+golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20161028155119-f51c12702a4d/go.mod
h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod
h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
h1:SvFZT6jyqRaOeXpc5h/JSfZenJ2O330aBsf7JfSUXmQ=
diff --git a/pkg/apisix/apisix.go b/pkg/apisix/apisix.go
new file mode 100644
index 0000000..5069f52
--- /dev/null
+++ b/pkg/apisix/apisix.go
@@ -0,0 +1,144 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package apisix
+
+import (
+ "context"
+
+ v1 "github.com/api7/ingress-controller/pkg/types/apisix/v1"
+)
+
+// APISIX is the unified client tool to communicate with APISIX.
+type APISIX interface {
+ // Cluster returns the client of the cluster registered under the given name; the apisix implementation in this package returns a stub whose operations fail with ErrClusterNotExist when the name is unknown.
+ Cluster(string) Cluster
+ // AddCluster registers a new cluster built from the options; the apisix implementation returns ErrDuplicatedCluster when the name is already in use.
+ AddCluster(*ClusterOptions) error
+ // ListClusters lists all APISIX clusters; the apisix implementation puts the default cluster first.
+ ListClusters() []Cluster
+}
+
+// Cluster defines specific operations that can be applied in an APISIX
+// cluster. Each method hands back the per-resource client of that cluster.
+type Cluster interface {
+	// Route returns a Route interface that can operate Route resources.
+	Route() Route
+	// Upstream returns an Upstream interface that can operate Upstream
+	// resources.
+	Upstream() Upstream
+	// Service returns a Service interface that can operate Service
+	// resources.
+	Service() Service
+	// SSL returns an SSL interface that can operate SSL resources.
+	SSL() SSL
+}
+
+// Route is the specific client interface to take over the create, update,
+// list and delete for APISIX's Route resource.
+type Route interface {
+ List(context.Context) ([]*v1.Route, error) // lists Route resources
+ Create(context.Context, *v1.Route) (*v1.Route, error) // creates a Route; the returned object is presumably the one APISIX stored (confirm in route.go)
+ Delete(context.Context, *v1.Route) error // deletes the given Route
+ Update(context.Context, *v1.Route) (*v1.Route, error) // updates a Route; the returned object is presumably the updated one (confirm in route.go)
+}
+
+// SSL is the specific client interface to take over the create, update,
+// list and delete for APISIX's SSL resource.
+type SSL interface {
+ List(context.Context) ([]*v1.Ssl, error) // lists SSL resources
+ Create(context.Context, *v1.Ssl) (*v1.Ssl, error) // creates an SSL object; the returned object is presumably the one APISIX stored (confirm in ssl.go)
+ Delete(context.Context, *v1.Ssl) error // deletes the given SSL object
+ Update(context.Context, *v1.Ssl) (*v1.Ssl, error) // updates an SSL object; the returned object is presumably the updated one (confirm in ssl.go)
+}
+
+// Upstream is the specific client interface to take over the create, update,
+// list and delete for APISIX's Upstream resource.
+type Upstream interface {
+ List(context.Context) ([]*v1.Upstream, error) // lists Upstream resources
+ Create(context.Context, *v1.Upstream) (*v1.Upstream, error) // creates an Upstream; the returned object is presumably the one APISIX stored (confirm in upstream.go)
+ Delete(context.Context, *v1.Upstream) error // deletes the given Upstream
+ Update(context.Context, *v1.Upstream) (*v1.Upstream, error) // updates an Upstream; the returned object is presumably the updated one (confirm in upstream.go)
+}
+
+// Service is the specific client interface to take over the create, update,
+// list and delete for APISIX's Service resource.
+type Service interface {
+ List(context.Context) ([]*v1.Service, error) // lists Service resources
+ Create(context.Context, *v1.Service) (*v1.Service, error) // creates a Service; the returned object is presumably the one APISIX stored (confirm in service.go)
+ Delete(context.Context, *v1.Service) error // deletes the given Service
+ Update(context.Context, *v1.Service) (*v1.Service, error) // updates a Service; the returned object is presumably the updated one (confirm in service.go)
+}
+
+type apisix struct { // apisix is the APISIX implementation returned by NewForOptions
+ defaultCluster Cluster // cluster built from the options passed to NewForOptions
+ nonExistentCluster Cluster // stub handed out by Cluster() for unknown names
+ defaultClusterName string // name of defaultCluster, copied from ClusterOptions.Name
+ clusters map[string]Cluster // clusters added through AddCluster, keyed by name; nil until the first add
+}
+
+// NewForOptions builds an APISIX client used to push resource changes.
+// The given ClusterOptions describes the default cluster; an error is
+// returned when that cluster cannot be created.
+func NewForOptions(co *ClusterOptions) (APISIX, error) {
+	dc, err := newCluster(co)
+	if err != nil {
+		return nil, err
+	}
+	return &apisix{
+		defaultCluster:     dc,
+		defaultClusterName: co.Name,
+		nonExistentCluster: newNonExistentCluster(),
+	}, nil
+}
+
+// Cluster implements APISIX.Cluster method.
+// The default cluster is matched first, then the clusters added later;
+// an unknown name yields the non-existent cluster stub.
+func (c *apisix) Cluster(name string) Cluster {
+	if name == c.defaultClusterName {
+		return c.defaultCluster
+	}
+	if found, ok := c.clusters[name]; ok {
+		return found
+	}
+	return c.nonExistentCluster
+}
+
+// ListClusters implements APISIX.ListClusters method.
+// The default cluster comes first; the remaining ones follow in map
+// iteration order.
+func (c *apisix) ListClusters() []Cluster {
+	all := append(make([]Cluster, 0, len(c.clusters)+1), c.defaultCluster)
+	for name := range c.clusters {
+		all = append(all, c.clusters[name])
+	}
+	return all
+}
+
+// AddCluster implements APISIX.AddCluster method.
+// It returns ErrDuplicatedCluster when the name collides with the default
+// cluster or with a cluster added earlier.
+func (c *apisix) AddCluster(co *ClusterOptions) error {
+	_, exists := c.clusters[co.Name]
+	if exists || co.Name == c.defaultClusterName {
+		return ErrDuplicatedCluster
+	}
+	added, err := newCluster(co)
+	if err != nil {
+		return err
+	}
+	// The map is created lazily on the first successful add.
+	if c.clusters == nil {
+		c.clusters = map[string]Cluster{}
+	}
+	c.clusters[co.Name] = added
+	return nil
+}
diff --git a/pkg/apisix/cluster.go b/pkg/apisix/cluster.go
new file mode 100644
index 0000000..0a9cab7
--- /dev/null
+++ b/pkg/apisix/cluster.go
@@ -0,0 +1,259 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package apisix
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "net/http"
+ "strings"
+ "time"
+
+ "go.uber.org/multierr"
+ "go.uber.org/zap"
+
+ "github.com/api7/ingress-controller/pkg/log"
+)
+
+const (
+ _defaultTimeout = 5 * time.Second // applied by newCluster when ClusterOptions.Timeout is zero
+)
+
+var (
+	// ErrClusterNotExist means a cluster doesn't exist.
+	ErrClusterNotExist = errors.New("client not exist")
+	// ErrDuplicatedCluster means the cluster adding request was
+	// rejected since the cluster was already created.
+	ErrDuplicatedCluster = errors.New("duplicated cluster")
+
+	// _errReadOnClosedResBody carries the message drainBody compares
+	// against to recognise (and not log) a read on an already closed body.
+	_errReadOnClosedResBody = errors.New("http: read on closed response body")
+)
+
+// ClusterOptions contains parameters to customize the client of one APISIX cluster.
+type ClusterOptions struct {
+ Name string // cluster name; used as the lookup key by Cluster and AddCluster
+ AdminKey string // sent as the X-API-Key request header when non-empty
+ BaseURL string // must be non-empty; newCluster trims one trailing "/"
+ Timeout time.Duration // zero selects _defaultTimeout
+}
+
+type cluster struct { // cluster is the Cluster implementation built by newCluster
+ name string // from ClusterOptions.Name
+ baseURL string // from ClusterOptions.BaseURL with a trailing "/" trimmed
+ adminKey string // from ClusterOptions.AdminKey; set as X-API-Key by applyAuth
+ cli *http.Client // HTTP client configured with the cluster timeout
+
+ route Route // created by newRouteClient in newCluster
+ upstream Upstream // created by newUpstreamClient in newCluster
+ service Service // created by newServiceClient in newCluster
+ ssl SSL // created by newSSLClient in newCluster
+}
+
+// newCluster builds the client of a single APISIX cluster from o.
+//
+// It returns an error when o is nil or its BaseURL is empty. A zero
+// Timeout falls back to _defaultTimeout and one trailing "/" is trimmed
+// from the base URL. The normalized values are kept on the returned
+// cluster only; the caller's options are left untouched.
+func newCluster(o *ClusterOptions) (Cluster, error) {
+	if o == nil {
+		return nil, errors.New("nil cluster options")
+	}
+	if o.BaseURL == "" {
+		return nil, errors.New("empty base url")
+	}
+	timeout := o.Timeout
+	if timeout == 0 {
+		timeout = _defaultTimeout
+	}
+
+	c := &cluster{
+		name:     o.Name,
+		baseURL:  strings.TrimSuffix(o.BaseURL, "/"),
+		adminKey: o.AdminKey,
+		cli: &http.Client{
+			Timeout: timeout,
+			Transport: &http.Transport{
+				ResponseHeaderTimeout: timeout,
+				ExpectContinueTimeout: timeout,
+			},
+		},
+	}
+	// The resource clients keep a reference back to the cluster.
+	c.route = newRouteClient(c)
+	c.upstream = newUpstreamClient(c)
+	c.service = newServiceClient(c)
+	c.ssl = newSSLClient(c)
+
+	return c, nil
+}
+
+// String renders the cluster name and base URL in a human readable form.
+func (c *cluster) String() string {
+	return "name=" + c.name + "; base_url=" + c.baseURL
+}
+
+// Route implements Cluster.Route method; it returns the route client created in newCluster.
+func (c *cluster) Route() Route {
+ return c.route
+}
+
+// Upstream implements Cluster.Upstream method; it returns the upstream client created in newCluster.
+func (c *cluster) Upstream() Upstream {
+ return c.upstream
+}
+
+// Service implements Cluster.Service method; it returns the service client created in newCluster.
+func (c *cluster) Service() Service {
+ return c.service
+}
+
+// SSL implements Cluster.SSL method; it returns the SSL client created in newCluster.
+func (c *cluster) SSL() SSL {
+ return c.ssl
+}
+
+// applyAuth sets the X-API-Key header on req when an admin key is
+// configured; without a key the request is left untouched.
+func (c *cluster) applyAuth(req *http.Request) {
+	if c.adminKey == "" {
+		return
+	}
+	req.Header.Set("X-API-Key", c.adminKey)
+}
+
+// do sends req through the cluster's HTTP client after attaching the
+// admin key header (when one is configured).
+func (c *cluster) do(req *http.Request) (*http.Response, error) {
+	if c.adminKey != "" {
+		req.Header.Set("X-API-Key", c.adminKey)
+	}
+	return c.cli.Do(req)
+}
+
+func (s *cluster) listResource(ctx context.Context, url string)
(*listResponse, error) {
+ req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
+ if err != nil {
+ return nil, err
+ }
+ resp, err := s.do(req)
+ if err != nil {
+ return nil, err
+ }
+ defer drainBody(resp.Body, url)
+ if resp.StatusCode != http.StatusOK {
+ err = multierr.Append(err, fmt.Errorf("unexpected status code
%d", resp.StatusCode))
+ err = multierr.Append(err, fmt.Errorf("error message: %s",
readBody(resp.Body, url)))
+ return nil, err
+ }
+
+ var list listResponse
+
+ dec := json.NewDecoder(resp.Body)
+ if err := dec.Decode(&list); err != nil {
+ return nil, err
+ }
+ return &list, nil
+}
+
+func (s *cluster) createResource(ctx context.Context, url string, body
io.Reader) (*createResponse, error) {
+ req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, body)
+ if err != nil {
+ return nil, err
+ }
+ resp, err := s.do(req)
+ if err != nil {
+ return nil, err
+ }
+
+ defer drainBody(resp.Body, url)
+
+ if resp.StatusCode != http.StatusCreated && resp.StatusCode !=
http.StatusOK {
+ err = multierr.Append(err, fmt.Errorf("unexpected status code
%d", resp.StatusCode))
+ err = multierr.Append(err, fmt.Errorf("error message: %s",
readBody(resp.Body, url)))
+ return nil, err
+ }
+
+ var cr createResponse
+ dec := json.NewDecoder(resp.Body)
+ if err := dec.Decode(&cr); err != nil {
+ return nil, err
+ }
+ return &cr, nil
+}
+
+func (s *cluster) updateResource(ctx context.Context, url string, body
io.Reader) (*updateResponse, error) {
+ req, err := http.NewRequestWithContext(ctx, http.MethodPatch, url, body)
+ if err != nil {
+ return nil, err
+ }
+ resp, err := s.do(req)
+ if err != nil {
+ return nil, err
+ }
+ defer drainBody(resp.Body, url)
+
+ if resp.StatusCode != http.StatusOK && resp.StatusCode !=
http.StatusCreated {
+ err = multierr.Append(err, fmt.Errorf("unexpected status code
%d", resp.StatusCode))
+ err = multierr.Append(err, fmt.Errorf("error message: %s",
readBody(resp.Body, url)))
+ return nil, err
+ }
+ var ur updateResponse
+ dec := json.NewDecoder(resp.Body)
+ if err := dec.Decode(&ur); err != nil {
+ return nil, err
+ }
+ return &ur, nil
+}
+
+func (s *cluster) deleteResource(ctx context.Context, url string) error {
+ req, err := http.NewRequestWithContext(ctx, http.MethodDelete, url, nil)
+ if err != nil {
+ return err
+ }
+ resp, err := s.do(req)
+ if err != nil {
+ return err
+ }
+ defer drainBody(resp.Body, url)
+
+ if resp.StatusCode != http.StatusOK && resp.StatusCode !=
http.StatusNoContent && resp.StatusCode != http.StatusNotFound {
+ err = multierr.Append(err, fmt.Errorf("unexpected status code
%d", resp.StatusCode))
+ err = multierr.Append(err, fmt.Errorf("error message: %s",
readBody(resp.Body, url)))
+ return err
+ }
+ return nil
+}
+
+// drainBody consumes r up to EOF and then closes it. Failures are only
+// logged; a read error whose text equals _errReadOnClosedResBody (the body
+// was closed earlier) is not reported.
+func drainBody(r io.ReadCloser, url string) {
+	if _, err := io.Copy(ioutil.Discard, r); err != nil && err.Error() != _errReadOnClosedResBody.Error() {
+		log.Warnw("failed to drain body (read)",
+			zap.String("url", url),
+			zap.Error(err),
+		)
+	}
+
+	closeErr := r.Close()
+	if closeErr == nil {
+		return
+	}
+	log.Warnw("failed to drain body (close)",
+		zap.String("url", url),
+		zap.Error(closeErr),
+	)
+}
+
+// readBody reads r completely, closes it and returns the content as a
+// string. Read and close failures are logged with the url; a read failure
+// results in an empty string.
+func readBody(r io.ReadCloser, url string) string {
+	defer func() {
+		if err := r.Close(); err != nil {
+			log.Warnw("failed to close body", zap.String("url", url), zap.Error(err))
+		}
+	}()
+	data, err := ioutil.ReadAll(r)
+	if err != nil {
+		log.Warnw("failed to read body", zap.String("url", url), zap.Error(err))
+		return ""
+	}
+	return string(data)
+}
diff --git a/pkg/apisix/cluster_test.go b/pkg/apisix/cluster_test.go
new file mode 100644
index 0000000..a803e31
--- /dev/null
+++ b/pkg/apisix/cluster_test.go
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package apisix
+
+import (
+ "context"
+ "testing"
+
+ v1 "github.com/api7/ingress-controller/pkg/types/apisix/v1"
+ "github.com/stretchr/testify/assert"
+)
+
+// TestAddCluster checks that a second cluster can be added, that adding a
+// cluster under an already used name is rejected with ErrDuplicatedCluster,
+// and that ListClusters reflects only the successful additions.
+func TestAddCluster(t *testing.T) {
+	apisix, err := NewForOptions(&ClusterOptions{
+		BaseURL: "http://service1:9080/apisix/admin",
+	})
+	assert.Nil(t, err)
+
+	clusters := apisix.ListClusters()
+	assert.Len(t, clusters, 1)
+
+	err = apisix.AddCluster(&ClusterOptions{
+		Name:    "service2",
+		BaseURL: "http://service2:9080/apisix/admin",
+	})
+	assert.Nil(t, err)
+
+	// Same name, different URL: must be refused as a duplicate.
+	err = apisix.AddCluster(&ClusterOptions{
+		Name:    "service2",
+		BaseURL: "http://service3:9080/apisix/admin",
+	})
+	assert.Equal(t, ErrDuplicatedCluster, err)
+
+	clusters = apisix.ListClusters()
+	assert.Len(t, clusters, 2)
+}
+
+// TestNonExistentCluster verifies that every operation on every resource
+// client of an unknown cluster fails with ErrClusterNotExist.
+func TestNonExistentCluster(t *testing.T) {
+	apisix, err := NewForOptions(&ClusterOptions{
+		BaseURL: "http://service1:9080/apisix/admin",
+	})
+	assert.Nil(t, err)
+
+	ctx := context.Background()
+	missing := apisix.Cluster("non-existent-cluster")
+
+	_, err = missing.Route().List(ctx)
+	assert.Equal(t, ErrClusterNotExist, err)
+	_, err = missing.Route().Create(ctx, &v1.Route{})
+	assert.Equal(t, ErrClusterNotExist, err)
+	_, err = missing.Route().Update(ctx, &v1.Route{})
+	assert.Equal(t, ErrClusterNotExist, err)
+	err = missing.Route().Delete(ctx, &v1.Route{})
+	assert.Equal(t, ErrClusterNotExist, err)
+
+	_, err = missing.Upstream().List(ctx)
+	assert.Equal(t, ErrClusterNotExist, err)
+	_, err = missing.Upstream().Create(ctx, &v1.Upstream{})
+	assert.Equal(t, ErrClusterNotExist, err)
+	_, err = missing.Upstream().Update(ctx, &v1.Upstream{})
+	assert.Equal(t, ErrClusterNotExist, err)
+	err = missing.Upstream().Delete(ctx, &v1.Upstream{})
+	assert.Equal(t, ErrClusterNotExist, err)
+
+	_, err = missing.Service().List(ctx)
+	assert.Equal(t, ErrClusterNotExist, err)
+	_, err = missing.Service().Create(ctx, &v1.Service{})
+	assert.Equal(t, ErrClusterNotExist, err)
+	_, err = missing.Service().Update(ctx, &v1.Service{})
+	assert.Equal(t, ErrClusterNotExist, err)
+	err = missing.Service().Delete(ctx, &v1.Service{})
+	assert.Equal(t, ErrClusterNotExist, err)
+
+	_, err = missing.SSL().List(ctx)
+	assert.Equal(t, ErrClusterNotExist, err)
+	_, err = missing.SSL().Create(ctx, &v1.Ssl{})
+	assert.Equal(t, ErrClusterNotExist, err)
+	_, err = missing.SSL().Update(ctx, &v1.Ssl{})
+	assert.Equal(t, ErrClusterNotExist, err)
+	err = missing.SSL().Delete(ctx, &v1.Ssl{})
+	assert.Equal(t, ErrClusterNotExist, err)
+}
diff --git a/pkg/apisix/nonexistentclient.go b/pkg/apisix/nonexistentclient.go
new file mode 100644
index 0000000..934eac4
--- /dev/null
+++ b/pkg/apisix/nonexistentclient.go
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package apisix
+
+import (
+ "context"
+
+ v1 "github.com/api7/ingress-controller/pkg/types/apisix/v1"
+)
+
+type nonExistentCluster struct { // Cluster stub whose resource clients always fail with ErrClusterNotExist
+ embedDummyResourceImplementer // holds the four dummy resource clients
+}
+
+// newNonExistentCluster assembles the stub cluster out of the four dummy
+// resource clients.
+func newNonExistentCluster() *nonExistentCluster {
+	stubs := embedDummyResourceImplementer{
+		route:    &dummyRoute{},
+		ssl:      &dummySSL{},
+		service:  &dummyService{},
+		upstream: &dummyUpstream{},
+	}
+	return &nonExistentCluster{embedDummyResourceImplementer: stubs}
+}
+
+type embedDummyResourceImplementer struct { // bundle of resource clients embedded into nonExistentCluster
+ route Route // returned by nonExistentCluster.Route
+ ssl SSL // returned by nonExistentCluster.SSL
+ upstream Upstream // returned by nonExistentCluster.Upstream
+ service Service // returned by nonExistentCluster.Service
+}
+
+// dummyRoute is a Route client whose every operation fails with
+// ErrClusterNotExist.
+type dummyRoute struct{}
+
+// List always fails with ErrClusterNotExist.
+func (*dummyRoute) List(context.Context) ([]*v1.Route, error) {
+	return nil, ErrClusterNotExist
+}
+
+// Create always fails with ErrClusterNotExist.
+func (*dummyRoute) Create(context.Context, *v1.Route) (*v1.Route, error) {
+	return nil, ErrClusterNotExist
+}
+
+// Delete always fails with ErrClusterNotExist.
+func (*dummyRoute) Delete(context.Context, *v1.Route) error {
+	return ErrClusterNotExist
+}
+
+// Update always fails with ErrClusterNotExist.
+func (*dummyRoute) Update(context.Context, *v1.Route) (*v1.Route, error) {
+	return nil, ErrClusterNotExist
+}
+
+// dummySSL is an SSL client whose every operation fails with
+// ErrClusterNotExist.
+type dummySSL struct{}
+
+// List always fails with ErrClusterNotExist.
+func (*dummySSL) List(context.Context) ([]*v1.Ssl, error) {
+	return nil, ErrClusterNotExist
+}
+
+// Create always fails with ErrClusterNotExist.
+func (*dummySSL) Create(context.Context, *v1.Ssl) (*v1.Ssl, error) {
+	return nil, ErrClusterNotExist
+}
+
+// Delete always fails with ErrClusterNotExist.
+func (*dummySSL) Delete(context.Context, *v1.Ssl) error {
+	return ErrClusterNotExist
+}
+
+// Update always fails with ErrClusterNotExist.
+func (*dummySSL) Update(context.Context, *v1.Ssl) (*v1.Ssl, error) {
+	return nil, ErrClusterNotExist
+}
+
+// dummyUpstream is an Upstream client whose every operation fails with
+// ErrClusterNotExist.
+type dummyUpstream struct{}
+
+// List always fails with ErrClusterNotExist.
+func (*dummyUpstream) List(context.Context) ([]*v1.Upstream, error) {
+	return nil, ErrClusterNotExist
+}
+
+// Create always fails with ErrClusterNotExist.
+func (*dummyUpstream) Create(context.Context, *v1.Upstream) (*v1.Upstream, error) {
+	return nil, ErrClusterNotExist
+}
+
+// Delete always fails with ErrClusterNotExist.
+func (*dummyUpstream) Delete(context.Context, *v1.Upstream) error {
+	return ErrClusterNotExist
+}
+
+// Update always fails with ErrClusterNotExist.
+func (*dummyUpstream) Update(context.Context, *v1.Upstream) (*v1.Upstream, error) {
+	return nil, ErrClusterNotExist
+}
+
+// dummyService is a Service client whose every operation fails with
+// ErrClusterNotExist.
+type dummyService struct{}
+
+// List always fails with ErrClusterNotExist.
+func (*dummyService) List(context.Context) ([]*v1.Service, error) {
+	return nil, ErrClusterNotExist
+}
+
+// Create always fails with ErrClusterNotExist.
+func (*dummyService) Create(context.Context, *v1.Service) (*v1.Service, error) {
+	return nil, ErrClusterNotExist
+}
+
+// Delete always fails with ErrClusterNotExist.
+func (*dummyService) Delete(context.Context, *v1.Service) error {
+	return ErrClusterNotExist
+}
+
+// Update always fails with ErrClusterNotExist.
+func (*dummyService) Update(context.Context, *v1.Service) (*v1.Service, error) {
+	return nil, ErrClusterNotExist
+}
+
+// Route implements Cluster.Route by returning the dummy route client.
+func (n *nonExistentCluster) Route() Route {
+	return n.embedDummyResourceImplementer.route
+}
+
+// SSL implements Cluster.SSL by returning the dummy SSL client.
+func (n *nonExistentCluster) SSL() SSL {
+	return n.embedDummyResourceImplementer.ssl
+}
+
+// Service implements Cluster.Service by returning the dummy service client.
+func (n *nonExistentCluster) Service() Service {
+	return n.embedDummyResourceImplementer.service
+}
+
+// Upstream implements Cluster.Upstream by returning the dummy upstream client.
+func (n *nonExistentCluster) Upstream() Upstream {
+	return n.embedDummyResourceImplementer.upstream
+}
diff --git a/pkg/apisix/resource.go b/pkg/apisix/resource.go
new file mode 100644
index 0000000..1c7ecc9
--- /dev/null
+++ b/pkg/apisix/resource.go
@@ -0,0 +1,203 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package apisix
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "strings"
+
+ "github.com/api7/ingress-controller/pkg/log"
+ v1 "github.com/api7/ingress-controller/pkg/types/apisix/v1"
+)
+
+// listResponse is the unified LIST response mapping of APISIX.
+type listResponse struct {
+ Count string `json:"count"` // "count" field of the response, decoded as a string
+ Node node `json:"node"` // "node" field: the listed key together with its child items
+}
+
+type createResponse struct { // decoded body of a create request (see createResource)
+ Action string `json:"action"` // "action" field of the response
+ Item item `json:"node"` // "node" field: the single affected item
+}
+
+type updateResponse = createResponse // update responses are decoded with the same layout as create responses
+
+type node struct { // "node" object of a LIST response
+ Key string `json:"key"` // "key" field of the listed directory
+ Items items `json:"nodes"` // "nodes" field; decoded by items.UnmarshalJSON, which tolerates an empty object
+}
+
+type items []item // list of items with a custom JSON decoder (UnmarshalJSON)
+
+// UnmarshalJSON implements the json.Unmarshaler interface for items.
+// lua-cjson doesn't distinguish empty array and table,
+// and by default empty array will be encoded as '{}'.
+// We have to maintain the compatibility.
+//
+// An empty object (inner whitespace allowed) is accepted and leaves the
+// receiver untouched; any other object is rejected with
+// "unexpected non-empty object". Everything else is decoded as a JSON
+// array of item. Empty input is reported as an error instead of panicking.
+func (its *items) UnmarshalJSON(p []byte) error {
+	trimmed := strings.TrimSpace(string(p))
+	if trimmed == "" {
+		return errors.New("unexpected empty input")
+	}
+	if trimmed[0] == '{' {
+		// Only "{}" (possibly with whitespace between the braces) is
+		// tolerated as the lua-cjson spelling of an empty array.
+		if !strings.HasSuffix(trimmed, "}") || strings.TrimSpace(trimmed[1:len(trimmed)-1]) != "" {
+			return errors.New("unexpected non-empty object")
+		}
+		return nil
+	}
+	var data []item
+	if err := json.Unmarshal(p, &data); err != nil {
+		return err
+	}
+	*its = data
+	return nil
+}
+
+type item struct { // one key/value entry of an APISIX response
+ Key string `json:"key"` // full key; the segment after the last "/" is used as the resource ID
+ Value json.RawMessage `json:"value"` // raw resource JSON, decoded later by route/upstream/service/ssl
+}
+
+type routeItem struct { // JSON layout of a route value as decoded by item.route
+ UpstreamId *string `json:"upstream_id"` // copied to v1.Route.UpstreamId
+ ServiceId *string `json:"service_id"` // copied to v1.Route.ServiceId
+ Host *string `json:"host"` // copied to v1.Route.Host
+ URI *string `json:"uri"` // copied to v1.Route.Path
+ Desc *string `json:"desc"` // copied to v1.Route.Name and used to build FullName
+ Methods []*string `json:"methods"` // copied to v1.Route.Methods
+ Plugins map[string]interface{} `json:"plugins"` // converted to *v1.Plugins
+}
+
+// route decodes item.Value and converts it to v1.Route.
+//
+// The route ID is the segment of item.Key after the last "/"; a key that
+// yields an empty ID is rejected. clusterName is stored as the route group
+// and is used as prefix of the generated full name.
+func (i *item) route(clusterName string) (*v1.Route, error) {
+	log.Infof("got route: %s", string(i.Value))
+	// strings.Split never returns an empty slice, so the key is validated
+	// through its last segment instead of the slice length.
+	list := strings.Split(i.Key, "/")
+	id := list[len(list)-1]
+	if id == "" {
+		return nil, fmt.Errorf("bad route config key: %s", i.Key)
+	}
+
+	var route routeItem
+	if err := json.Unmarshal(i.Value, &route); err != nil {
+		return nil, err
+	}
+
+	fullName := genFullName(route.Desc, clusterName)
+
+	return &v1.Route{
+		ID:         &id,
+		FullName:   &fullName,
+		Group:      &clusterName,
+		Name:       route.Desc,
+		Host:       route.Host,
+		Path:       route.URI,
+		Methods:    route.Methods,
+		UpstreamId: route.UpstreamId,
+		ServiceId:  route.ServiceId,
+		Plugins:    (*v1.Plugins)(&route.Plugins),
+	}, nil
+}
+
+// upstream decodes item.Value and converts it to v1.Upstream.
+//
+// The upstream ID is the segment of item.Key after the last "/"; a key
+// that yields an empty ID is rejected. clusterName is stored as the
+// upstream group and is used as prefix of the generated full name.
+func (i *item) upstream(clusterName string) (*v1.Upstream, error) {
+	log.Infof("got upstream: %s", string(i.Value))
+	// strings.Split never returns an empty slice, so the key is validated
+	// through its last segment instead of the slice length.
+	list := strings.Split(i.Key, "/")
+	id := list[len(list)-1]
+	if id == "" {
+		return nil, fmt.Errorf("bad upstream config key: %s", i.Key)
+	}
+
+	var ups upstreamItem
+	if err := json.Unmarshal(i.Value, &ups); err != nil {
+		return nil, err
+	}
+
+	key := i.Key
+
+	var nodes []*v1.Node
+	for _, n := range ups.Nodes {
+		// Per-iteration copy: the addresses taken below must not all
+		// point at the single shared range variable (pre-Go 1.22 loop
+		// semantics), otherwise every node would alias the last one.
+		n := n
+		nodes = append(nodes, &v1.Node{
+			IP:     &n.Host,
+			Port:   &n.Port,
+			Weight: &n.Weight,
+		})
+	}
+
+	fullName := genFullName(ups.Desc, clusterName)
+
+	return &v1.Upstream{
+		ID:       &id,
+		FullName: &fullName,
+		Group:    &clusterName,
+		Name:     ups.Desc,
+		Type:     ups.LBType,
+		Key:      &key,
+		Nodes:    nodes,
+	}, nil
+}
+
+// service decodes item.Value and converts it to v1.Service.
+//
+// The service ID is the segment of item.Key after the last "/".
+// clusterName is stored as the service group and is used as prefix of the
+// generated full name. Plugins found in the value are copied into the
+// returned service; without plugins the returned Plugins points at a nil
+// map.
+func (i *item) service(clusterName string) (*v1.Service, error) {
+	log.Infof("got service: %s", string(i.Value))
+	var svc serviceItem
+	if err := json.Unmarshal(i.Value, &svc); err != nil {
+		return nil, err
+	}
+
+	list := strings.Split(i.Key, "/")
+	id := list[len(list)-1]
+	var plugins v1.Plugins
+	if svc.Plugins != nil {
+		// Plain assignment on purpose: ":=" would declare a new variable
+		// scoped to this block and the copied plugins would be lost.
+		plugins = make(v1.Plugins, len(*svc.Plugins))
+		for k, v := range *svc.Plugins {
+			plugins[k] = v
+		}
+	}
+	fullName := genFullName(svc.Desc, clusterName)
+
+	return &v1.Service{
+		ID:         &id,
+		FullName:   &fullName,
+		Group:      &clusterName,
+		Name:       svc.Desc,
+		UpstreamId: svc.UpstreamId,
+		Plugins:    &plugins,
+	}, nil
+}
+
+// ssl decodes item.Value and converts it to v1.Ssl.
+func (i *item) ssl(clusterName string) (*v1.Ssl, error) {
+ log.Infof("got ssl: %s", string(i.Value))
+ var ssl v1.Ssl
+ if err := json.Unmarshal(i.Value, &ssl); err != nil {
+ return nil, err
+ }
+
+ list := strings.Split(i.Key, "/")
+ id := list[len(list)-1]
+ ssl.ID = &id
+ ssl.Group = &clusterName
+ return &ssl, nil
+}
+
+// genFullName builds the full resource name: the name (or "unknown" when
+// name is nil), prefixed with clusterName and "_" when clusterName is
+// non-empty.
+func genFullName(name *string, clusterName string) string {
+	base := "unknown"
+	if name != nil {
+		base = *name
+	}
+	if clusterName == "" {
+		return base
+	}
+	return clusterName + "_" + base
+}
diff --git a/pkg/seven/apisix/service_test.go b/pkg/apisix/resource_test.go
similarity index 59%
rename from pkg/seven/apisix/service_test.go
rename to pkg/apisix/resource_test.go
index 4309e4d..7a625db 100644
--- a/pkg/seven/apisix/service_test.go
+++ b/pkg/apisix/resource_test.go
@@ -21,15 +21,15 @@ import (
"github.com/stretchr/testify/assert"
)
-func TestServiceUnmarshalJSON(t *testing.T) {
- var svc Services
+func TestItemUnmarshalJSON(t *testing.T) {
+ var items node
emptyData := `
{
"key": "test",
"nodes": {}
}
`
- err := json.Unmarshal([]byte(emptyData), &svc)
+ err := json.Unmarshal([]byte(emptyData), &items)
assert.Nil(t, err)
emptyData = `
@@ -38,7 +38,7 @@ func TestServiceUnmarshalJSON(t *testing.T) {
"nodes": {"a": "b", "c": "d"}
}
`
- err = json.Unmarshal([]byte(emptyData), &svc)
+ err = json.Unmarshal([]byte(emptyData), &items)
assert.Equal(t, err.Error(), "unexpected non-empty object")
emptyArray := `
@@ -47,34 +47,32 @@ func TestServiceUnmarshalJSON(t *testing.T) {
"nodes": []
}
`
- err = json.Unmarshal([]byte(emptyArray), &svc)
+ err = json.Unmarshal([]byte(emptyArray), &items)
assert.Nil(t, err)
-
- normalData := `
-{
- "key": "test",
- "nodes": [
- {
- "key": "svc1",
- "value": {
- "desc": "test service 1",
- "upstream_id": "123",
- "plugins": {}
- }
- }
- ]
}
-`
- err = json.Unmarshal([]byte(normalData), &svc)
- assert.Nil(t, err)
- assert.Equal(t, svc.Key, "test")
- assert.Equal(t, len(svc.Services), 1)
- key := *svc.Services[0].Key
- assert.Equal(t, key, "svc1")
- desc := *svc.Services[0].ServiceValue.Desc
- assert.Equal(t, desc, "test service 1")
+func TestItemConvertRoute(t *testing.T) {
+ item := &item{
+ Key: "/apisix/routes/001",
+ Value: json.RawMessage(`
+ {
+ "upstream_id": "13",
+ "service_id": "14",
+ "host": "foo.com",
+ "uri": "/shop/133/details",
+ "methods": ["GET", "POST"]
+ }
+ `),
+ }
- upstreamId := *svc.Services[0].ServiceValue.UpstreamId
- assert.Equal(t, upstreamId, "123")
+ r, err := item.route("qa")
+ assert.Nil(t, err)
+ assert.Equal(t, *r.UpstreamId, "13")
+ assert.Equal(t, *r.ServiceId, "14")
+ assert.Equal(t, *r.Host, "foo.com")
+ assert.Equal(t, *r.Path, "/shop/133/details")
+ assert.Equal(t, *r.Methods[0], "GET")
+ assert.Equal(t, *r.Methods[1], "POST")
+ assert.Nil(t, r.Name)
+ assert.Equal(t, *r.FullName, "qa_unknown")
}
diff --git a/pkg/apisix/route.go b/pkg/apisix/route.go
new file mode 100644
index 0000000..a505d28
--- /dev/null
+++ b/pkg/apisix/route.go
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package apisix
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+
+ "go.uber.org/zap"
+
+ "github.com/api7/ingress-controller/pkg/log"
+ v1 "github.com/api7/ingress-controller/pkg/types/apisix/v1"
+)
+
+ // routeReqBody is the JSON payload marshalled by routeClient.Create and
+ // routeClient.Update. Every field is a pointer tagged omitempty, so nil
+ // fields are left out of the encoded body.
+ type routeReqBody struct {
+ Desc *string `json:"desc,omitempty"`
+ URI *string `json:"uri,omitempty"`
+ Host *string `json:"host,omitempty"`
+ ServiceId *string `json:"service_id,omitempty"`
+ Plugins *v1.Plugins `json:"plugins,omitempty"`
+ }
+
+ // routeClient implements the route operations (List, Create, Delete,
+ // Update) against the routes collection of one cluster.
+ type routeClient struct {
+ clusterName string // passed to item.route when listing
+ url string // collection URL: cluster baseURL + "/routes"
+ cluster *cluster // issues the list/create/update/delete requests
+ }
+
+ // newRouteClient returns a Route bound to cluster c; its requests target
+ // c.baseURL + "/routes".
+ func newRouteClient(c *cluster) Route {
+ 	rc := new(routeClient)
+ 	rc.cluster = c
+ 	rc.clusterName = c.name
+ 	rc.url = c.baseURL + "/routes"
+ 	return rc
+ }
+
+ // List returns every route in the cluster's routes collection.
+ //
+ // The collection is fetched with the cluster's listResource and each
+ // returned item is converted with item.route. The first item that fails to
+ // convert aborts the listing and its error is returned.
+ func (r *routeClient) List(ctx context.Context) ([]*v1.Route, error) {
+ 	log.Infow("try to list routes in APISIX", zap.String("url", r.url))
+
+ 	listed, err := r.cluster.listResource(ctx, r.url)
+ 	if err != nil {
+ 		log.Errorf("failed to list routes: %s", err)
+ 		return nil, err
+ 	}
+
+ 	var routes []*v1.Route
+ 	for idx, entry := range listed.Node.Items {
+ 		converted, convErr := entry.route(r.clusterName)
+ 		if convErr != nil {
+ 			log.Errorw("failed to convert route item",
+ 				zap.String("url", r.url),
+ 				zap.String("route_key", entry.Key),
+ 				zap.Error(convErr),
+ 			)
+ 			return nil, convErr
+ 		}
+
+ 		routes = append(routes, converted)
+ 		log.Infof("list route #%d, body: %s", idx, string(entry.Value))
+ 	}
+
+ 	return routes, nil
+ }
+
+ // Create posts a new route built from obj (name, path, host, service id and
+ // plugins) to the routes collection and returns the route decoded from the
+ // response. The returned route's group is obj.Group when set, otherwise the
+ // empty string.
+ func (r *routeClient) Create(ctx context.Context, obj *v1.Route) (*v1.Route, error) {
+ 	// Host is sent with omitempty and may be nil; do not dereference a nil
+ 	// pointer merely to log it.
+ 	var host string
+ 	if obj.Host != nil {
+ 		host = *obj.Host
+ 	}
+ 	log.Infow("try to create route", zap.String("host", host))
+ 	data, err := json.Marshal(routeReqBody{
+ 		Desc:      obj.Name,
+ 		URI:       obj.Path,
+ 		Host:      obj.Host,
+ 		ServiceId: obj.ServiceId,
+ 		Plugins:   obj.Plugins,
+ 	})
+ 	if err != nil {
+ 		return nil, err
+ 	}
+
+ 	log.Infow("creating route", zap.ByteString("body", data), zap.String("url", r.url))
+ 	resp, err := r.cluster.createResource(ctx, r.url, bytes.NewReader(data))
+ 	if err != nil {
+ 		log.Errorf("failed to create route: %s", err)
+ 		return nil, err
+ 	}
+
+ 	var clusterName string
+ 	if obj.Group != nil {
+ 		clusterName = *obj.Group
+ 	}
+ 	return resp.Item.route(clusterName)
+ }
+
+ // Delete removes the route identified by obj.ID from the routes collection.
+ // obj.ID must be non-nil.
+ func (r *routeClient) Delete(ctx context.Context, obj *v1.Route) error {
+ 	routeID := *obj.ID
+ 	log.Infof("delete route, id:%s", routeID)
+ 	return r.cluster.deleteResource(ctx, r.url+"/"+routeID)
+ }
+
+ // Update sends the route fields of obj (name, host, path, service id and
+ // plugins) to the resource URL of route obj.ID and returns the route decoded
+ // from the response. obj.ID must be non-nil. The returned route's group is
+ // obj.Group when set, otherwise the empty string.
+ func (r *routeClient) Update(ctx context.Context, obj *v1.Route) (*v1.Route, error) {
+ 	log.Infof("update route, id:%s", *obj.ID)
+ 	body, err := json.Marshal(routeReqBody{
+ 		Desc:      obj.Name,
+ 		Host:      obj.Host,
+ 		URI:       obj.Path,
+ 		ServiceId: obj.ServiceId,
+ 		Plugins:   obj.Plugins,
+ 	})
+ 	if err != nil {
+ 		return nil, err
+ 	}
+ 	url := r.url + "/" + *obj.ID
+ 	// Log the URL that is actually requested, not the collection URL.
+ 	log.Infow("updating route", zap.ByteString("body", body), zap.String("url", url))
+ 	resp, err := r.cluster.updateResource(ctx, url, bytes.NewReader(body))
+ 	if err != nil {
+ 		return nil, err
+ 	}
+ 	var clusterName string
+ 	if obj.Group != nil {
+ 		clusterName = *obj.Group
+ 	}
+ 	return resp.Item.route(clusterName)
+ }
diff --git a/pkg/apisix/route_test.go b/pkg/apisix/route_test.go
new file mode 100644
index 0000000..7f1d6c1
--- /dev/null
+++ b/pkg/apisix/route_test.go
@@ -0,0 +1,235 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package apisix
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "net/url"
+ "sort"
+ "strconv"
+ "strings"
+ "testing"
+
+ "golang.org/x/net/nettest"
+
+ "github.com/stretchr/testify/assert"
+
+ v1 "github.com/api7/ingress-controller/pkg/types/apisix/v1"
+)
+
+ // fakeAPISIXRouteSrv is an in-memory HTTP handler used by the route client
+ // test. id is incremented for every POST and route maps keys of the form
+ // "/apisix/routes/<id>" to the raw request bodies that were stored.
+ type fakeAPISIXRouteSrv struct {
+ id int
+ route map[string]json.RawMessage
+ }
+
+ // fakeListResp is the JSON body the fake servers write for GET requests.
+ type fakeListResp struct {
+ Count string `json:"count"`
+ Node fakeNode `json:"node"`
+ }
+
+ // fakeCreateResp is the JSON body the fake servers write for POST requests.
+ type fakeCreateResp struct {
+ Action string `json:"action"`
+ Node fakeItem `json:"node"`
+ }
+
+ // fakeNode is the collection node of a list response; Items is encoded
+ // under the "nodes" key.
+ type fakeNode struct {
+ Key string `json:"key"`
+ Items []fakeItem `json:"nodes"`
+ }
+
+ // fakeItem is a single stored resource: its key and its raw JSON value.
+ type fakeItem struct {
+ Key string `json:"key"`
+ Value json.RawMessage `json:"value"`
+ }
+
+ // ServeHTTP emulates the routes endpoints under /apisix/admin/routes.
+ //
+ //   - GET lists the stored routes ordered by key.
+ //   - DELETE removes the route named by the path, or answers 404.
+ //   - POST stores the body under a newly numbered key and echoes it back.
+ //   - PATCH replaces the stored body of an existing route, or answers 404.
+ //
+ // Paths outside /apisix/admin/routes get 404; other methods write nothing.
+ func (srv *fakeAPISIXRouteSrv) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ 	defer r.Body.Close()
+
+ 	if !strings.HasPrefix(r.URL.Path, "/apisix/admin/routes") {
+ 		w.WriteHeader(http.StatusNotFound)
+ 		return
+ 	}
+
+ 	switch r.Method {
+ 	case http.MethodGet:
+ 		var keys []string
+ 		for k := range srv.route {
+ 			keys = append(keys, k)
+ 		}
+ 		sort.Strings(keys)
+
+ 		list := fakeListResp{
+ 			Count: strconv.Itoa(len(srv.route)),
+ 			Node: fakeNode{
+ 				Key: "/apisix/routes",
+ 			},
+ 		}
+ 		for _, k := range keys {
+ 			list.Node.Items = append(list.Node.Items, fakeItem{
+ 				Key:   k,
+ 				Value: srv.route[k],
+ 			})
+ 		}
+ 		w.WriteHeader(http.StatusOK)
+ 		payload, _ := json.Marshal(list)
+ 		_, _ = w.Write(payload)
+
+ 	case http.MethodDelete:
+ 		key := "/apisix/routes/" + strings.TrimPrefix(r.URL.Path, "/apisix/admin/routes/")
+ 		if _, found := srv.route[key]; !found {
+ 			w.WriteHeader(http.StatusNotFound)
+ 			return
+ 		}
+ 		delete(srv.route, key)
+ 		w.WriteHeader(http.StatusOK)
+
+ 	case http.MethodPost:
+ 		srv.id++
+ 		key := fmt.Sprintf("/apisix/routes/%d", srv.id)
+ 		body, _ := ioutil.ReadAll(r.Body)
+ 		srv.route[key] = body
+ 		w.WriteHeader(http.StatusCreated)
+ 		payload, _ := json.Marshal(fakeCreateResp{
+ 			Action: "create",
+ 			Node: fakeItem{
+ 				Key:   key,
+ 				Value: json.RawMessage(body),
+ 			},
+ 		})
+ 		_, _ = w.Write(payload)
+
+ 	case http.MethodPatch:
+ 		key := "/apisix/routes/" + strings.TrimPrefix(r.URL.Path, "/apisix/admin/routes/")
+ 		if _, found := srv.route[key]; !found {
+ 			w.WriteHeader(http.StatusNotFound)
+ 			return
+ 		}
+
+ 		body, _ := ioutil.ReadAll(r.Body)
+ 		srv.route[key] = body
+
+ 		w.WriteHeader(http.StatusOK)
+ 		output := fmt.Sprintf(`{"action": "compareAndSwap", "node": {"key": "%s", "value": %s}}`, key, string(body))
+ 		_, _ = w.Write([]byte(output))
+ 	}
+ }
+
+ // runFakeRouteSrv starts a fakeAPISIXRouteSrv on a local TCP listener and
+ // returns the http.Server serving it; the server's Addr is the listener
+ // address. The test fails immediately if no listener can be created.
+ func runFakeRouteSrv(t *testing.T) *http.Server {
+ 	srv := &fakeAPISIXRouteSrv{
+ 		id:    0,
+ 		route: make(map[string]json.RawMessage),
+ 	}
+
+ 	ln, err := nettest.NewLocalListener("tcp")
+ 	if err != nil {
+ 		// Without this check a failed listen would surface as a nil
+ 		// pointer dereference on ln below.
+ 		t.Fatalf("failed to create local listener: %s", err)
+ 	}
+
+ 	httpSrv := &http.Server{
+ 		Addr:    ln.Addr().String(),
+ 		Handler: srv,
+ 	}
+
+ 	go func() {
+ 		if err := httpSrv.Serve(ln); err != nil && err != http.ErrServerClosed {
+ 			t.Errorf("failed to run http server: %s", err)
+ 		}
+ 	}()
+
+ 	return httpSrv
+ }
+
+ // TestRouteClient drives the route client against the fake routes server:
+ // two creates, a list, a delete followed by a list, and an update followed
+ // by a list.
+ func TestRouteClient(t *testing.T) {
+ 	srv := runFakeRouteSrv(t)
+ 	defer func() {
+ 		assert.Nil(t, srv.Shutdown(context.Background()))
+ 	}()
+
+ 	u := url.URL{
+ 		Scheme: "http",
+ 		Host:   srv.Addr,
+ 		Path:   "/apisix/admin",
+ 	}
+
+ 	cli := newRouteClient(&cluster{
+ 		baseURL: u.String(),
+ 		cli:     http.DefaultClient,
+ 	})
+
+ 	// Create
+ 	id := "111"
+ 	host := "www.foo.com"
+ 	uri := "/bar"
+ 	name := "test"
+ 	obj, err := cli.Create(context.Background(), &v1.Route{
+ 		Host:       &host,
+ 		Path:       &uri,
+ 		Name:       &name,
+ 		ServiceId:  &id,
+ 		UpstreamId: &id,
+ 	})
+ 	assert.Nil(t, err)
+ 	// assert.Equal takes (expected, actual); keep that order so failure
+ 	// messages label the values correctly.
+ 	assert.Equal(t, "1", *obj.ID)
+
+ 	obj, err = cli.Create(context.Background(), &v1.Route{
+ 		Host:       &host,
+ 		Path:       &uri,
+ 		Name:       &name,
+ 		ServiceId:  &id,
+ 		UpstreamId: &id,
+ 	})
+ 	assert.Nil(t, err)
+ 	assert.Equal(t, "2", *obj.ID)
+
+ 	// List
+ 	objs, err := cli.List(context.Background())
+ 	assert.Nil(t, err)
+ 	assert.Len(t, objs, 2)
+ 	assert.Equal(t, "1", *objs[0].ID)
+ 	assert.Equal(t, "2", *objs[1].ID)
+
+ 	// Delete then List
+ 	assert.Nil(t, cli.Delete(context.Background(), objs[0]))
+ 	objs, err = cli.List(context.Background())
+ 	assert.Nil(t, err)
+ 	assert.Len(t, objs, 1)
+ 	assert.Equal(t, "2", *objs[0].ID)
+
+ 	// Patch then List
+ 	id = "112"
+ 	objId := "2"
+ 	_, err = cli.Update(context.Background(), &v1.Route{
+ 		ID:         &objId,
+ 		Host:       &host,
+ 		Path:       &uri,
+ 		Name:       &name,
+ 		ServiceId:  &id,
+ 		UpstreamId: &id,
+ 	})
+ 	assert.Nil(t, err)
+ 	objs, err = cli.List(context.Background())
+ 	assert.Nil(t, err)
+ 	assert.Len(t, objs, 1)
+ 	assert.Equal(t, "2", *objs[0].ID)
+ 	assert.Equal(t, "112", *objs[0].ServiceId)
+ }
diff --git a/pkg/apisix/service.go b/pkg/apisix/service.go
new file mode 100644
index 0000000..85c477f
--- /dev/null
+++ b/pkg/apisix/service.go
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package apisix
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+
+ "github.com/api7/ingress-controller/pkg/log"
+ v1 "github.com/api7/ingress-controller/pkg/types/apisix/v1"
+ "go.uber.org/zap"
+)
+
+ // serviceClient implements the service operations (List, Create, Delete,
+ // Update) against the services collection of one cluster.
+ type serviceClient struct {
+ url string // collection URL: cluster baseURL + "/services"
+ clusterName string // passed to item.service when listing
+ cluster *cluster // issues the list/create/update/delete requests
+ }
+
+ // serviceItem is the JSON shape of a service. It is marshalled as the
+ // request body by serviceClient.Create and serviceClient.Update; nil fields
+ // are omitted from the encoded body.
+ type serviceItem struct {
+ UpstreamId *string `json:"upstream_id,omitempty"`
+ Plugins *map[string]interface{} `json:"plugins,omitempty"`
+ Desc *string `json:"desc,omitempty"`
+ }
+
+ // newServiceClient returns a Service bound to cluster c; its requests
+ // target c.baseURL + "/services".
+ func newServiceClient(c *cluster) Service {
+ 	sc := new(serviceClient)
+ 	sc.cluster = c
+ 	sc.clusterName = c.name
+ 	sc.url = c.baseURL + "/services"
+ 	return sc
+ }
+
+ // List returns every service in the cluster's services collection.
+ //
+ // The collection is fetched with the cluster's listResource and each
+ // returned item is converted with item.service. The first item that fails
+ // to convert aborts the listing and its error is returned.
+ func (s *serviceClient) List(ctx context.Context) ([]*v1.Service, error) {
+ 	log.Infow("try to list services in APISIX", zap.String("url", s.url))
+
+ 	svcItems, err := s.cluster.listResource(ctx, s.url)
+ 	if err != nil {
+ 		// This lists services; the message previously named upstreams.
+ 		log.Errorf("failed to list services: %s", err)
+ 		return nil, err
+ 	}
+
+ 	var items []*v1.Service
+ 	for i, item := range svcItems.Node.Items {
+ 		svc, err := item.service(s.clusterName)
+ 		if err != nil {
+ 			log.Errorw("failed to convert service item",
+ 				zap.String("url", s.url),
+ 				zap.String("service_key", item.Key),
+ 				zap.Error(err),
+ 			)
+ 			return nil, err
+ 		}
+ 		items = append(items, svc)
+ 		log.Infof("list service #%d, body: %s", i, string(item.Value))
+ 	}
+ 	return items, nil
+ }
+
+ // Create posts a new service built from obj (upstream id, plugins and name)
+ // to the services collection and returns the service decoded from the
+ // response. The returned service's group is obj.Group when set, otherwise
+ // the empty string.
+ func (s *serviceClient) Create(ctx context.Context, obj *v1.Service) (*v1.Service, error) {
+ 	// FullName is only used for logging; tolerate a nil pointer rather than
+ 	// panicking before the request is even built.
+ 	var fullName string
+ 	if obj.FullName != nil {
+ 		fullName = *obj.FullName
+ 	}
+ 	log.Infow("try to create service", zap.String("full_name", fullName))
+
+ 	body, err := json.Marshal(serviceItem{
+ 		UpstreamId: obj.UpstreamId,
+ 		Plugins:    (*map[string]interface{})(obj.Plugins),
+ 		Desc:       obj.Name,
+ 	})
+ 	if err != nil {
+ 		return nil, err
+ 	}
+
+ 	log.Infow("creating service", zap.ByteString("body", body), zap.String("url", s.url))
+ 	resp, err := s.cluster.createResource(ctx, s.url, bytes.NewReader(body))
+ 	if err != nil {
+ 		log.Errorf("failed to create service: %s", err)
+ 		return nil, err
+ 	}
+ 	var clusterName string
+ 	if obj.Group != nil {
+ 		clusterName = *obj.Group
+ 	}
+ 	return resp.Item.service(clusterName)
+ }
+
+ // Delete removes the service identified by obj.ID from the services
+ // collection. obj.ID must be non-nil.
+ func (s *serviceClient) Delete(ctx context.Context, obj *v1.Service) error {
+ 	serviceID := *obj.ID
+ 	log.Infof("delete service, id:%s", serviceID)
+ 	return s.cluster.deleteResource(ctx, s.url+"/"+serviceID)
+ }
+
+ // Update sends the service fields of obj (upstream id, plugins and name) to
+ // the resource URL of service obj.ID and returns the service decoded from
+ // the response. obj.ID must be non-nil. The returned service's group is
+ // obj.Group when set, otherwise the empty string.
+ func (s *serviceClient) Update(ctx context.Context, obj *v1.Service) (*v1.Service, error) {
+ 	log.Infof("update service, id:%s", *obj.ID)
+
+ 	body, err := json.Marshal(serviceItem{
+ 		UpstreamId: obj.UpstreamId,
+ 		Plugins:    (*map[string]interface{})(obj.Plugins),
+ 		Desc:       obj.Name,
+ 	})
+ 	if err != nil {
+ 		return nil, err
+ 	}
+
+ 	url := s.url + "/" + *obj.ID
+ 	// This is the update path; the message previously said "creating".
+ 	log.Infow("updating service", zap.ByteString("body", body), zap.String("url", url))
+ 	resp, err := s.cluster.updateResource(ctx, url, bytes.NewReader(body))
+ 	if err != nil {
+ 		return nil, err
+ 	}
+ 	var clusterName string
+ 	if obj.Group != nil {
+ 		clusterName = *obj.Group
+ 	}
+ 	return resp.Item.service(clusterName)
+ }
diff --git a/pkg/apisix/service_test.go b/pkg/apisix/service_test.go
new file mode 100644
index 0000000..a2ed175
--- /dev/null
+++ b/pkg/apisix/service_test.go
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package apisix
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "net/url"
+ "sort"
+ "strconv"
+ "strings"
+ "testing"
+
+ "golang.org/x/net/nettest"
+
+ v1 "github.com/api7/ingress-controller/pkg/types/apisix/v1"
+ "github.com/stretchr/testify/assert"
+)
+
+ // fakeAPISIXServiceSrv is an in-memory HTTP handler used by the service
+ // client test. id is incremented for every POST and service maps keys of
+ // the form "/apisix/services/<id>" to the raw request bodies that were
+ // stored.
+ type fakeAPISIXServiceSrv struct {
+ id int
+ service map[string]json.RawMessage
+ }
+
+ // ServeHTTP emulates the services endpoints under /apisix/admin/services.
+ //
+ //   - GET lists the stored services ordered by key.
+ //   - DELETE removes the service named by the path, or answers 404.
+ //   - POST stores the body under a newly numbered key and echoes it back.
+ //   - PATCH replaces the stored body of an existing service, or answers 404.
+ //
+ // Paths outside /apisix/admin/services get 404; other methods write nothing.
+ func (srv *fakeAPISIXServiceSrv) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ 	defer r.Body.Close()
+
+ 	if !strings.HasPrefix(r.URL.Path, "/apisix/admin/services") {
+ 		w.WriteHeader(http.StatusNotFound)
+ 		return
+ 	}
+
+ 	switch r.Method {
+ 	case http.MethodGet:
+ 		var keys []string
+ 		for k := range srv.service {
+ 			keys = append(keys, k)
+ 		}
+ 		sort.Strings(keys)
+
+ 		list := fakeListResp{
+ 			Count: strconv.Itoa(len(srv.service)),
+ 			Node: fakeNode{
+ 				Key: "/apisix/services",
+ 			},
+ 		}
+ 		for _, k := range keys {
+ 			list.Node.Items = append(list.Node.Items, fakeItem{
+ 				Key:   k,
+ 				Value: srv.service[k],
+ 			})
+ 		}
+ 		w.WriteHeader(http.StatusOK)
+ 		payload, _ := json.Marshal(list)
+ 		_, _ = w.Write(payload)
+
+ 	case http.MethodDelete:
+ 		key := "/apisix/services/" + strings.TrimPrefix(r.URL.Path, "/apisix/admin/services/")
+ 		if _, found := srv.service[key]; !found {
+ 			w.WriteHeader(http.StatusNotFound)
+ 			return
+ 		}
+ 		delete(srv.service, key)
+ 		w.WriteHeader(http.StatusOK)
+
+ 	case http.MethodPost:
+ 		srv.id++
+ 		key := fmt.Sprintf("/apisix/services/%d", srv.id)
+ 		body, _ := ioutil.ReadAll(r.Body)
+ 		srv.service[key] = body
+ 		w.WriteHeader(http.StatusCreated)
+ 		payload, _ := json.Marshal(fakeCreateResp{
+ 			Action: "create",
+ 			Node: fakeItem{
+ 				Key:   key,
+ 				Value: json.RawMessage(body),
+ 			},
+ 		})
+ 		_, _ = w.Write(payload)
+
+ 	case http.MethodPatch:
+ 		key := "/apisix/services/" + strings.TrimPrefix(r.URL.Path, "/apisix/admin/services/")
+ 		if _, found := srv.service[key]; !found {
+ 			w.WriteHeader(http.StatusNotFound)
+ 			return
+ 		}
+
+ 		body, _ := ioutil.ReadAll(r.Body)
+ 		srv.service[key] = body
+
+ 		w.WriteHeader(http.StatusOK)
+ 		output := fmt.Sprintf(`{"action": "compareAndSwap", "node": {"key": "%s", "value": %s}}`, key, string(body))
+ 		_, _ = w.Write([]byte(output))
+ 	}
+ }
+
+ // runFakeServiceSrv starts a fakeAPISIXServiceSrv on a local TCP listener
+ // and returns the http.Server serving it; the server's Addr is the listener
+ // address. The test fails immediately if no listener can be created.
+ func runFakeServiceSrv(t *testing.T) *http.Server {
+ 	srv := &fakeAPISIXServiceSrv{
+ 		id:      0,
+ 		service: make(map[string]json.RawMessage),
+ 	}
+
+ 	ln, err := nettest.NewLocalListener("tcp")
+ 	if err != nil {
+ 		// Without this check a failed listen would surface as a nil
+ 		// pointer dereference on ln below.
+ 		t.Fatalf("failed to create local listener: %s", err)
+ 	}
+ 	httpSrv := &http.Server{
+ 		Addr:    ln.Addr().String(),
+ 		Handler: srv,
+ 	}
+
+ 	go func() {
+ 		if err := httpSrv.Serve(ln); err != nil && err != http.ErrServerClosed {
+ 			t.Errorf("failed to run http server: %s", err)
+ 		}
+ 	}()
+
+ 	return httpSrv
+ }
+
+ // TestServiceClient drives the service client against the fake services
+ // server: two creates, a list, a delete followed by a list, and an update
+ // followed by a list.
+ func TestServiceClient(t *testing.T) {
+ 	srv := runFakeServiceSrv(t)
+ 	defer func() {
+ 		assert.Nil(t, srv.Shutdown(context.Background()))
+ 	}()
+
+ 	u := url.URL{
+ 		Scheme: "http",
+ 		Host:   srv.Addr,
+ 		Path:   "/apisix/admin",
+ 	}
+ 	cli := newServiceClient(&cluster{
+ 		baseURL: u.String(),
+ 		cli:     http.DefaultClient,
+ 	})
+
+ 	// Create
+ 	group := "default"
+ 	fullName := "default_test"
+ 	name := "test"
+ 	upsId := "13"
+
+ 	obj, err := cli.Create(context.TODO(), &v1.Service{
+ 		FullName:   &fullName,
+ 		Group:      &group,
+ 		Name:       &name,
+ 		UpstreamId: &upsId,
+ 	})
+ 	assert.Nil(t, err)
+ 	// assert.Equal takes (expected, actual); keep that order so failure
+ 	// messages label the values correctly.
+ 	assert.Equal(t, "1", *obj.ID)
+
+ 	obj, err = cli.Create(context.TODO(), &v1.Service{
+ 		FullName:   &fullName,
+ 		Group:      &group,
+ 		Name:       &name,
+ 		UpstreamId: &upsId,
+ 	})
+ 	assert.Nil(t, err)
+ 	assert.Equal(t, "2", *obj.ID)
+
+ 	// List
+ 	objs, err := cli.List(context.Background())
+ 	assert.Nil(t, err)
+ 	assert.Len(t, objs, 2)
+ 	assert.Equal(t, "1", *objs[0].ID)
+ 	assert.Equal(t, "2", *objs[1].ID)
+
+ 	// Delete then List
+ 	assert.Nil(t, cli.Delete(context.Background(), objs[0]))
+ 	objs, err = cli.List(context.Background())
+ 	assert.Nil(t, err)
+ 	assert.Len(t, objs, 1)
+ 	assert.Equal(t, "2", *objs[0].ID)
+
+ 	// Patch then List
+ 	upsId = "14"
+ 	objId := "2"
+ 	_, err = cli.Update(context.Background(), &v1.Service{
+ 		ID:         &objId,
+ 		FullName:   &fullName,
+ 		Group:      &group,
+ 		Name:       &name,
+ 		UpstreamId: &upsId,
+ 	})
+ 	assert.Nil(t, err)
+ 	objs, err = cli.List(context.Background())
+ 	assert.Nil(t, err)
+ 	assert.Len(t, objs, 1)
+ 	assert.Equal(t, "2", *objs[0].ID)
+ 	assert.Equal(t, upsId, *objs[0].UpstreamId)
+ }
diff --git a/pkg/apisix/ssl.go b/pkg/apisix/ssl.go
new file mode 100644
index 0000000..d904e6d
--- /dev/null
+++ b/pkg/apisix/ssl.go
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package apisix
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+
+ "github.com/api7/ingress-controller/pkg/log"
+ "go.uber.org/zap"
+
+ v1 "github.com/api7/ingress-controller/pkg/types/apisix/v1"
+)
+
+ // sslClient implements the SSL operations (List, Create, Delete, Update)
+ // against the ssl collection of one cluster.
+ type sslClient struct {
+ url string // collection URL: cluster baseURL + "/ssl"
+ clusterName string // passed to item.ssl when listing
+ cluster *cluster // issues the list/create/update/delete requests
+ }
+
+ // newSSLClient returns an SSL bound to cluster c; its requests target
+ // c.baseURL + "/ssl".
+ func newSSLClient(c *cluster) SSL {
+ 	sc := new(sslClient)
+ 	sc.cluster = c
+ 	sc.clusterName = c.name
+ 	sc.url = c.baseURL + "/ssl"
+ 	return sc
+ }
+
+ // List returns every ssl object in the cluster's ssl collection.
+ //
+ // The collection is fetched with the cluster's listResource and each
+ // returned item is converted with item.ssl. The first item that fails to
+ // convert aborts the listing and its error is returned.
+ func (s *sslClient) List(ctx context.Context) ([]*v1.Ssl, error) {
+ 	log.Infow("try to list ssl in APISIX", zap.String("url", s.url))
+
+ 	listed, err := s.cluster.listResource(ctx, s.url)
+ 	if err != nil {
+ 		log.Errorf("failed to list ssl: %s", err)
+ 		return nil, err
+ 	}
+
+ 	var certs []*v1.Ssl
+ 	for idx, entry := range listed.Node.Items {
+ 		converted, convErr := entry.ssl(s.clusterName)
+ 		if convErr != nil {
+ 			log.Errorw("failed to convert ssl item",
+ 				zap.String("url", s.url),
+ 				zap.String("ssl_key", entry.Key),
+ 				zap.Error(convErr),
+ 			)
+ 			return nil, convErr
+ 		}
+ 		certs = append(certs, converted)
+ 		log.Infof("list ssl #%d, body: %s", idx, string(entry.Value))
+ 	}
+
+ 	return certs, nil
+ }
+
+ // Create posts a new ssl object carrying obj's snis, cert, key and status
+ // to the ssl collection and returns the object decoded from the response.
+ // The returned object's group is obj.Group when set, otherwise the empty
+ // string.
+ func (s *sslClient) Create(ctx context.Context, obj *v1.Ssl) (*v1.Ssl, error) {
+ 	log.Info("try to create ssl")
+
+ 	payload := v1.Ssl{
+ 		Snis:   obj.Snis,
+ 		Cert:   obj.Cert,
+ 		Key:    obj.Key,
+ 		Status: obj.Status,
+ 	}
+ 	body, err := json.Marshal(payload)
+ 	if err != nil {
+ 		return nil, err
+ 	}
+
+ 	log.Infow("creating ssl", zap.ByteString("body", body), zap.String("url", s.url))
+ 	created, err := s.cluster.createResource(ctx, s.url, bytes.NewReader(body))
+ 	if err != nil {
+ 		log.Errorf("failed to create ssl: %s", err)
+ 		return nil, err
+ 	}
+
+ 	group := ""
+ 	if obj.Group != nil {
+ 		group = *obj.Group
+ 	}
+ 	return created.Item.ssl(group)
+ }
+
+ // Delete removes the ssl object identified by obj.ID from the ssl
+ // collection. obj.ID must be non-nil.
+ func (s *sslClient) Delete(ctx context.Context, obj *v1.Ssl) error {
+ 	sslID := *obj.ID
+ 	log.Infof("delete ssl, id:%s", sslID)
+ 	return s.cluster.deleteResource(ctx, s.url+"/"+sslID)
+ }
+
+ // Update sends obj's id, snis, cert, key and status to the resource URL of
+ // ssl object obj.ID and returns the object decoded from the response.
+ // obj.ID must be non-nil. The returned object's group is obj.Group when
+ // set, otherwise the empty string.
+ func (s *sslClient) Update(ctx context.Context, obj *v1.Ssl) (*v1.Ssl, error) {
+ 	log.Infof("update ssl, id:%s", *obj.ID)
+
+ 	target := s.url + "/" + *obj.ID
+ 	payload := v1.Ssl{
+ 		ID:     obj.ID,
+ 		Snis:   obj.Snis,
+ 		Cert:   obj.Cert,
+ 		Key:    obj.Key,
+ 		Status: obj.Status,
+ 	}
+ 	body, err := json.Marshal(payload)
+ 	if err != nil {
+ 		return nil, err
+ 	}
+
+ 	log.Infow("updating ssl", zap.ByteString("body", body), zap.String("url", target))
+ 	updated, err := s.cluster.updateResource(ctx, target, bytes.NewReader(body))
+ 	if err != nil {
+ 		return nil, err
+ 	}
+
+ 	group := ""
+ 	if obj.Group != nil {
+ 		group = *obj.Group
+ 	}
+ 	return updated.Item.ssl(group)
+ }
diff --git a/pkg/apisix/ssl_test.go b/pkg/apisix/ssl_test.go
new file mode 100644
index 0000000..e176b1e
--- /dev/null
+++ b/pkg/apisix/ssl_test.go
@@ -0,0 +1,200 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package apisix
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "net/url"
+ "sort"
+ "strconv"
+ "strings"
+ "testing"
+
+ v1 "github.com/api7/ingress-controller/pkg/types/apisix/v1"
+ "github.com/stretchr/testify/assert"
+ "golang.org/x/net/nettest"
+)
+
+ // fakeAPISIXSSLSrv is an in-memory HTTP handler used by the ssl client
+ // test. id is incremented for every POST and ssl maps keys of the form
+ // "/apisix/ssl/<id>" to the raw request bodies that were stored.
+ type fakeAPISIXSSLSrv struct {
+ id int
+ ssl map[string]json.RawMessage
+ }
+
+ // ServeHTTP emulates the ssl endpoints under /apisix/admin/ssl.
+ //
+ //   - GET lists the stored ssl objects ordered by key.
+ //   - DELETE removes the object named by the path, or answers 404.
+ //   - POST stores the body under a newly numbered key and echoes it back.
+ //   - PATCH replaces the stored body of an existing object, or answers 404.
+ //
+ // Paths outside /apisix/admin/ssl get 404; other methods write nothing.
+ func (srv *fakeAPISIXSSLSrv) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ 	defer r.Body.Close()
+
+ 	if !strings.HasPrefix(r.URL.Path, "/apisix/admin/ssl") {
+ 		w.WriteHeader(http.StatusNotFound)
+ 		return
+ 	}
+
+ 	switch r.Method {
+ 	case http.MethodGet:
+ 		var keys []string
+ 		for k := range srv.ssl {
+ 			keys = append(keys, k)
+ 		}
+ 		sort.Strings(keys)
+
+ 		list := fakeListResp{
+ 			Count: strconv.Itoa(len(srv.ssl)),
+ 			Node: fakeNode{
+ 				Key: "/apisix/ssl",
+ 			},
+ 		}
+ 		for _, k := range keys {
+ 			list.Node.Items = append(list.Node.Items, fakeItem{
+ 				Key:   k,
+ 				Value: srv.ssl[k],
+ 			})
+ 		}
+ 		w.WriteHeader(http.StatusOK)
+ 		payload, _ := json.Marshal(list)
+ 		_, _ = w.Write(payload)
+
+ 	case http.MethodDelete:
+ 		key := "/apisix/ssl/" + strings.TrimPrefix(r.URL.Path, "/apisix/admin/ssl/")
+ 		if _, found := srv.ssl[key]; !found {
+ 			w.WriteHeader(http.StatusNotFound)
+ 			return
+ 		}
+ 		delete(srv.ssl, key)
+ 		w.WriteHeader(http.StatusOK)
+
+ 	case http.MethodPost:
+ 		srv.id++
+ 		key := fmt.Sprintf("/apisix/ssl/%d", srv.id)
+ 		body, _ := ioutil.ReadAll(r.Body)
+ 		srv.ssl[key] = body
+ 		w.WriteHeader(http.StatusCreated)
+ 		payload, _ := json.Marshal(fakeCreateResp{
+ 			Action: "create",
+ 			Node: fakeItem{
+ 				Key:   key,
+ 				Value: json.RawMessage(body),
+ 			},
+ 		})
+ 		_, _ = w.Write(payload)
+
+ 	case http.MethodPatch:
+ 		key := "/apisix/ssl/" + strings.TrimPrefix(r.URL.Path, "/apisix/admin/ssl/")
+ 		if _, found := srv.ssl[key]; !found {
+ 			w.WriteHeader(http.StatusNotFound)
+ 			return
+ 		}
+
+ 		body, _ := ioutil.ReadAll(r.Body)
+ 		srv.ssl[key] = body
+
+ 		w.WriteHeader(http.StatusOK)
+ 		output := fmt.Sprintf(`{"action": "compareAndSwap", "node": {"key": "%s", "value": %s}}`, key, string(body))
+ 		_, _ = w.Write([]byte(output))
+ 	}
+ }
+
+ // runFakeSSLSrv starts a fakeAPISIXSSLSrv on a local TCP listener and
+ // returns the http.Server serving it; the server's Addr is the listener
+ // address. The test fails immediately if no listener can be created.
+ func runFakeSSLSrv(t *testing.T) *http.Server {
+ 	srv := &fakeAPISIXSSLSrv{
+ 		id:  0,
+ 		ssl: make(map[string]json.RawMessage),
+ 	}
+
+ 	ln, err := nettest.NewLocalListener("tcp")
+ 	if err != nil {
+ 		// Without this check a failed listen would surface as a nil
+ 		// pointer dereference on ln below.
+ 		t.Fatalf("failed to create local listener: %s", err)
+ 	}
+ 	httpSrv := &http.Server{
+ 		Addr:    ln.Addr().String(),
+ 		Handler: srv,
+ 	}
+
+ 	go func() {
+ 		if err := httpSrv.Serve(ln); err != nil && err != http.ErrServerClosed {
+ 			t.Errorf("failed to run http server: %s", err)
+ 		}
+ 	}()
+
+ 	return httpSrv
+ }
+
+ // TestSSLClient drives the ssl client against the fake ssl server: two
+ // creates, a list, a delete followed by a list, and an update followed by a
+ // list.
+ func TestSSLClient(t *testing.T) {
+ 	srv := runFakeSSLSrv(t)
+ 	defer func() {
+ 		assert.Nil(t, srv.Shutdown(context.Background()))
+ 	}()
+
+ 	u := url.URL{
+ 		Scheme: "http",
+ 		Host:   srv.Addr,
+ 		Path:   "/apisix/admin",
+ 	}
+ 	cli := newSSLClient(&cluster{
+ 		baseURL: u.String(),
+ 		cli:     http.DefaultClient,
+ 	})
+
+ 	// Create
+ 	group := "default"
+ 	sni := "bar.com"
+ 	obj, err := cli.Create(context.TODO(), &v1.Ssl{
+ 		Group: &group,
+ 		Snis:  []*string{&sni},
+ 	})
+ 	assert.Nil(t, err)
+ 	// assert.Equal takes (expected, actual); keep that order so failure
+ 	// messages label the values correctly.
+ 	assert.Equal(t, "1", *obj.ID)
+
+ 	obj, err = cli.Create(context.TODO(), &v1.Ssl{
+ 		Group: &group,
+ 		Snis:  []*string{&sni},
+ 	})
+ 	assert.Nil(t, err)
+ 	assert.Equal(t, "2", *obj.ID)
+
+ 	// List
+ 	objs, err := cli.List(context.Background())
+ 	assert.Nil(t, err)
+ 	assert.Len(t, objs, 2)
+ 	assert.Equal(t, "1", *objs[0].ID)
+ 	assert.Equal(t, "2", *objs[1].ID)
+
+ 	// Delete then List
+ 	assert.Nil(t, cli.Delete(context.Background(), objs[0]))
+ 	objs, err = cli.List(context.Background())
+ 	assert.Nil(t, err)
+ 	assert.Len(t, objs, 1)
+ 	assert.Equal(t, "2", *objs[0].ID)
+
+ 	// Patch then List
+ 	objId := "2"
+ 	sni = "foo.com"
+ 	_, err = cli.Update(context.Background(), &v1.Ssl{
+ 		ID:   &objId,
+ 		Snis: []*string{&sni},
+ 	})
+ 	assert.Nil(t, err)
+ 	objs, err = cli.List(context.Background())
+ 	assert.Nil(t, err)
+ 	assert.Len(t, objs, 1)
+ 	assert.Equal(t, "2", *objs[0].ID)
+ 	assert.Equal(t, sni, *objs[0].Snis[0])
+ }
diff --git a/pkg/apisix/upstream.go b/pkg/apisix/upstream.go
new file mode 100644
index 0000000..0fbccf3
--- /dev/null
+++ b/pkg/apisix/upstream.go
@@ -0,0 +1,186 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package apisix
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "errors"
+
+ "go.uber.org/zap"
+
+ "github.com/api7/ingress-controller/pkg/log"
+ v1 "github.com/api7/ingress-controller/pkg/types/apisix/v1"
+)
+
+type upstreamClient struct {
+ clusterName string
+ url string
+ cluster *cluster
+}
+
+type upstreamNode struct {
+ Host string `json:"host,omitempty" yaml:"ip,omitempty"`
+ Port int `json:"port,omitempty" yaml:"port,omitempty"`
+ Weight int `json:"weight,omitempty" yaml:"weight,omitempty"`
+}
+
+type upstreamNodes []upstreamNode
+
+// UnmarshalJSON implements the json.Unmarshaler interface.
+// lua-cjson doesn't distinguish between an empty array and an empty table,
+// so by default an empty array is encoded as '{}'.
+// We have to stay compatible with that encoding.
+func (n *upstreamNodes) UnmarshalJSON(p []byte) error {
+ if p[0] == '{' {
+ if len(p) != 2 {
+ return errors.New("unexpected non-empty object")
+ }
+ return nil
+ }
+ var data []upstreamNode
+ if err := json.Unmarshal(p, &data); err != nil {
+ return err
+ }
+ *n = data
+ return nil
+}
+
+type upstreamReqBody struct {
+ LBType *string `json:"type"`
+ HashOn *string `json:"hash_on,omitempty"`
+ Key *string `json:"key,omitempty"`
+ Nodes upstreamNodes `json:"nodes"`
+ Desc *string `json:"desc"`
+}
+
+type upstreamItem struct {
+ Nodes upstreamNodes `json:"nodes"`
+ Desc *string `json:"desc"`
+ LBType *string `json:"type"`
+}
+
+func newUpstreamClient(c *cluster) Upstream {
+ return &upstreamClient{
+ url: c.baseURL + "/upstreams",
+ cluster: c,
+ clusterName: c.name,
+ }
+}
+
+func (u *upstreamClient) List(ctx context.Context) ([]*v1.Upstream, error) {
+ log.Infow("try to list upstreams in APISIX", zap.String("url", u.url))
+
+ upsItems, err := u.cluster.listResource(ctx, u.url)
+ if err != nil {
+ log.Errorf("failed to list upstreams: %s", err)
+ return nil, err
+ }
+
+ var items []*v1.Upstream
+ for i, item := range upsItems.Node.Items {
+ ups, err := item.upstream(u.clusterName)
+ if err != nil {
+ log.Errorw("failed to convert upstream item",
+ zap.String("url", u.url),
+ zap.String("upstream_key", item.Key),
+ zap.Error(err),
+ )
+ return nil, err
+ }
+ items = append(items, ups)
+ log.Infof("list upstream #%d, body: %s", i, string(item.Value))
+ }
+ return items, nil
+}
+
+func (u *upstreamClient) Create(ctx context.Context, obj *v1.Upstream)
(*v1.Upstream, error) {
+ log.Infow("try to create upstream",
+ zap.String("full_name", *obj.FullName),
+ )
+
+ nodes := make(upstreamNodes, 0, len(obj.Nodes))
+ for _, node := range obj.Nodes {
+ nodes = append(nodes, upstreamNode{
+ Host: *node.IP,
+ Port: *node.Port,
+ Weight: *node.Weight,
+ })
+ }
+ body, err := json.Marshal(upstreamReqBody{
+ LBType: obj.Type,
+ HashOn: obj.HashOn,
+ Key: obj.Key,
+ Nodes: nodes,
+ Desc: obj.Name,
+ })
+ if err != nil {
+ return nil, err
+ }
+ log.Infow("creating upstream", zap.ByteString("body", body),
zap.String("url", u.url))
+
+ resp, err := u.cluster.createResource(ctx, u.url, bytes.NewReader(body))
+ if err != nil {
+ log.Errorf("failed to create upstream: %s", err)
+ return nil, err
+ }
+ var clusterName string
+ if obj.Group != nil {
+ clusterName = *obj.Group
+ }
+ return resp.Item.upstream(clusterName)
+}
+
+func (u *upstreamClient) Delete(ctx context.Context, obj *v1.Upstream) error {
+ log.Infof("delete upstream, id:%s", *obj.ID)
+ url := u.url + "/" + *obj.ID
+ return u.cluster.deleteResource(ctx, url)
+}
+
+func (u *upstreamClient) Update(ctx context.Context, obj *v1.Upstream)
(*v1.Upstream, error) {
+ log.Infof("update upstream, id:%s", *obj.ID)
+
+ nodes := make(upstreamNodes, 0, len(obj.Nodes))
+ for _, node := range obj.Nodes {
+ nodes = append(nodes, upstreamNode{
+ Host: *node.IP,
+ Port: *node.Port,
+ Weight: *node.Weight,
+ })
+ }
+ body, err := json.Marshal(upstreamReqBody{
+ LBType: obj.Type,
+ HashOn: obj.HashOn,
+ Key: obj.Key,
+ Nodes: nodes,
+ Desc: obj.Name,
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ url := u.url + "/" + *obj.ID
+ log.Infow("upating upstream", zap.ByteString("body", body),
zap.String("url", url))
+ resp, err := u.cluster.updateResource(ctx, url, bytes.NewReader(body))
+ if err != nil {
+ return nil, err
+ }
+ var clusterName string
+ if obj.Group != nil {
+ clusterName = *obj.Group
+ }
+ return resp.Item.upstream(clusterName)
+}
diff --git a/pkg/apisix/upstream_test.go b/pkg/apisix/upstream_test.go
new file mode 100644
index 0000000..63bed5c
--- /dev/null
+++ b/pkg/apisix/upstream_test.go
@@ -0,0 +1,228 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package apisix
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "net/url"
+ "sort"
+ "strconv"
+ "strings"
+ "testing"
+
+ v1 "github.com/api7/ingress-controller/pkg/types/apisix/v1"
+
+ "github.com/stretchr/testify/assert"
+
+ "golang.org/x/net/nettest"
+)
+
+type fakeAPISIXUpstreamSrv struct {
+ id int
+ upstream map[string]json.RawMessage
+}
+
+func (srv *fakeAPISIXUpstreamSrv) ServeHTTP(w http.ResponseWriter, r
*http.Request) {
+ defer r.Body.Close()
+
+ if !strings.HasPrefix(r.URL.Path, "/apisix/admin/upstreams") {
+ w.WriteHeader(http.StatusNotFound)
+ return
+ }
+
+ if r.Method == http.MethodGet {
+ resp := fakeListResp{
+ Count: strconv.Itoa(len(srv.upstream)),
+ Node: fakeNode{
+ Key: "/apisix/upstreams",
+ },
+ }
+ var keys []string
+ for key := range srv.upstream {
+ keys = append(keys, key)
+ }
+ sort.Strings(keys)
+ for _, key := range keys {
+ resp.Node.Items = append(resp.Node.Items, fakeItem{
+ Key: key,
+ Value: srv.upstream[key],
+ })
+ }
+ w.WriteHeader(http.StatusOK)
+ data, _ := json.Marshal(resp)
+ _, _ = w.Write(data)
+ return
+ }
+
+ if r.Method == http.MethodDelete {
+ id := strings.TrimPrefix(r.URL.Path, "/apisix/admin/upstreams/")
+ id = "/apisix/upstreams/" + id
+ code := http.StatusNotFound
+ if _, ok := srv.upstream[id]; ok {
+ delete(srv.upstream, id)
+ code = http.StatusOK
+ }
+ w.WriteHeader(code)
+ }
+
+ if r.Method == http.MethodPost {
+ srv.id++
+ key := fmt.Sprintf("/apisix/upstreams/%d", srv.id)
+ data, _ := ioutil.ReadAll(r.Body)
+ srv.upstream[key] = data
+ w.WriteHeader(http.StatusCreated)
+ resp := fakeCreateResp{
+ Action: "create",
+ Node: fakeItem{
+ Key: key,
+ Value: json.RawMessage(data),
+ },
+ }
+ data, _ = json.Marshal(resp)
+ _, _ = w.Write(data)
+ return
+ }
+
+ if r.Method == http.MethodPatch {
+ id := strings.TrimPrefix(r.URL.Path, "/apisix/admin/upstreams/")
+ id = "/apisix/upstreams/" + id
+ if _, ok := srv.upstream[id]; !ok {
+ w.WriteHeader(http.StatusNotFound)
+ return
+ }
+
+ data, _ := ioutil.ReadAll(r.Body)
+ srv.upstream[id] = data
+
+ w.WriteHeader(http.StatusOK)
+ output := fmt.Sprintf(`{"action": "compareAndSwap", "node":
{"key": "%s", "value": %s}}`, id, string(data))
+ _, _ = w.Write([]byte(output))
+ return
+ }
+}
+
+func runFakeUpstreamSrv(t *testing.T) *http.Server {
+ srv := &fakeAPISIXUpstreamSrv{
+ id: 0,
+ upstream: make(map[string]json.RawMessage),
+ }
+
+ ln, _ := nettest.NewLocalListener("tcp")
+ httpSrv := &http.Server{
+ Addr: ln.Addr().String(),
+ Handler: srv,
+ }
+
+ go func() {
+ if err := httpSrv.Serve(ln); err != nil && err !=
http.ErrServerClosed {
+ t.Errorf("failed to run http server: %s", err)
+ }
+ }()
+
+ return httpSrv
+}
+
+func TestUpstreamClient(t *testing.T) {
+ srv := runFakeUpstreamSrv(t)
+ defer func() {
+ assert.Nil(t, srv.Shutdown(context.Background()))
+ }()
+
+ u := url.URL{
+ Scheme: "http",
+ Host: srv.Addr,
+ Path: "/apisix/admin",
+ }
+ cli := newUpstreamClient(&cluster{
+ baseURL: u.String(),
+ cli: http.DefaultClient,
+ })
+
+ // Create
+ key := "upstream/abc"
+ lbType := "roundrobin"
+ fullName := "default_test"
+ group := "default"
+ name := "test"
+ ip := "10.0.11.153"
+ port := 15006
+ weight := 100
+ nodes := []*v1.Node{
+ {
+ IP: &ip,
+ Port: &port,
+ Weight: &weight,
+ },
+ }
+
+ obj, err := cli.Create(context.TODO(), &v1.Upstream{
+ FullName: &fullName,
+ Group: &group,
+ Name: &name,
+ Type: &lbType,
+ Key: &key,
+ Nodes: nodes,
+ })
+ assert.Nil(t, err)
+ assert.Equal(t, *obj.ID, "1")
+
+ obj, err = cli.Create(context.TODO(), &v1.Upstream{
+ FullName: &fullName,
+ Group: &group,
+ Name: &name,
+ Type: &lbType,
+ Key: &key,
+ Nodes: nodes,
+ })
+ assert.Nil(t, err)
+ assert.Equal(t, *obj.ID, "2")
+
+ // List
+ objs, err := cli.List(context.Background())
+ assert.Nil(t, err)
+ assert.Len(t, objs, 2)
+ assert.Equal(t, *objs[0].ID, "1")
+ assert.Equal(t, *objs[1].ID, "2")
+
+ // Delete then List
+ assert.Nil(t, cli.Delete(context.Background(), objs[0]))
+ objs, err = cli.List(context.Background())
+ assert.Nil(t, err)
+ assert.Len(t, objs, 1)
+ assert.Equal(t, "2", *objs[0].ID)
+
+ // Patch then List
+ lbType = "chash"
+ objId := "2"
+ _, err = cli.Update(context.Background(), &v1.Upstream{
+ ID: &objId,
+ FullName: &fullName,
+ Group: &group,
+ Name: &name,
+ Type: &lbType,
+ Key: &key,
+ Nodes: nodes,
+ })
+ assert.Nil(t, err)
+ objs, err = cli.List(context.Background())
+ assert.Nil(t, err)
+ assert.Len(t, objs, 1)
+ assert.Equal(t, "2", *objs[0].ID)
+ assert.Equal(t, lbType, *objs[0].Type)
+}
diff --git a/pkg/ingress/apisix/tls_test.go b/pkg/ingress/apisix/tls_test.go
index 1bdbc20..b61223a 100644
--- a/pkg/ingress/apisix/tls_test.go
+++ b/pkg/ingress/apisix/tls_test.go
@@ -21,8 +21,10 @@ import (
"github.com/stretchr/testify/assert"
"gopkg.in/yaml.v2"
- "k8s.io/api/core/v1"
+ v1 "k8s.io/api/core/v1"
+ apisixhttp "github.com/api7/ingress-controller/pkg/apisix"
+ "github.com/api7/ingress-controller/pkg/seven/conf"
"github.com/api7/ingress-controller/pkg/seven/utils"
apisix "github.com/api7/ingress-controller/pkg/types/apisix/v1"
)
@@ -99,6 +101,7 @@ spec:
Status: &status,
Group: &group,
}
+ setDummyApisixClient(t)
atlsCRD := &ApisixTlsCRD{}
err := yaml.Unmarshal([]byte(atlsStr), atlsCRD)
assert.Nil(t, err, "yaml decode failed")
@@ -119,6 +122,7 @@ spec:
name: test-atls
namespace: helm
`
+ setDummyApisixClient(t)
atlsCRD := &ApisixTlsCRD{}
err := yaml.Unmarshal([]byte(atlsStr), atlsCRD)
assert.Nil(t, err, "yaml decode failed")
@@ -157,3 +161,12 @@ type SecretClientErrorMock struct{}
func (sc *SecretClientErrorMock) FindByName(namespace, name string)
(*v1.Secret, error) {
return nil, utils.ErrNotFound
}
+
+func setDummyApisixClient(t *testing.T) {
+ cli, err := apisixhttp.NewForOptions(&apisixhttp.ClusterOptions{
+ Name: "",
+ BaseURL: "http://127.0.0.2:9080/apisix/admin",
+ })
+ assert.Nil(t, err)
+ conf.SetAPISIXClient(cli)
+}
diff --git a/pkg/ingress/controller/controller.go
b/pkg/ingress/controller/controller.go
index 429d133..7cf38ed 100644
--- a/pkg/ingress/controller/controller.go
+++ b/pkg/ingress/controller/controller.go
@@ -18,6 +18,8 @@ import (
"os"
"sync"
+ "github.com/api7/ingress-controller/pkg/apisix"
+
clientSet
"github.com/gxthrj/apisix-ingress-types/pkg/client/clientset/versioned"
crdclientset
"github.com/gxthrj/apisix-ingress-types/pkg/client/clientset/versioned"
"github.com/gxthrj/apisix-ingress-types/pkg/client/informers/externalversions"
@@ -58,7 +60,16 @@ func NewController(cfg *config.Config) (*Controller, error) {
podNamespace = "default"
}
- conf.SetBaseUrl(cfg.APISIX.BaseURL)
+ client, err := apisix.NewForOptions(&apisix.ClusterOptions{
+ Name: "",
+ AdminKey: cfg.APISIX.AdminKey,
+ BaseURL: cfg.APISIX.BaseURL,
+ })
+ if err != nil {
+ return nil, err
+ }
+ conf.SetAPISIXClient(client)
+
if err := kube.InitInformer(cfg); err != nil {
return nil, err
}
diff --git a/pkg/ingress/controller/endpoint.go
b/pkg/ingress/controller/endpoint.go
index 03504ca..396e2a4 100644
--- a/pkg/ingress/controller/endpoint.go
+++ b/pkg/ingress/controller/endpoint.go
@@ -15,6 +15,7 @@
package controller
import (
+ "context"
"fmt"
"strconv"
"time"
@@ -31,7 +32,6 @@ import (
"github.com/api7/ingress-controller/pkg/kube"
"github.com/api7/ingress-controller/pkg/log"
- "github.com/api7/ingress-controller/pkg/seven/apisix"
sevenConf "github.com/api7/ingress-controller/pkg/seven/conf"
"github.com/api7/ingress-controller/pkg/seven/state"
apisixv1 "github.com/api7/ingress-controller/pkg/types/apisix/v1"
@@ -180,7 +180,7 @@ func (c *EndpointController) process(ep *CoreV1.Endpoints) {
}
func syncWithGroup(group, upstreamName string, ips []string, port
CoreV1.EndpointPort) {
- upstreams, err := apisix.ListUpstream(group)
+ upstreams, err :=
sevenConf.Client.Cluster(group).Upstream().List(context.TODO())
if err == nil {
for _, upstream := range upstreams {
if *(upstream.Name) == upstreamName {
diff --git a/pkg/ingress/controller/watch.go b/pkg/ingress/controller/watch.go
index dfcd06d..4661d94 100644
--- a/pkg/ingress/controller/watch.go
+++ b/pkg/ingress/controller/watch.go
@@ -15,13 +15,13 @@
package controller
import (
+ "context"
"strconv"
- "github.com/api7/ingress-controller/pkg/kube"
"github.com/golang/glog"
v1 "k8s.io/api/core/v1"
- "github.com/api7/ingress-controller/pkg/seven/apisix"
+ "github.com/api7/ingress-controller/pkg/kube"
sevenConf "github.com/api7/ingress-controller/pkg/seven/conf"
"github.com/api7/ingress-controller/pkg/seven/state"
apisixv1 "github.com/api7/ingress-controller/pkg/types/apisix/v1"
@@ -70,8 +70,8 @@ func (c *controller) process(obj interface{}) {
upstreamName := ep.Namespace + "_" + ep.Name +
"_" + strconv.Itoa(int(port.Port))
// find upstreamName is in apisix
// sync with all apisix group
- for k := range sevenConf.UrlGroup {
- upstreams, err := apisix.ListUpstream(k)
+ for _, cluster := range
sevenConf.Client.ListClusters() {
+ upstreams, err :=
cluster.Upstream().List(context.TODO())
if err == nil {
for _, upstream := range
upstreams {
if *(upstream.Name) ==
upstreamName {
diff --git a/pkg/seven/apisix/route.go b/pkg/seven/apisix/route.go
index 2da4d40..0e0712c 100644
--- a/pkg/seven/apisix/route.go
+++ b/pkg/seven/apisix/route.go
@@ -15,10 +15,8 @@
package apisix
import (
- "encoding/json"
- "errors"
+ "context"
"fmt"
- "strings"
"github.com/api7/ingress-controller/pkg/seven/conf"
sevendb "github.com/api7/ingress-controller/pkg/seven/db"
@@ -28,13 +26,17 @@ import (
// FindCurrentRoute find current route in memDB
func FindCurrentRoute(route *v1.Route) (*v1.Route, error) {
+ var cluster string
+ if route.Group != nil {
+ cluster = *route.Group
+ }
db := &sevendb.RouteRequest{Group: *route.Group, Name: *route.Name,
FullName: *route.FullName}
currentRoute, _ := db.FindByName()
if currentRoute != nil {
return currentRoute, nil
} else {
// find from apisix
- if routes, err := ListRoute(*route.Group); err != nil {
+ if routes, err :=
conf.Client.Cluster(cluster).Route().List(context.TODO()); err != nil {
return nil, fmt.Errorf("list routes from etcd failed,
err: %+v", err)
} else {
for _, r := range routes {
@@ -51,195 +53,3 @@ func FindCurrentRoute(route *v1.Route) (*v1.Route, error) {
}
return nil, utils.ErrNotFound
}
-
-// ListRoute list route from etcd , convert to v1.Route
-func ListRoute(group string) ([]*v1.Route, error) {
- baseUrl := conf.FindUrl(group)
- url := baseUrl + "/routes"
- ret, err := Get(url)
- if err != nil {
- return nil, fmt.Errorf("http get failed, url: %s, err: %+v",
url, err)
- }
- var routesResponse RoutesResponse
- if err := json.Unmarshal(ret, &routesResponse); err != nil {
- return nil, fmt.Errorf("json unmarshal failed, err: %+v", err)
- } else {
- routes := make([]*v1.Route, 0)
- for _, u := range routesResponse.Routes.Routes {
- if n, err := u.convert(group); err == nil {
- routes = append(routes, n)
- } else {
- return nil, fmt.Errorf("upstream: %s 转换失败, %s",
*u.Value.Desc, err.Error())
- }
- }
- return routes, nil
- }
-}
-
-func AddRoute(route *v1.Route) (*RouteResponse, error) {
- baseUrl := conf.FindUrl(*route.Group)
- url := fmt.Sprintf("%s/routes", baseUrl)
- rr := convert2RouteRequest(route)
- if b, err := json.Marshal(rr); err != nil {
- return nil, err
- } else {
- if res, err := utils.Post(url, b); err != nil {
- return nil, err
- } else {
- var routeResp RouteResponse
- if err = json.Unmarshal(res, &routeResp); err != nil {
- return nil, err
- } else {
- if routeResp.Route.Key != nil {
- return &routeResp, nil
- } else {
- return nil, fmt.Errorf("apisix route
not expected response")
- }
-
- }
- }
- }
-}
-
-func UpdateRoute(route *v1.Route) error {
- baseUrl := conf.FindUrl(*route.Group)
- url := fmt.Sprintf("%s/routes/%s", baseUrl, *route.ID)
- rr := convert2RouteRequest(route)
- if b, err := json.Marshal(rr); err != nil {
- return err
- } else {
- if _, err := utils.Patch(url, b); err != nil {
- return err
- } else {
- return nil
- }
- }
-}
-
-func DeleteRoute(route *v1.Route) error {
- baseUrl := conf.FindUrl(*route.Group)
- url := fmt.Sprintf("%s/routes/%s", baseUrl, *route.ID)
- if _, err := utils.Delete(url); err != nil {
- return err
- } else {
- return nil
- }
-}
-
-type Redirect struct {
- RetCode int64 `json:"ret_code"`
- Uri string `json:"uri"`
-}
-
-func convert2RouteRequest(route *v1.Route) *RouteRequest {
- return &RouteRequest{
- Desc: *route.Name,
- Host: *route.Host,
- Uri: *route.Path,
- ServiceId: *route.ServiceId,
- Plugins: route.Plugins,
- }
-}
-
-// convert apisix RouteResponse -> apisix-types v1.Route
-func (r *Route) convert(group string) (*v1.Route, error) {
- // id
- key := r.Key
- ks := strings.Split(*key, "/")
- id := ks[len(ks)-1]
- // name
- name := r.Value.Desc
- // host
- host := r.Value.Host
- // path
- path := r.Value.Uri
- // method
- methods := r.Value.Methods
- // upstreamId
- upstreamId := r.Value.UpstreamId
- // serviceId
- serviceId := r.Value.ServiceId
- // plugins
- var plugins v1.Plugins
- plugins = r.Value.Plugins
-
- // fullName
- fullName := "unknown"
- if name != nil {
- fullName = *name
- }
- if group != "" {
- fullName = group + "_" + fullName
- }
-
- return &v1.Route{
- ID: &id,
- Group: &group,
- FullName: &fullName,
- Name: name,
- Host: host,
- Path: path,
- Methods: methods,
- UpstreamId: upstreamId,
- ServiceId: serviceId,
- Plugins: &plugins,
- }, nil
-}
-
-type RoutesResponse struct {
- Routes Routes `json:"node"`
-}
-
-type Routes struct {
- Key string `json:"key"`
- Routes RouteSet `json:"nodes"`
-}
-
-type RouteSet []Route
-
-// RouteSet.UnmarshalJSON implements json.Unmarshaler interface.
-// lua-cjson doesn't distinguish empty array and table,
-// and by default empty array will be encoded as '{}'.
-// We have to maintain the compatibility.
-func (set *RouteSet) UnmarshalJSON(p []byte) error {
- if p[0] == '{' {
- if len(p) != 2 {
- return errors.New("unexpected non-empty object")
- }
- return nil
- }
- var route []Route
- if err := json.Unmarshal(p, &route); err != nil {
- return err
- }
- *set = route
- return nil
-}
-
-type RouteResponse struct {
- Action string `json:"action"`
- Route Route `json:"node"`
-}
-
-type Route struct {
- Key *string `json:"key"` // route key
- Value Value `json:"value"` // route content
-}
-
-type Value struct {
- UpstreamId *string `json:"upstream_id"`
- ServiceId *string `json:"service_id"`
- Plugins map[string]interface{} `json:"plugins"`
- Host *string `json:"host,omitempty"`
- Uri *string `json:"uri"`
- Desc *string `json:"desc"`
- Methods []*string `json:"methods,omitempty"`
-}
-
-type RouteRequest struct {
- Desc string `json:"desc,omitempty"`
- Uri string `json:"uri,omitempty"`
- Host string `json:"host,omitempty"`
- ServiceId string `json:"service_id,omitempty"`
- Plugins *v1.Plugins `json:"plugins,omitempty"`
-}
diff --git a/pkg/seven/apisix/route_test.go b/pkg/seven/apisix/route_test.go
deleted file mode 100644
index 9fba34e..0000000
--- a/pkg/seven/apisix/route_test.go
+++ /dev/null
@@ -1,104 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package apisix
-
-import (
- "encoding/json"
- "testing"
-
- "github.com/stretchr/testify/assert"
-)
-
-func TestRouteUnmarshalJSON(t *testing.T) {
- var route Routes
- emptyData := `
-{
- "key": "test",
- "nodes": {}
-}
-`
- err := json.Unmarshal([]byte(emptyData), &route)
- assert.Nil(t, err)
-
- emptyData = `
-{
- "key": "test",
- "nodes": {"a": "b", "c": "d"}
-}
-`
- err = json.Unmarshal([]byte(emptyData), &route)
- assert.Equal(t, err.Error(), "unexpected non-empty object")
-
- emptyArray := `
-{
- "key": "test",
- "nodes": []
-}
-`
- err = json.Unmarshal([]byte(emptyArray), &route)
- assert.Nil(t, err)
-
- normalData := `
-{
- "key": "test",
- "nodes": [
- {
- "key": "route 1",
- "value": {
- "desc": "test route 1",
- "upstream_id": "123",
- "service_id": "12345",
- "host": "foo.com",
- "uri": "/bar/baz",
- "methods": ["GET", "POST"]
- }
- }
- ]
-}
-`
- err = json.Unmarshal([]byte(normalData), &route)
- assert.Nil(t, err)
- assert.Equal(t, route.Key, "test")
- assert.Equal(t, len(route.Routes), 1)
-
- key := *route.Routes[0].Key
- assert.Equal(t, key, "route 1")
- desc := *route.Routes[0].Value.Desc
- assert.Equal(t, desc, "test route 1")
- upstreamId := *route.Routes[0].Value.UpstreamId
- assert.Equal(t, upstreamId, "123")
- svcId := *route.Routes[0].Value.ServiceId
- assert.Equal(t, svcId, "12345")
- assert.Equal(t, *route.Routes[0].Value.Host, "foo.com")
- assert.Equal(t, *route.Routes[0].Value.Uri, "/bar/baz")
- assert.Equal(t, *route.Routes[0].Value.Methods[0], "GET")
- assert.Equal(t, *route.Routes[0].Value.Methods[1], "POST")
-}
-
-func TestRouteConvertWithoutDesc(t *testing.T) {
- upsId := "1"
- svcId := "2"
- key := "foo/bar"
- r := &Route{
- Key: &key,
- Value: Value{
- UpstreamId: &upsId,
- ServiceId: &svcId,
- Host: nil,
- },
- }
- _, err := r.convert("mygroup")
- assert.Nil(t, err)
-}
diff --git a/pkg/seven/apisix/service.go b/pkg/seven/apisix/service.go
index 7e11222..90f0388 100644
--- a/pkg/seven/apisix/service.go
+++ b/pkg/seven/apisix/service.go
@@ -15,16 +15,13 @@
package apisix
import (
- "encoding/json"
- "errors"
+ "context"
"fmt"
- "strings"
"github.com/golang/glog"
"github.com/api7/ingress-controller/pkg/seven/conf"
sevendb "github.com/api7/ingress-controller/pkg/seven/db"
- "github.com/api7/ingress-controller/pkg/seven/utils"
v1 "github.com/api7/ingress-controller/pkg/types/apisix/v1"
)
@@ -37,7 +34,7 @@ func FindCurrentService(group, name, fullName string)
(*v1.Service, error) {
return currentService, nil
} else {
// find service from apisix
- if services, err := ListService(group); err != nil {
+ if services, err :=
conf.Client.Cluster(group).Service().List(context.TODO()); err != nil {
glog.Errorf("list services in etcd failed, group: %s,
err: %+v", group, err)
return nil, fmt.Errorf("list services failed, err:
%+v", err)
} else {
@@ -54,162 +51,3 @@ func FindCurrentService(group, name, fullName string)
(*v1.Service, error) {
}
return nil, nil
}
-
-// ListUpstream list upstream from etcd , convert to v1.Upstream
-func ListService(group string) ([]*v1.Service, error) {
- baseUrl := conf.FindUrl(group)
- url := baseUrl + "/services"
- ret, err := Get(url)
- if err != nil {
- return nil, fmt.Errorf("http get failed, url: %s, err: %+v",
url, err)
- }
- var servicesResponse ServicesResponse
- if err := json.Unmarshal(ret, &servicesResponse); err != nil {
- return nil, fmt.Errorf("json unmarshal failed, err: %+v", err)
- } else {
- result := make([]*v1.Service, 0)
- for _, u := range servicesResponse.Services.Services {
- if n, err := u.convert(group); err == nil {
- result = append(result, n)
- } else {
- return nil, fmt.Errorf("service : %+v 转换失败,
%s", u.ServiceValue, err.Error())
- }
- }
- return result, nil
- }
-}
-
-// convert convert Service from etcd to v1.Service
-func (u *Service) convert(group string) (*v1.Service, error) {
- // id
- keys := strings.Split(*u.Key, "/")
- id := keys[len(keys)-1]
- // Name
- name := u.ServiceValue.Desc
- // upstreamId
- upstreamId := u.ServiceValue.UpstreamId
- // plugins
- plugins := &v1.Plugins{}
- for k, v := range u.ServiceValue.Plugins {
- (*plugins)[k] = v
- }
- fullName := *name
- if group != "" {
- fullName = group + "_" + *name
- }
- return &v1.Service{ID: &id, FullName: &fullName, Group: &group, Name:
name, UpstreamId: upstreamId, Plugins: plugins}, nil
-}
-
-func AddService(service *v1.Service) (*ServiceResponse, error) {
- baseUrl := conf.FindUrl(*service.Group)
- url := fmt.Sprintf("%s/services", baseUrl)
- ur := convert2ServiceRequest(service)
- if b, err := json.Marshal(ur); err != nil {
- return nil, err
- } else {
- if res, err := utils.Post(url, b); err != nil {
- return nil, fmt.Errorf("http post failed, err: %+v",
err)
- } else {
- var uRes ServiceResponse
- if err = json.Unmarshal(res, &uRes); err != nil {
- return nil, err
- } else {
- if uRes.Service.Key != nil {
- return &uRes, nil
- } else {
- return nil, fmt.Errorf("apisix service
not expected response")
- }
-
- }
- }
- }
-}
-
-func UpdateService(service *v1.Service) (*ServiceResponse, error) {
- baseUrl := conf.FindUrl(*service.Group)
- url := fmt.Sprintf("%s/services/%s", baseUrl, *service.ID)
- ur := convert2ServiceRequest(service)
- if b, err := json.Marshal(ur); err != nil {
- return nil, err
- } else {
- if res, err := utils.Patch(url, b); err != nil {
- return nil, err
- } else {
- var uRes ServiceResponse
- if err = json.Unmarshal(res, &uRes); err != nil {
- return nil, err
- } else {
- if uRes.Service.Key != nil {
- return &uRes, nil
- } else {
- var errResp ErrorResponse
- json.Unmarshal(res, &errResp)
- glog.Error(errResp.Message)
- return nil, fmt.Errorf("apisix service
not expected response %s", errResp.Message)
- }
- }
- }
- }
-}
-
-func convert2ServiceRequest(service *v1.Service) *ServiceRequest {
- request := &ServiceRequest{
- Desc: service.Name,
- UpstreamId: service.UpstreamId,
- Plugins: service.Plugins,
- }
- glog.V(2).Info(*request.Desc)
- return request
-}
-
-type ServiceRequest struct {
- Desc *string `json:"desc,omitempty"`
- UpstreamId *string `json:"upstream_id"`
- Plugins *v1.Plugins `json:"plugins,omitempty"`
-}
-
-type ServicesResponse struct {
- Services Services `json:"node"`
-}
-
-type Services struct {
- Key string `json:"key"` // 用来定位upstreams 列表
- Services ServiceSet `json:"nodes"`
-}
-
-type ServiceSet []Service
-
-// UpstreamSet.UnmarshalJSON implements json.Unmarshaler interface.
-// lua-cjson doesn't distinguish empty array and table,
-// and by default empty array will be encoded as '{}'.
-// We have to maintain the compatibility.
-func (set *ServiceSet) UnmarshalJSON(p []byte) error {
- if p[0] == '{' {
- if len(p) != 2 {
- return errors.New("unexpected non-empty object")
- }
- return nil
- }
- var svcs []Service
- if err := json.Unmarshal(p, &svcs); err != nil {
- return err
- }
- *set = svcs
- return nil
-}
-
-type ServiceResponse struct {
- Action string `json:"action"`
- Service Service `json:"node"`
-}
-
-type Service struct {
- Key *string `json:"key"` // service key
- ServiceValue ServiceValue `json:"value,omitempty"`
-}
-
-type ServiceValue struct {
- UpstreamId *string `json:"upstream_id,omitempty"`
- Plugins map[string]interface{} `json:"plugins"`
- Desc *string `json:"desc,omitempty"`
-}
diff --git a/pkg/seven/apisix/ssl.go b/pkg/seven/apisix/ssl.go
deleted file mode 100644
index deadbb6..0000000
--- a/pkg/seven/apisix/ssl.go
+++ /dev/null
@@ -1,139 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package apisix
-
-import (
- "encoding/json"
- "errors"
- "fmt"
- "strings"
-
- "github.com/golang/glog"
-
- "github.com/api7/ingress-controller/pkg/seven/conf"
- "github.com/api7/ingress-controller/pkg/seven/utils"
- v1 "github.com/api7/ingress-controller/pkg/types/apisix/v1"
-)
-
-// ListSsl list ssl from etcd , convert to v1.Upstream
-func ListSsl(group string) ([]*v1.Ssl, error) {
- baseUrl := conf.FindUrl(group)
- url := baseUrl + "/ssl"
- ret, err := Get(url)
- if err != nil {
- return nil, fmt.Errorf("http get failed, url: %s, err: %+v",
url, err)
- }
- var sslsResponse SslsResponse
- if err := json.Unmarshal(ret, &sslsResponse); err != nil {
- return nil, fmt.Errorf("json transform error")
- } else {
- ssls := make([]*v1.Ssl, 0)
- for _, s := range sslsResponse.SslList.SslNodes {
- id := strings.ReplaceAll(*s.Key, "/apisix/ssl/", "")
- ssl := &v1.Ssl{
- ID: &id,
- Snis: s.Ssl.Snis,
- Cert: s.Ssl.Cert,
- Key: s.Ssl.Key,
- Status: s.Ssl.Status,
- Group: &group,
- }
- ssls = append(ssls, ssl)
- }
- return ssls, nil
- }
-}
-
-func AddOrUpdateSsl(ssl *v1.Ssl) (*SslResponse, error) {
- baseUrl := conf.FindUrl(*ssl.Group)
- url := fmt.Sprintf("%s/ssl/%s", baseUrl, *ssl.ID)
- glog.V(2).Info(url)
- ur := &v1.Ssl{
- Snis: ssl.Snis,
- Cert: ssl.Cert,
- Key: ssl.Key,
- Status: ssl.Status,
- }
- if b, err := json.Marshal(ur); err != nil {
- return nil, err
- } else {
- if res, err := utils.Put(url, b); err != nil {
- return nil, fmt.Errorf("http put failed, url: %s, err:
%+v", url, err)
- } else {
- var uRes SslResponse
- if err = json.Unmarshal(res, &uRes); err != nil {
- glog.Errorf("json Unmarshal error: %s",
err.Error())
- return nil, err
- } else {
- glog.V(2).Info(uRes)
- if uRes.Ssl.Key != nil {
- return &uRes, nil
- } else {
- return nil, fmt.Errorf("apisix ssl not
expected response")
- }
- }
- }
- }
-}
-
-func DeleteSsl(ssl *v1.Ssl) error {
- baseUrl := conf.FindUrl(*ssl.Group)
- url := fmt.Sprintf("%s/ssl/%s", baseUrl, *ssl.ID)
- if _, err := utils.Delete(url); err != nil {
- return fmt.Errorf("http delete failed, url: %s, err: %+v", url,
err)
- } else {
- return nil
- }
-}
-
-type SslResponse struct {
- Action string `json:"action"`
- Ssl SslNode `json:"node"`
-}
-
-type SslsResponse struct {
- Action string `json:"action"`
- SslList SslList `json:"node"`
-}
-
-type SslList struct {
- SslNodes SslSet `json:"nodes"`
-}
-
-type SslNode struct {
- Key *string `json:"key"`
- Ssl *v1.Ssl `json:"value"`
-}
-
-type SslSet []SslNode
-
-// SslSet.UnmarshalJSON implements json.Unmarshaler interface.
-// lua-cjson doesn't distinguish empty array and table,
-// and by default empty array will be encoded as '{}'.
-// We have to maintain the compatibility.
-func (set *SslSet) UnmarshalJSON(p []byte) error {
- if p[0] == '{' {
- if len(p) != 2 {
- return errors.New("unexpected non-empty object")
- }
- return nil
- }
- var ssls []SslNode
- if err := json.Unmarshal(p, &ssls); err != nil {
- return err
- }
- *set = ssls
- return nil
-}
diff --git a/pkg/seven/apisix/ssl_test.go b/pkg/seven/apisix/ssl_test.go
deleted file mode 100644
index dbb8d30..0000000
--- a/pkg/seven/apisix/ssl_test.go
+++ /dev/null
@@ -1,80 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package apisix
-
-import (
- "encoding/json"
- "github.com/stretchr/testify/assert"
- "testing"
-)
-
-func TestSslUnmarshalJSON(t *testing.T) {
- var sslList SslList
- emptyData := `
-{
- "key": "test",
- "nodes": {}
-}
-`
- err := json.Unmarshal([]byte(emptyData), &sslList)
- assert.Nil(t, err)
-
- notEmptyObject := `
-{
- "key": "test",
- "nodes": {"a": "b", "c": "d"}
-}
-`
- err = json.Unmarshal([]byte(notEmptyObject), &sslList)
- assert.Equal(t, err.Error(), "unexpected non-empty object")
-
- emptyArray := `
-{
- "key": "test",
- "nodes": []
-}
-`
- err = json.Unmarshal([]byte(emptyArray), &sslList)
- assert.Nil(t, err)
-
- normalData := `
-{
- "key": "test",
- "nodes": [
- {
- "key": "ssl id",
- "value": {
- "snis": ["test.apisix.org"],
- "cert": "root",
- "key": "123456",
- "status": 1
- }
- }
- ]
-}
-`
- err = json.Unmarshal([]byte(normalData), &sslList)
- assert.Nil(t, err)
- assert.Equal(t, len(sslList.SslNodes), 1)
-
- key := *sslList.SslNodes[0].Key
- assert.Equal(t, key, "ssl id")
- cert := *sslList.SslNodes[0].Ssl.Cert
- assert.Equal(t, cert, "root")
- sslKey := *sslList.SslNodes[0].Ssl.Key
- assert.Equal(t, sslKey, "123456")
- sni := *sslList.SslNodes[0].Ssl.Snis[0]
- assert.Equal(t, sni, "test.apisix.org")
-}
diff --git a/pkg/seven/apisix/upstream.go b/pkg/seven/apisix/upstream.go
index ac0dfdf..65c96cf 100644
--- a/pkg/seven/apisix/upstream.go
+++ b/pkg/seven/apisix/upstream.go
@@ -15,17 +15,13 @@
package apisix
import (
- "encoding/json"
- "errors"
+ "context"
"fmt"
- "strconv"
- "strings"
"github.com/golang/glog"
"github.com/api7/ingress-controller/pkg/seven/conf"
"github.com/api7/ingress-controller/pkg/seven/db"
- "github.com/api7/ingress-controller/pkg/seven/utils"
v1 "github.com/api7/ingress-controller/pkg/types/apisix/v1"
)
@@ -38,7 +34,7 @@ func FindCurrentUpstream(group, name, fullName string)
(*v1.Upstream, error) {
return currentUpstream, nil
} else {
// find upstream from apisix
- if upstreams, err := ListUpstream(group); err != nil {
+ if upstreams, err :=
conf.Client.Cluster(group).Upstream().List(context.TODO()); err != nil {
glog.Errorf("list upstreams in etcd failed, group: %s,
err: %+v", group, err)
return nil, fmt.Errorf("list upstreams failed, err:
%+v", err)
} else {
@@ -58,212 +54,17 @@ func FindCurrentUpstream(group, name, fullName string)
(*v1.Upstream, error) {
return nil, nil
}
-// ListUpstream list upstream from etcd , convert to v1.Upstream
-func ListUpstream(group string) ([]*v1.Upstream, error) {
- baseUrl := conf.FindUrl(group)
- url := baseUrl + "/upstreams"
- ret, err := Get(url)
- if err != nil {
- return nil, fmt.Errorf("http get failed, url: %s, err: %+v",
url, err)
- }
- var upstreamsResponse UpstreamsResponse
- if err := json.Unmarshal(ret, &upstreamsResponse); err != nil {
- return nil, fmt.Errorf("json转换失败")
- } else {
- upstreams := make([]*v1.Upstream, 0)
- for _, u := range upstreamsResponse.Upstreams.Upstreams {
- if n, err := u.convert(group); err == nil {
- upstreams = append(upstreams, n)
- } else {
- return nil, fmt.Errorf("upstream: %s 转换失败, %s",
*u.UpstreamNodes.Desc, err.Error())
- }
- }
- return upstreams, nil
- }
-}
-
-//func IsExist(name string) (bool, error) {
-// if upstreams, err := ListUpstream(); err != nil {
-// return false, err
-// } else {
-// for _, upstream := range upstreams {
-// if *upstream.Name == name {
-// return true, nil
-// }
-// }
-// return false, nil
-// }
-//}
-
-func AddUpstream(upstream *v1.Upstream) (*UpstreamResponse, error) {
- baseUrl := conf.FindUrl(*upstream.Group)
- url := fmt.Sprintf("%s/upstreams", baseUrl)
- glog.V(2).Info(url)
- ur := convert2UpstreamRequest(upstream)
- if b, err := json.Marshal(ur); err != nil {
- return nil, err
- } else {
- if res, err := utils.Post(url, b); err != nil {
- return nil, fmt.Errorf("http post failed, url: %s, err:
%+v", url, err)
- } else {
- var uRes UpstreamResponse
- if err = json.Unmarshal(res, &uRes); err != nil {
- glog.Errorf("json Unmarshal error: %s",
err.Error())
- return nil, err
- } else {
- glog.V(2).Info(uRes)
- if uRes.Upstream.Key != nil {
- return &uRes, nil
- } else {
- return nil, fmt.Errorf("apisix upstream
not expected response")
- }
- }
- }
- }
-}
-
-func UpdateUpstream(upstream *v1.Upstream) error {
- baseUrl := conf.FindUrl(*upstream.Group)
- url := fmt.Sprintf("%s/upstreams/%s", baseUrl, *upstream.ID)
- ur := convert2UpstreamRequest(upstream)
- if b, err := json.Marshal(ur); err != nil {
- return err
- } else {
- if _, err := utils.Patch(url, b); err != nil {
- return fmt.Errorf("http patch failed, url: %s, err:
%+v", url, err)
- } else {
- return nil
- }
- }
-}
-
func PatchNodes(upstream *v1.Upstream, nodes []*v1.Node) error {
- baseUrl := conf.FindUrl(*upstream.Group)
- url := fmt.Sprintf("%s/upstreams/%s/nodes", baseUrl, *upstream.ID)
- nodeMap := convertNodes(nodes)
- if b, err := json.Marshal(nodeMap); err != nil {
- return err
- } else {
- if _, err := utils.Patch(url, b); err != nil {
- return fmt.Errorf("http patch failed, url: %s, err:
%+v", url, err)
- } else {
- return nil
- }
- }
-}
-
-func DeleteUpstream(upstream *v1.Upstream) error {
- baseUrl := conf.FindUrl(*upstream.Group)
- url := fmt.Sprintf("%s/upstreams/%s", baseUrl, *upstream.ID)
- if _, err := utils.Delete(url); err != nil {
- return fmt.Errorf("http delete failed, url: %s, err: %+v", url,
err)
- } else {
- return nil
+ oldNodes := upstream.Nodes
+ upstream.Nodes = nodes
+ defer func() {
+ // Restore it
+ upstream.Nodes = oldNodes
+ }()
+ var cluster string
+ if upstream.Group != nil {
+ cluster = *upstream.Group
}
-}
-
-func convert2UpstreamRequest(upstream *v1.Upstream) *UpstreamRequest {
- nodes := convertNodes(upstream.Nodes)
- return &UpstreamRequest{
- LBType: *upstream.Type,
- HashOn: upstream.HashOn,
- Key: upstream.Key,
- Desc: *upstream.Name,
- Nodes: nodes,
- }
-}
-
-func convertNodes(nodes []*v1.Node) map[string]int64 {
- result := make(map[string]int64)
- for _, u := range nodes {
- result[*u.IP+":"+strconv.Itoa(*u.Port)] = int64(*u.Weight)
- }
- return result
-}
-
-// convert convert Upstream from etcd to v1.Upstream
-func (u *Upstream) convert(group string) (*v1.Upstream, error) {
- // id
- keys := strings.Split(*u.Key, "/")
- id := keys[len(keys)-1]
- // Name
- name := u.UpstreamNodes.Desc
- // type
- LBType := u.UpstreamNodes.LBType
- // key
- key := u.Key
- // nodes
- nodes := make([]*v1.Node, 0)
- for k, v := range u.UpstreamNodes.Nodes {
- ks := strings.Split(k, ":")
- ip := ks[0]
- port := 80
- if len(ks) > 1 {
- port, _ = strconv.Atoi(ks[1])
- }
- weight := int(v)
- node := &v1.Node{IP: &ip, Port: &port, Weight: &weight}
- nodes = append(nodes, node)
- }
- // fullName
- fullName := *name
- if group != "" {
- fullName = group + "_" + *name
- }
- return &v1.Upstream{ID: &id, FullName: &fullName, Group: &group, Name:
name, Type: LBType, Key: key, Nodes: nodes}, nil
-}
-
-type UpstreamsResponse struct {
- Upstreams Upstreams `json:"node"`
-}
-
-type UpstreamResponse struct {
- Action string `json:"action"`
- Upstream Upstream `json:"node"`
-}
-
-type Upstreams struct {
- Key string `json:"key"` // 用来定位upstreams 列表
- Upstreams UpstreamSet `json:"nodes"`
-}
-
-type UpstreamSet []Upstream
-
-// UpstreamSet.UnmarshalJSON implements json.Unmarshaler interface.
-// lua-cjson doesn't distinguish empty array and table,
-// and by default empty array will be encoded as '{}'.
-// We have to maintain the compatibility.
-func (set *UpstreamSet) UnmarshalJSON(p []byte) error {
- if p[0] == '{' {
- if len(p) != 2 {
- return errors.New("unexpected non-empty object")
- }
- return nil
- }
- var ups []Upstream
- if err := json.Unmarshal(p, &ups); err != nil {
- return err
- }
- *set = ups
- return nil
-}
-
-type Upstream struct {
- Key *string `json:"key"` // upstream key
- UpstreamNodes UpstreamNodes `json:"value"`
-}
-
-type UpstreamNodes struct {
- Nodes map[string]int64 `json:"nodes"`
- Desc *string `json:"desc"` // upstream name = k8s svc
- LBType *string `json:"type"` // 负载均衡类型
-}
-
-//{"type":"roundrobin","nodes":{"10.244.10.11:8080":100},"desc":"somesvc"}
-type UpstreamRequest struct {
- LBType string `json:"type"`
- HashOn *string `json:"hash_on,omitempty"`
- Key *string `json:"key,omitempty"`
- Nodes map[string]int64 `json:"nodes"`
- Desc string `json:"desc"`
+ _, err :=
conf.Client.Cluster(cluster).Upstream().Update(context.TODO(), upstream)
+ return err
}
diff --git a/pkg/seven/apisix/upstream_test.go
b/pkg/seven/apisix/upstream_test.go
deleted file mode 100644
index 1aee995..0000000
--- a/pkg/seven/apisix/upstream_test.go
+++ /dev/null
@@ -1,84 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package apisix
-
-import (
- "encoding/json"
- "testing"
-
- "github.com/stretchr/testify/assert"
-)
-
-func TestUpstreamsUnmarshalJSON(t *testing.T) {
- var ups Upstreams
- emptyData := `
-{
- "key": "test",
- "nodes": {}
-}
-`
- err := json.Unmarshal([]byte(emptyData), &ups)
- assert.Nil(t, err)
-
- emptyData = `
-{
- "key": "test",
- "nodes": {"a": "b", "c": "d"}
-}
-`
- err = json.Unmarshal([]byte(emptyData), &ups)
- assert.Equal(t, err.Error(), "unexpected non-empty object")
-
- emptyArray := `
-{
- "key": "test",
- "nodes": []
-}
-`
- err = json.Unmarshal([]byte(emptyArray), &ups)
- assert.Nil(t, err)
-
- normalData := `
-{
- "key": "test",
- "nodes": [
- {
- "key": "ups1",
- "value": {
- "desc": "test upstream 1",
- "type": "rr",
- "nodes": {
- "192.168.12.12": 100
- }
- }
- }
- ]
-}
-`
- err = json.Unmarshal([]byte(normalData), &ups)
- assert.Nil(t, err)
- assert.Equal(t, ups.Key, "test")
- assert.Equal(t, len(ups.Upstreams), 1)
-
- key := *ups.Upstreams[0].Key
- assert.Equal(t, key, "ups1")
- desc := *ups.Upstreams[0].UpstreamNodes.Desc
- assert.Equal(t, desc, "test upstream 1")
- lb := *ups.Upstreams[0].UpstreamNodes.LBType
- assert.Equal(t, lb, "rr")
-
- assert.Equal(t, len(ups.Upstreams[0].UpstreamNodes.Nodes), 1)
- assert.Equal(t, ups.Upstreams[0].UpstreamNodes.Nodes["192.168.12.12"],
int64(100))
-}
diff --git a/pkg/seven/conf/conf.go b/pkg/seven/conf/conf.go
index 1f016c6..7b6d3d9 100644
--- a/pkg/seven/conf/conf.go
+++ b/pkg/seven/conf/conf.go
@@ -14,8 +14,16 @@
// limitations under the License.
package conf
-var BaseUrl = "http://172.16.20.90:30116/apisix/admin"
-var UrlGroup = make(map[string]string)
+import (
+ "github.com/api7/ingress-controller/pkg/apisix"
+ "github.com/api7/ingress-controller/pkg/log"
+)
+
+var (
+ BaseUrl = "http://172.16.20.90:30116/apisix/admin"
+ UrlGroup = make(map[string]string)
+ Client apisix.APISIX
+)
func SetBaseUrl(url string) {
BaseUrl = url
@@ -23,14 +31,22 @@ func SetBaseUrl(url string) {
func AddGroup(group string) {
if group != "" {
- UrlGroup[group] = "http://" + group + "/apisix/admin"
+ err := Client.AddCluster(&apisix.ClusterOptions{
+ Name: group,
+ BaseURL: "http://" + group + "/apisix/admin",
+ })
+ if err != nil {
+ // A duplicated cluster only means the group was already
+ // registered, which is harmless; anything else is a real failure.
+ if err == apisix.ErrDuplicatedCluster {
+ log.Infof("cluster %s already exists", group)
+ } else {
+ log.Errorf("failed to create cluster %s: %s", group, err)
+ }
+ }
}
}
-func FindUrl(group string) string {
- if group != "" && UrlGroup[group] != "" {
- return UrlGroup[group]
- } else {
- return BaseUrl
- }
+func SetAPISIXClient(c apisix.APISIX) {
+ Client = c
}
diff --git a/pkg/seven/state/builder.go b/pkg/seven/state/builder.go
index 764bde4..0a90291 100644
--- a/pkg/seven/state/builder.go
+++ b/pkg/seven/state/builder.go
@@ -18,11 +18,13 @@ import (
"context"
"errors"
"strconv"
- "strings"
"sync"
+ "github.com/golang/glog"
+
"github.com/api7/ingress-controller/pkg/log"
"github.com/api7/ingress-controller/pkg/seven/apisix"
+ "github.com/api7/ingress-controller/pkg/seven/conf"
"github.com/api7/ingress-controller/pkg/seven/db"
"github.com/api7/ingress-controller/pkg/seven/utils"
v1 "github.com/api7/ingress-controller/pkg/types/apisix/v1"
@@ -137,6 +139,10 @@ func (r *routeWorker) trigger(event Event) {
// sync
func (r *routeWorker) sync() error {
+ var cluster string
+ if r.Group != nil {
+ cluster = *r.Group
+ }
if *r.Route.ID != strconv.Itoa(0) {
// 1. sync memDB
db := &db.RouteDB{Routes: []*v1.Route{r.Route}}
@@ -145,27 +151,26 @@ func (r *routeWorker) sync() error {
return err
}
// 2. sync apisix
- if err := apisix.UpdateRoute(r.Route); err != nil {
+ if _, err :=
conf.Client.Cluster(cluster).Route().Update(context.TODO(), r.Route); err !=
nil {
+ log.Errorf("failed to update route %s: %s, ", *r.Name,
err)
return err
}
log.Infof("update route %s, %s", *r.Name, *r.ServiceId)
} else {
// 1. sync apisix and get id
- if res, err := apisix.AddRoute(r.Route); err != nil {
- log.Errorf("add route failed, route: %#v, err: %+v",
r.Route, err)
- return err
- } else {
- key := res.Route.Key
- tmp := strings.Split(*key, "/")
- *r.ID = tmp[len(tmp)-1]
- }
- // 2. sync memDB
- db := &db.RouteDB{Routes: []*v1.Route{r.Route}}
- if err := db.Insert(); err != nil {
+ route, err :=
conf.Client.Cluster(cluster).Route().Create(context.TODO(), r.Route)
+ if err != nil {
+ log.Errorf("failed to create route: %s", err.Error())
return err
}
- log.Infof("create route %s, %s", *r.Name, *r.ServiceId)
+ *r.ID = *route.ID
}
+ // 2. sync memDB
+ db := &db.RouteDB{Routes: []*v1.Route{r.Route}}
+ if err := db.Insert(); err != nil {
+ return err
+ }
+ log.Infof("create route %s, %s", *r.Name, *r.ServiceId)
return nil
}
@@ -230,10 +235,14 @@ func SolverSingleUpstream(u *v1.Upstream, swg
ServiceWorkerGroup, wg *sync.WaitG
errNotify = err
return
}
+
// 2.sync apisix
- if err = apisix.UpdateUpstream(u); err
!= nil {
- log.Errorf("solver upstream
failed, update upstream to etcd failed, err: %+v", err)
- errNotify = err
+ var cluster string
+ if u.Group != nil {
+ cluster = *u.Group
+ }
+ if _, err =
conf.Client.Cluster(cluster).Upstream().Update(context.TODO(), u); err != nil {
+ glog.Errorf("solver upstream
failed, update upstream to etcd failed, err: %+v", err)
return
}
}
@@ -261,14 +270,17 @@ func SolverSingleUpstream(u *v1.Upstream, swg
ServiceWorkerGroup, wg *sync.WaitG
} else {
op = Create
// 1.sync apisix and get response
- if upstreamResponse, err :=
apisix.AddUpstream(u); err != nil {
- log.Errorf("solver upstream failed,
update upstream to etcd failed, err: %+v", err)
- errNotify = err
+ var cluster string
+ if u.Group != nil {
+ cluster = *u.Group
+ }
+ ups, err :=
conf.Client.Cluster(cluster).Upstream().Create(context.TODO(), u)
+ if err != nil {
+ log.Errorf("failed to create upstream:
%s", err)
return
- } else {
- tmp :=
strings.Split(*upstreamResponse.Upstream.Key, "/")
- *u.ID = tmp[len(tmp)-1]
}
+
+ *u.ID = *ups.ID
// 2.sync memDB
//apisix.InsertUpstreams([]*v1.Upstream{u})
upstreamDB := &db.UpstreamDB{Upstreams:
[]*v1.Upstream{u}}
diff --git a/pkg/seven/state/service_worker.go
b/pkg/seven/state/service_worker.go
index b5b5830..d1fcce3 100644
--- a/pkg/seven/state/service_worker.go
+++ b/pkg/seven/state/service_worker.go
@@ -17,11 +17,11 @@ package state
import (
"context"
"strconv"
- "strings"
"sync"
"github.com/api7/ingress-controller/pkg/log"
"github.com/api7/ingress-controller/pkg/seven/apisix"
+ "github.com/api7/ingress-controller/pkg/seven/conf"
"github.com/api7/ingress-controller/pkg/seven/db"
"github.com/api7/ingress-controller/pkg/seven/utils"
v1 "github.com/api7/ingress-controller/pkg/types/apisix/v1"
@@ -98,17 +98,20 @@ func SolverSingleService(svc *v1.Service, rwg
RouteWorkerGroup, wg *sync.WaitGro
errNotify = err
return
}
+ var cluster string
+ if svc.Group != nil {
+ cluster = *svc.Group
+ }
if hasDiff {
if *svc.ID == strconv.Itoa(0) {
op = Create
// 1. sync apisix and get id
- if serviceResponse, err := apisix.AddService(svc); err
!= nil {
- log.Info(err.Error())
+ if s, err :=
conf.Client.Cluster(cluster).Service().Create(context.TODO(), svc); err != nil {
+ log.Errorf("failed to create service: %s", err)
errNotify = err
return
} else {
- tmp :=
strings.Split(*serviceResponse.Service.Key, "/")
- *svc.ID = tmp[len(tmp)-1]
+ *svc.ID = *s.ID
}
// 2. sync memDB
db := &db.ServiceDB{Services: []*v1.Service{svc}}
@@ -131,23 +134,22 @@ func SolverSingleService(svc *v1.Service, rwg
RouteWorkerGroup, wg *sync.WaitGro
// 1. sync memDB
db := db.ServiceDB{Services: []*v1.Service{svc}}
if err := db.UpdateService(); err != nil {
- // todo log error
+ log.Errorf("failed to update service to
mem db: %s", err)
errNotify = err
return
}
// 2. sync apisix
- if _, err := apisix.UpdateService(svc); err !=
nil {
+ if _, err :=
conf.Client.Cluster(cluster).Service().Update(context.TODO(), svc); err != nil {
errNotify = err
- return
+ log.Errorf("failed to update service:
%s, id:%s", err, *svc.ID)
+ } else {
+ log.Infof("updated service, id:%s,
upstream_id:%s", *svc.ID, *svc.UpstreamId)
}
- log.Infof("update service %s, %s", *svc.Name,
*svc.UpstreamId)
}
-
}
}
// broadcast to route
- routeWorkers := rwg[*svc.Name]
- for _, rw := range routeWorkers {
+ for _, rw := range rwg[*svc.Name] {
event := &Event{Kind: ServiceKind, Op: op, Obj: svc}
log.Infof("send event %s, %s, %s", event.Kind, event.Op,
*svc.Name)
rw.Event <- *event
diff --git a/pkg/seven/state/solver.go b/pkg/seven/state/solver.go
index abbf609..284ea89 100644
--- a/pkg/seven/state/solver.go
+++ b/pkg/seven/state/solver.go
@@ -20,9 +20,10 @@ import (
"sync"
"time"
- "github.com/api7/ingress-controller/pkg/seven/apisix"
+ "github.com/api7/ingress-controller/pkg/log"
+ "github.com/api7/ingress-controller/pkg/seven/conf"
"github.com/api7/ingress-controller/pkg/seven/db"
- "github.com/api7/ingress-controller/pkg/types/apisix/v1"
+ v1 "github.com/api7/ingress-controller/pkg/types/apisix/v1"
)
var UpstreamQueue chan UpstreamQueueObj
@@ -135,9 +136,15 @@ func (rc *RouteCompare) Sync() error {
request := db.RouteRequest{Name: *old.Name, FullName:
fullName}
if route, err := request.FindByName(); err != nil {
- // log error
+ log.Errorf("failed to find route %s from memory
DB: %s", *old.Name, err)
} else {
- if err = apisix.DeleteRoute(route); err == nil {
+ var cluster string
+ if route.Group != nil {
+ cluster = *route.Group
+ }
+ if err :=
conf.Client.Cluster(cluster).Route().Delete(context.TODO(), route); err != nil {
+ log.Errorf("failed to delete route %s
from APISIX: %s", *route.Name, err)
+ } else {
db := db.RouteDB{Routes:
[]*v1.Route{route}}
db.DeleteRoute()
}
@@ -148,16 +155,19 @@ func (rc *RouteCompare) Sync() error {
}
func SyncSsl(ssl *v1.Ssl, method string) error {
+ var cluster string
+ if ssl.Group != nil {
+ cluster = *ssl.Group
+ }
switch method {
case Create:
- _, err := apisix.AddOrUpdateSsl(ssl)
+ _, err :=
conf.Client.Cluster(cluster).SSL().Create(context.TODO(), ssl)
return err
case Update:
- _, err := apisix.AddOrUpdateSsl(ssl)
+ _, err :=
conf.Client.Cluster(cluster).SSL().Update(context.TODO(), ssl)
return err
case Delete:
- err := apisix.DeleteSsl(ssl)
- return err
+ return
conf.Client.Cluster(cluster).SSL().Delete(context.TODO(), ssl)
}
return nil
}
diff --git a/test/e2e/go.mod b/test/e2e/go.mod
index f396462..33df3d4 100644
--- a/test/e2e/go.mod
+++ b/test/e2e/go.mod
@@ -13,3 +13,5 @@ require (
)
replace github.com/gxthrj/apisix-ingress-types v0.1.3 =>
github.com/api7/ingress-types v0.1.3
+
+replace github.com/api7/ingress-controller => ../../
diff --git a/test/e2e/go.sum b/test/e2e/go.sum
index 715bf34..58d4c63 100644
--- a/test/e2e/go.sum
+++ b/test/e2e/go.sum
@@ -65,6 +65,7 @@ github.com/alecthomas/template
v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuy
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod
h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/api7/ingress-controller v0.0.0-20210105024109-72e53386de5a
h1:oEE3iY5nAw/mO8FBUtjlT6dj3BJ1XKuCYSrFCQa5wUY=
github.com/api7/ingress-controller v0.0.0-20210105024109-72e53386de5a/go.mod
h1:XPDSWSta4MVXvigaiAfVKBb/EUJsiwz3nR3Z1GuWCEE=
+github.com/api7/ingress-controller v0.1.0-rc1
h1:6EjrBu0r+ccVfYTnpGYj1txz1DJCJ/Q/k8pHigRkeu0=
github.com/api7/ingress-types v0.1.3/go.mod
h1:xWuHLSHGN4/JZjz9b0ftKgtE3yZ7NehkJAiTyZ+KoPA=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod
h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod
h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
@@ -530,11 +531,14 @@ go.opencensus.io v0.22.0/go.mod
h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.uber.org/atomic v1.3.2/go.mod
h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0/go.mod
h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
+go.uber.org/atomic v1.5.0 h1:OI5t8sDa1Or+q8AeE+yKeB/SDYioSHAgcVljj9JIETY=
go.uber.org/atomic v1.5.0/go.mod
h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/multierr v1.1.0/go.mod
h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
+go.uber.org/multierr v1.3.0 h1:sFPn2GLc3poCkfrpIXGhBD2X0CMIo4Q/zSULXrj/+uc=
go.uber.org/multierr v1.3.0/go.mod
h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod
h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
+go.uber.org/zap v1.13.0 h1:nR6NoDBgAf67s68NhaXbsojM+2gxp3S1hWkHDl27pVU=
go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod
h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181025213731-e84da0312774/go.mod
h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
@@ -604,6 +608,8 @@ golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7
h1:AeiKBIuRw3UomYXSbLy0Mc2dD
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod
h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200707034311-ab3426394381
h1:VXak5I6aEWmAXeQjA+QSZzlgNrpq9mjcfDemuexIKsU=
golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod
h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
+golang.org/x/net v0.0.0-20201224014010-6772e930b67b
h1:iFwSg7t5GZmB/Q5TjiEAsdoLDrdJRC1RiF2WhuV29Qw=
+golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod
h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod
h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod
h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190402181905-9f3314589c9a/go.mod
h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -654,6 +660,9 @@ golang.org/x/sys v0.0.0-20200519105757-fe76b779f299
h1:DYfZAGf2WMFjMxbgTjaC+2HC7
golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200622214017-ed371f2e16b4
h1:5/PjkGUjvEU5Gl6BxmvKRPpqo2uNMv4rcHBMwzk/st8=
golang.org/x/sys v0.0.0-20200622214017-ed371f2e16b4/go.mod
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20201119102817-f84b799fce68
h1:nxC68pudNYkKU6jWhgrqdreuFiOQWj1Fs7T3VrH4Pjw=
+golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod
h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod
h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod
h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
diff --git a/test/e2e/ingress/route.go b/test/e2e/ingress/route.go
index 06139e4..8b88ebf 100644
--- a/test/e2e/ingress/route.go
+++ b/test/e2e/ingress/route.go
@@ -50,9 +50,9 @@ spec:
scale := 2
s.ScaleHTTPBIN(scale)
s.WaitUntilNumPodsCreatedE(s.Selector("app=httpbin-deployment-e2e-test"),
scale, 5, 5*time.Second)
- time.Sleep(2 * time.Second) // wait for ingress to sync
- response, err := s.ListApisixUpstreams()
- assert.Nil(ginkgo.GinkgoT(), err, "List upstreams error")
- assert.Equal(ginkgo.GinkgoT(), 2,
len(response.Upstreams.Upstreams[0].UpstreamNodes.Nodes), "upstreams nodes not
expect")
+ time.Sleep(10 * time.Second) // wait for ingress to sync
+ ups, err := s.ListApisixUpstreams()
+ assert.Nil(ginkgo.GinkgoT(), err, "list upstreams error")
+ assert.Len(ginkgo.GinkgoT(), ups[0].Nodes, 2, "upstreams nodes
not expect")
})
})
diff --git a/test/e2e/scaffold/apisix.go b/test/e2e/scaffold/apisix.go
index 194a49e..37c504b 100644
--- a/test/e2e/scaffold/apisix.go
+++ b/test/e2e/scaffold/apisix.go
@@ -77,7 +77,7 @@ spec:
tcpSocket:
port: 9080
timeoutSeconds: 2
- image: "apache/apisix:latest"
+ image: "apache/apisix:dev"
imagePullPolicy: IfNotPresent
name: apisix-deployment-e2e-test
ports:
diff --git a/test/e2e/scaffold/crd.go b/test/e2e/scaffold/crd.go
index 7dbbe0b..1d3d887 100644
--- a/test/e2e/scaffold/crd.go
+++ b/test/e2e/scaffold/crd.go
@@ -15,18 +15,21 @@
package scaffold
import (
+ "context"
"encoding/json"
"net/http"
"net/url"
"strconv"
"time"
- "github.com/api7/ingress-controller/pkg/seven/apisix"
"github.com/gruntwork-io/terratest/modules/k8s"
"github.com/onsi/ginkgo"
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
+
+ "github.com/api7/ingress-controller/pkg/apisix"
+ v1 "github.com/api7/ingress-controller/pkg/types/apisix/v1"
)
type counter struct {
@@ -155,7 +158,7 @@ func (s *Scaffold) EnsureNumApisixUpstreamsCreated(desired
int) error {
}
// ListApisixUpstreams list all upstream from APISIX
-func (s *Scaffold) ListApisixUpstreams() (*apisix.UpstreamsResponse, error) {
+func (s *Scaffold) ListApisixUpstreams() ([]*v1.Upstream, error) {
host, err := s.apisixAdminServiceURL()
if err != nil {
return nil, err
@@ -163,13 +166,13 @@ func (s *Scaffold) ListApisixUpstreams()
(*apisix.UpstreamsResponse, error) {
u := url.URL{
Scheme: "http",
Host: host,
- Path: "/apisix/admin/upstreams",
+ Path: "/apisix/admin",
}
- resp, err := http.Get(u.String())
- var responses *apisix.UpstreamsResponse
- dec := json.NewDecoder(resp.Body)
- if err := dec.Decode(&responses); err != nil {
+ cli, err := apisix.NewForOptions(&apisix.ClusterOptions{
+ BaseURL: u.String(),
+ })
+ if err != nil {
return nil, err
}
- return responses, nil
+ return cli.Cluster("").Upstream().List(context.TODO())
}