This is an automated email from the ASF dual-hosted git repository.
tokers pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git
The following commit(s) were added to refs/heads/master by this push:
new d9d22bb chore: generate id instead let APISIX do it (#199)
d9d22bb is described below
commit d9d22bb885f9a6dff6a4c2f74d9c5af2320d7423
Author: Alex Zhang <[email protected]>
AuthorDate: Mon Jan 25 15:23:02 2021 +0800
chore: generate id instead let APISIX do it (#199)
---
Makefile | 3 ++
pkg/apisix/cluster.go | 31 ++++++++++++-
pkg/apisix/resource.go | 4 ++
pkg/apisix/route.go | 47 +++++++++++++++----
pkg/apisix/route_test.go | 13 +++---
pkg/apisix/service.go | 45 ++++++++++++++----
pkg/apisix/service_test.go | 12 +++--
pkg/apisix/ssl.go | 61 ++++++++++++++++++++++--
pkg/apisix/ssl_test.go | 13 ++++--
pkg/apisix/upstream.go | 45 ++++++++++++++----
pkg/apisix/upstream_test.go | 12 +++--
pkg/seven/state/builder.go | 95 ++++++++++++++++++--------------------
pkg/seven/state/diff.go | 15 ------
pkg/seven/state/service_worker.go | 13 ++----
pkg/seven/state/solver.go | 6 +--
pkg/seven/state/sync.go | 15 ------
pkg/seven/state/upstream_worker.go | 25 ----------
17 files changed, 287 insertions(+), 168 deletions(-)
diff --git a/Makefile b/Makefile
index a2c1411..1d01c1b 100644
--- a/Makefile
+++ b/Makefile
@@ -34,6 +34,7 @@
GITSHASYM="github.com/api7/ingress-controller/pkg/version._buildGitRevision"
BUILDOSSYM="github.com/api7/ingress-controller/pkg/version._buildOS"
GO_LDFLAGS ?= "-X=$(VERSYM)=$(VERSION) -X=$(GITSHASYM)=$(GITSHA)
-X=$(BUILDOSSYM)=$(OSNAME)/$(OSARCH)"
E2E_CONCURRENCY ?= 1
+E2E_SKIP_BUILD ?= 0
### build: Build apisix-ingress-controller
build:
@@ -75,9 +76,11 @@ endif
# build images to minikube node directly, it's an internal directive, so don't
# expose it's help message.
build-image-to-minikube:
+ifeq ($(E2E_SKIP_BUILD), 0)
@minikube version > /dev/null 2>&1 || (echo "ERROR: minikube is
required."; exit 1)
@eval $$(minikube docker-env);\
docker build -t apache/apisix-ingress-controller:$(IMAGE_TAG) .
+endif
### license-check: Do Apache License Header check
license-check:
diff --git a/pkg/apisix/cluster.go b/pkg/apisix/cluster.go
index 4a52412..b23e34f 100644
--- a/pkg/apisix/cluster.go
+++ b/pkg/apisix/cluster.go
@@ -255,6 +255,35 @@ func (s *cluster) do(req *http.Request) (*http.Response,
error) {
return s.cli.Do(req)
}
+func (s *cluster) getResource(ctx context.Context, url string) (*getResponse,
error) {
+ req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
+ if err != nil {
+ return nil, err
+ }
+ resp, err := s.do(req)
+ if err != nil {
+ return nil, err
+ }
+ defer drainBody(resp.Body, url)
+ if resp.StatusCode != http.StatusOK {
+ if resp.StatusCode == http.StatusNotFound {
+ return nil, cache.ErrNotFound
+ } else {
+ err = multierr.Append(err, fmt.Errorf("unexpected
status code %d", resp.StatusCode))
+ err = multierr.Append(err, fmt.Errorf("error message:
%s", readBody(resp.Body, url)))
+ }
+ return nil, err
+ }
+
+ var res getResponse
+
+ dec := json.NewDecoder(resp.Body)
+ if err := dec.Decode(&res); err != nil {
+ return nil, err
+ }
+ return &res, nil
+}
+
func (s *cluster) listResource(ctx context.Context, url string)
(*listResponse, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
@@ -281,7 +310,7 @@ func (s *cluster) listResource(ctx context.Context, url
string) (*listResponse,
}
func (s *cluster) createResource(ctx context.Context, url string, body
io.Reader) (*createResponse, error) {
- req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, body)
+ req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, body)
if err != nil {
return nil, err
}
diff --git a/pkg/apisix/resource.go b/pkg/apisix/resource.go
index 1c7ecc9..e411bf5 100644
--- a/pkg/apisix/resource.go
+++ b/pkg/apisix/resource.go
@@ -24,6 +24,10 @@ import (
v1 "github.com/api7/ingress-controller/pkg/types/apisix/v1"
)
+type getResponse struct {
+ Item item `json:"node"`
+}
+
// listResponse is the unified LIST response mapping of APISIX.
type listResponse struct {
Count string `json:"count"`
diff --git a/pkg/apisix/route.go b/pkg/apisix/route.go
index 51d9c9e..246f869 100644
--- a/pkg/apisix/route.go
+++ b/pkg/apisix/route.go
@@ -23,6 +23,7 @@ import (
"go.uber.org/zap"
"github.com/api7/ingress-controller/pkg/apisix/cache"
+ "github.com/api7/ingress-controller/pkg/id"
"github.com/api7/ingress-controller/pkg/log"
v1 "github.com/api7/ingress-controller/pkg/types/apisix/v1"
)
@@ -49,6 +50,8 @@ func newRouteClient(c *cluster) Route {
}
}
+// FIXME: currently, if the caller passes a non-existent resource, Get always
+// misses the cache and passes through to APISIX.
func (r *routeClient) Get(ctx context.Context, fullname string) (*v1.Route,
error) {
log.Infow("try to look up route",
zap.String("fullname", fullname),
@@ -71,17 +74,42 @@ func (r *routeClient) Get(ctx context.Context, fullname
string) (*v1.Route, erro
)
}
- // FIXME Replace the List with accurate get since list is not trivial.
- list, err := r.List(ctx)
+ // TODO Add mutex here to avoid the dog-pile effect.
+ url := r.url + "/" + id.GenID(fullname)
+ resp, err := r.cluster.getResource(ctx, url)
if err != nil {
+ if err == cache.ErrNotFound {
+ log.Warnw("route not found",
+ zap.String("fullname", fullname),
+ zap.String("url", url),
+ zap.String("cluster", r.clusterName),
+ )
+ } else {
+ log.Errorw("failed to get route from APISIX",
+ zap.String("fullname", fullname),
+ zap.String("url", url),
+ zap.String("cluster", r.clusterName),
+ zap.Error(err),
+ )
+ }
return nil, err
}
- for _, elem := range list {
- if *elem.FullName == fullname {
- return elem, nil
- }
+
+ route, err = resp.Item.route(r.clusterName)
+ if err != nil {
+ log.Errorw("failed to convert route item",
+ zap.String("url", r.url),
+ zap.String("route_key", resp.Item.Key),
+ zap.Error(err),
+ )
+ return nil, err
+ }
+
+ if err := r.cluster.cache.InsertRoute(route); err != nil {
+ log.Errorf("failed to reflect route create to cache: %s", err)
+ return nil, err
}
- return nil, cache.ErrNotFound
+ return route, nil
}
// List is only used in cache warming up. So here just pass through
@@ -139,8 +167,9 @@ func (r *routeClient) Create(ctx context.Context, obj
*v1.Route) (*v1.Route, err
return nil, err
}
- log.Infow("creating route", zap.ByteString("body", data),
zap.String("url", r.url))
- resp, err := r.cluster.createResource(ctx, r.url, bytes.NewReader(data))
+ url := r.url + "/" + *obj.ID
+ log.Infow("creating route", zap.ByteString("body", data),
zap.String("url", url))
+ resp, err := r.cluster.createResource(ctx, url, bytes.NewReader(data))
if err != nil {
log.Errorf("failed to create route: %s", err)
return nil, err
diff --git a/pkg/apisix/route_test.go b/pkg/apisix/route_test.go
index c49b9c6..a06a209 100644
--- a/pkg/apisix/route_test.go
+++ b/pkg/apisix/route_test.go
@@ -34,7 +34,6 @@ import (
)
type fakeAPISIXRouteSrv struct {
- id int
route map[string]json.RawMessage
}
@@ -101,9 +100,9 @@ func (srv *fakeAPISIXRouteSrv) ServeHTTP(w
http.ResponseWriter, r *http.Request)
w.WriteHeader(code)
}
- if r.Method == http.MethodPost {
- srv.id++
- key := fmt.Sprintf("/apisix/routes/%d", srv.id)
+ if r.Method == http.MethodPut {
+ paths := strings.Split(r.URL.Path, "/")
+ key := fmt.Sprintf("/apisix/routes/%s", paths[len(paths)-1])
data, _ := ioutil.ReadAll(r.Body)
srv.route[key] = data
w.WriteHeader(http.StatusCreated)
@@ -139,7 +138,6 @@ func (srv *fakeAPISIXRouteSrv) ServeHTTP(w
http.ResponseWriter, r *http.Request)
func runFakeRouteSrv(t *testing.T) *http.Server {
srv := &fakeAPISIXRouteSrv{
- id: 0,
route: make(map[string]json.RawMessage),
}
@@ -181,11 +179,12 @@ func TestRouteClient(t *testing.T) {
})
// Create
- id := "111"
+ id := "1"
host := "www.foo.com"
uri := "/bar"
name := "test"
obj, err := cli.Create(context.Background(), &v1.Route{
+ ID: &id,
Host: &host,
Path: &uri,
Name: &name,
@@ -196,7 +195,9 @@ func TestRouteClient(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, *obj.ID, "1")
+ id2 := "2"
obj, err = cli.Create(context.Background(), &v1.Route{
+ ID: &id2,
Host: &host,
Path: &uri,
Name: &name,
diff --git a/pkg/apisix/service.go b/pkg/apisix/service.go
index a58f099..8d77eb3 100644
--- a/pkg/apisix/service.go
+++ b/pkg/apisix/service.go
@@ -23,6 +23,7 @@ import (
"go.uber.org/zap"
"github.com/api7/ingress-controller/pkg/apisix/cache"
+ "github.com/api7/ingress-controller/pkg/id"
"github.com/api7/ingress-controller/pkg/log"
v1 "github.com/api7/ingress-controller/pkg/types/apisix/v1"
)
@@ -69,17 +70,42 @@ func (s *serviceClient) Get(ctx context.Context, fullname
string) (*v1.Service,
)
}
- // FIXME Replace the List with accurate get since list is not trivial.
- list, err := s.List(ctx)
+ // TODO Add mutex here to avoid the dog-pile effect.
+ url := s.url + "/" + id.GenID(fullname)
+ resp, err := s.cluster.getResource(ctx, url)
if err != nil {
+ if err == cache.ErrNotFound {
+ log.Warnw("service not found",
+ zap.String("fullname", fullname),
+ zap.String("url", url),
+ zap.String("cluster", s.clusterName),
+ )
+ } else {
+ log.Errorw("failed to get service from APISIX",
+ zap.String("fullname", fullname),
+ zap.String("url", url),
+ zap.String("cluster", s.clusterName),
+ zap.Error(err),
+ )
+ }
return nil, err
}
- for _, elem := range list {
- if *elem.FullName == fullname {
- return elem, nil
- }
+
+ svc, err = resp.Item.service(s.clusterName)
+ if err != nil {
+ log.Errorw("failed to convert service item",
+ zap.String("url", s.url),
+ zap.String("service_key", resp.Item.Key),
+ zap.Error(err),
+ )
+ return nil, err
}
- return nil, cache.ErrNotFound
+
+ if err := s.cluster.cache.InsertService(svc); err != nil {
+ log.Errorf("failed to reflect service create to cache: %s", err)
+ return nil, err
+ }
+ return svc, nil
}
// List is only used in cache warming up. So here just pass through
@@ -132,8 +158,9 @@ func (s *serviceClient) Create(ctx context.Context, obj
*v1.Service) (*v1.Servic
return nil, err
}
- log.Infow("creating service", zap.ByteString("body", body),
zap.String("url", s.url))
- resp, err := s.cluster.createResource(ctx, s.url, bytes.NewReader(body))
+ url := s.url + "/" + *obj.ID
+ log.Infow("creating service", zap.ByteString("body", body),
zap.String("url", url))
+ resp, err := s.cluster.createResource(ctx, url, bytes.NewReader(body))
if err != nil {
log.Errorf("failed to create service: %s", err)
return nil, err
diff --git a/pkg/apisix/service_test.go b/pkg/apisix/service_test.go
index a7054e5..41c8078 100644
--- a/pkg/apisix/service_test.go
+++ b/pkg/apisix/service_test.go
@@ -33,7 +33,6 @@ import (
)
type fakeAPISIXServiceSrv struct {
- id int
service map[string]json.RawMessage
}
@@ -80,9 +79,9 @@ func (srv *fakeAPISIXServiceSrv) ServeHTTP(w
http.ResponseWriter, r *http.Reques
w.WriteHeader(code)
}
- if r.Method == http.MethodPost {
- srv.id++
- key := fmt.Sprintf("/apisix/services/%d", srv.id)
+ if r.Method == http.MethodPut {
+ paths := strings.Split(r.URL.Path, "/")
+ key := fmt.Sprintf("/apisix/services/%s", paths[len(paths)-1])
data, _ := ioutil.ReadAll(r.Body)
srv.service[key] = data
w.WriteHeader(http.StatusCreated)
@@ -118,7 +117,6 @@ func (srv *fakeAPISIXServiceSrv) ServeHTTP(w
http.ResponseWriter, r *http.Reques
func runFakeServiceSrv(t *testing.T) *http.Server {
srv := &fakeAPISIXServiceSrv{
- id: 0,
service: make(map[string]json.RawMessage),
}
@@ -163,7 +161,9 @@ func TestServiceClient(t *testing.T) {
name := "test"
upsId := "13"
+ id := "1"
obj, err := cli.Create(context.TODO(), &v1.Service{
+ ID: &id,
FullName: &fullName,
Group: &group,
Name: &name,
@@ -172,7 +172,9 @@ func TestServiceClient(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, *obj.ID, "1")
+ id2 := "2"
obj, err = cli.Create(context.TODO(), &v1.Service{
+ ID: &id2,
FullName: &fullName,
Group: &group,
Name: &name,
diff --git a/pkg/apisix/ssl.go b/pkg/apisix/ssl.go
index 8d0ee7e..275933f 100644
--- a/pkg/apisix/ssl.go
+++ b/pkg/apisix/ssl.go
@@ -22,6 +22,8 @@ import (
"go.uber.org/zap"
+ "github.com/api7/ingress-controller/pkg/apisix/cache"
+ "github.com/api7/ingress-controller/pkg/id"
"github.com/api7/ingress-controller/pkg/log"
v1 "github.com/api7/ingress-controller/pkg/types/apisix/v1"
)
@@ -40,14 +42,64 @@ func newSSLClient(c *cluster) SSL {
}
}
-func (s *sslClient) Get(_ context.Context, fullname string) (*v1.Ssl, error) {
+func (s *sslClient) Get(ctx context.Context, fullname string) (*v1.Ssl, error)
{
log.Infow("try to look up ssl",
zap.String("fullname", fullname),
zap.String("url", s.url),
zap.String("cluster", s.clusterName),
)
- return s.cluster.cache.GetSSL(fullname)
+ ssl, err := s.cluster.cache.GetSSL(fullname)
+ if err == nil {
+ return ssl, nil
+ }
+ if err != cache.ErrNotFound {
+ log.Errorw("failed to find ssl in cache, will try to lookup
from APISIX",
+ zap.String("fullname", fullname),
+ zap.Error(err),
+ )
+ } else {
+ log.Warnw("failed to find ssl in cache, will try to lookup from
APISIX",
+ zap.String("fullname", fullname),
+ zap.Error(err),
+ )
+ }
+
+ // TODO Add mutex here to avoid the dog-pile effect.
+ url := s.url + "/" + id.GenID(fullname)
+ resp, err := s.cluster.getResource(ctx, url)
+ if err != nil {
+ if err == cache.ErrNotFound {
+ log.Warnw("ssl not found",
+ zap.String("fullname", fullname),
+ zap.String("url", url),
+ zap.String("cluster", s.clusterName),
+ )
+ } else {
+ log.Errorw("failed to get ssl from APISIX",
+ zap.String("fullname", fullname),
+ zap.String("url", url),
+ zap.String("cluster", s.clusterName),
+ zap.Error(err),
+ )
+ }
+ return nil, err
+ }
+ ssl, err = resp.Item.ssl(s.clusterName)
+ if err != nil {
+ log.Errorw("failed to convert ssl item",
+ zap.String("url", s.url),
+ zap.String("ssl_key", resp.Item.Key),
+ zap.Error(err),
+ )
+ return nil, err
+ }
+
+ if err := s.cluster.cache.InsertSSL(ssl); err != nil {
+ log.Errorf("failed to reflect ssl create to cache: %s", err)
+ return nil, err
+ }
+ return ssl, nil
}
// List is only used in cache warming up. So here just pass through
@@ -99,8 +151,9 @@ func (s *sslClient) Create(ctx context.Context, obj *v1.Ssl)
(*v1.Ssl, error) {
if err != nil {
return nil, err
}
- log.Infow("creating ssl", zap.ByteString("body", data),
zap.String("url", s.url))
- resp, err := s.cluster.createResource(ctx, s.url, bytes.NewReader(data))
+ url := s.url + "/" + *obj.ID
+ log.Infow("creating ssl", zap.ByteString("body", data),
zap.String("url", url))
+ resp, err := s.cluster.createResource(ctx, url, bytes.NewReader(data))
if err != nil {
log.Errorf("failed to create ssl: %s", err)
return nil, err
diff --git a/pkg/apisix/ssl_test.go b/pkg/apisix/ssl_test.go
index 8ed9b1a..8e4ad2e 100644
--- a/pkg/apisix/ssl_test.go
+++ b/pkg/apisix/ssl_test.go
@@ -33,7 +33,6 @@ import (
)
type fakeAPISIXSSLSrv struct {
- id int
ssl map[string]json.RawMessage
}
@@ -80,9 +79,9 @@ func (srv *fakeAPISIXSSLSrv) ServeHTTP(w http.ResponseWriter,
r *http.Request) {
w.WriteHeader(code)
}
- if r.Method == http.MethodPost {
- srv.id++
- key := fmt.Sprintf("/apisix/ssl/%d", srv.id)
+ if r.Method == http.MethodPut {
+ paths := strings.Split(r.URL.Path, "/")
+ key := fmt.Sprintf("/apisix/ssl/%s", paths[len(paths)-1])
data, _ := ioutil.ReadAll(r.Body)
srv.ssl[key] = data
w.WriteHeader(http.StatusCreated)
@@ -118,7 +117,6 @@ func (srv *fakeAPISIXSSLSrv) ServeHTTP(w
http.ResponseWriter, r *http.Request) {
func runFakeSSLSrv(t *testing.T) *http.Server {
srv := &fakeAPISIXSSLSrv{
- id: 0,
ssl: make(map[string]json.RawMessage),
}
@@ -150,6 +148,7 @@ func TestSSLClient(t *testing.T) {
}
closedCh := make(chan struct{})
close(closedCh)
+
cli := newSSLClient(&cluster{
baseURL: u.String(),
cli: http.DefaultClient,
@@ -158,16 +157,20 @@ func TestSSLClient(t *testing.T) {
})
// Create
+ id1 := "1"
group := "default"
sni := "bar.com"
obj, err := cli.Create(context.TODO(), &v1.Ssl{
+ ID: &id1,
Group: &group,
Snis: []*string{&sni},
})
assert.Nil(t, err)
assert.Equal(t, *obj.ID, "1")
+ id2 := "2"
obj, err = cli.Create(context.TODO(), &v1.Ssl{
+ ID: &id2,
Group: &group,
Snis: []*string{&sni},
})
diff --git a/pkg/apisix/upstream.go b/pkg/apisix/upstream.go
index 11d0b94..3f49b0b 100644
--- a/pkg/apisix/upstream.go
+++ b/pkg/apisix/upstream.go
@@ -23,6 +23,7 @@ import (
"go.uber.org/zap"
"github.com/api7/ingress-controller/pkg/apisix/cache"
+ "github.com/api7/ingress-controller/pkg/id"
"github.com/api7/ingress-controller/pkg/log"
v1 "github.com/api7/ingress-controller/pkg/types/apisix/v1"
)
@@ -104,17 +105,42 @@ func (u *upstreamClient) Get(ctx context.Context,
fullname string) (*v1.Upstream
)
}
- // FIXME Replace the List with accurate get since list is not trivial.
- list, err := u.List(ctx)
+ // TODO Add mutex here to avoid the dog-pile effect.
+ url := u.url + "/" + id.GenID(fullname)
+ resp, err := u.cluster.getResource(ctx, url)
if err != nil {
+ if err == cache.ErrNotFound {
+ log.Warnw("upstream not found",
+ zap.String("fullname", fullname),
+ zap.String("url", url),
+ zap.String("cluster", u.clusterName),
+ )
+ } else {
+ log.Errorw("failed to get upstream from APISIX",
+ zap.String("fullname", fullname),
+ zap.String("url", url),
+ zap.String("cluster", u.clusterName),
+ zap.Error(err),
+ )
+ }
return nil, err
}
- for _, elem := range list {
- if *elem.FullName == fullname {
- return elem, nil
- }
+
+ ups, err = resp.Item.upstream(u.clusterName)
+ if err != nil {
+ log.Errorw("failed to convert upstream item",
+ zap.String("url", u.url),
+ zap.String("ssl_key", resp.Item.Key),
+ zap.Error(err),
+ )
+ return nil, err
+ }
+
+ if err := u.cluster.cache.InsertUpstream(ups); err != nil {
+ log.Errorf("failed to reflect upstream create to cache: %s",
err)
+ return nil, err
}
- return nil, cache.ErrNotFound
+ return ups, nil
}
// List is only used in cache warming up. So here just pass through
@@ -177,9 +203,10 @@ func (u *upstreamClient) Create(ctx context.Context, obj
*v1.Upstream) (*v1.Upst
if err != nil {
return nil, err
}
- log.Infow("creating upstream", zap.ByteString("body", body),
zap.String("url", u.url))
+ url := u.url + "/" + *obj.ID
+ log.Infow("creating upstream", zap.ByteString("body", body),
zap.String("url", url))
- resp, err := u.cluster.createResource(ctx, u.url, bytes.NewReader(body))
+ resp, err := u.cluster.createResource(ctx, url, bytes.NewReader(body))
if err != nil {
log.Errorf("failed to create upstream: %s", err)
return nil, err
diff --git a/pkg/apisix/upstream_test.go b/pkg/apisix/upstream_test.go
index fef4901..5619680 100644
--- a/pkg/apisix/upstream_test.go
+++ b/pkg/apisix/upstream_test.go
@@ -34,7 +34,6 @@ import (
)
type fakeAPISIXUpstreamSrv struct {
- id int
upstream map[string]json.RawMessage
}
@@ -81,9 +80,9 @@ func (srv *fakeAPISIXUpstreamSrv) ServeHTTP(w
http.ResponseWriter, r *http.Reque
w.WriteHeader(code)
}
- if r.Method == http.MethodPost {
- srv.id++
- key := fmt.Sprintf("/apisix/upstreams/%d", srv.id)
+ if r.Method == http.MethodPut {
+ paths := strings.Split(r.URL.Path, "/")
+ key := fmt.Sprintf("/apisix/upstreams/%s", paths[len(paths)-1])
data, _ := ioutil.ReadAll(r.Body)
srv.upstream[key] = data
w.WriteHeader(http.StatusCreated)
@@ -119,7 +118,6 @@ func (srv *fakeAPISIXUpstreamSrv) ServeHTTP(w
http.ResponseWriter, r *http.Reque
func runFakeUpstreamSrv(t *testing.T) *http.Server {
srv := &fakeAPISIXUpstreamSrv{
- id: 0,
upstream: make(map[string]json.RawMessage),
}
@@ -175,7 +173,9 @@ func TestUpstreamClient(t *testing.T) {
},
}
+ id1 := "1"
obj, err := cli.Create(context.TODO(), &v1.Upstream{
+ ID: &id1,
FullName: &fullName,
Group: &group,
Name: &name,
@@ -186,7 +186,9 @@ func TestUpstreamClient(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, *obj.ID, "1")
+ id2 := "2"
obj, err = cli.Create(context.TODO(), &v1.Upstream{
+ ID: &id2,
FullName: &fullName,
Group: &group,
Name: &name,
diff --git a/pkg/seven/state/builder.go b/pkg/seven/state/builder.go
index 73fce0d..4861206 100644
--- a/pkg/seven/state/builder.go
+++ b/pkg/seven/state/builder.go
@@ -17,10 +17,10 @@ package state
import (
"context"
"errors"
- "strconv"
"sync"
"github.com/api7/ingress-controller/pkg/apisix/cache"
+ "github.com/api7/ingress-controller/pkg/id"
"github.com/api7/ingress-controller/pkg/log"
"github.com/api7/ingress-controller/pkg/seven/conf"
"github.com/api7/ingress-controller/pkg/seven/utils"
@@ -32,54 +32,40 @@ const (
WatchFromKind = "watch"
)
-//// InitDB insert object into memDB first time
-//func InitDB(){
-// routes, _ := apisix.ListRoute()
-// upstreams, _ := apisix.ListUpstream()
-// apisix.InsertRoute(routes)
-// apisix.InsertUpstreams(upstreams)
-//}
-//
-//// LoadTargetState load targetState from ... maybe k8s CRD
-//func LoadTargetState(routes []*v1.Route, upstreams []*v1.Upstream){
-//
-// // 1.diff
-// // 2.send event
-//}
-
-// paddingRoute padding route from memDB
-func paddingRoute(route *v1.Route, currentRoute *v1.Route) {
- // padding object, just id
+// paddingRoute fills route through currentRoute. It returns a boolean
+// value to indicate whether the route is a newly created one.
+func paddingRoute(route *v1.Route, currentRoute *v1.Route) bool {
if currentRoute == nil {
- // NOT FOUND : set Id = 0
- id := strconv.Itoa(0)
- route.ID = &id
- } else {
- route.ID = currentRoute.ID
+ rid := id.GenID(*route.FullName)
+ route.ID = &rid
+ return true
}
+ route.ID = currentRoute.ID
+ return false
}
-// padding service from memDB
-func paddingService(service *v1.Service, currentService *v1.Service) {
+// paddingService fills service through currentService. It returns a boolean
+// value to indicate whether the service is a newly created one.
+func paddingService(service *v1.Service, currentService *v1.Service) bool {
if currentService == nil {
- id := strconv.Itoa(0)
- service.ID = &id
- } else {
- service.ID = currentService.ID
+ sid := id.GenID(*service.FullName)
+ service.ID = &sid
+ return true
}
+ service.ID = currentService.ID
+ return false
}
-// paddingUpstream padding upstream from memDB
-func paddingUpstream(upstream *v1.Upstream, currentUpstream *v1.Upstream) {
- // padding id
+// paddingUpstream fills upstream through currentUpstream. It returns a boolean
+// value to indicate whether the upstream is a newly created one.
+func paddingUpstream(upstream *v1.Upstream, currentUpstream *v1.Upstream) bool
{
if currentUpstream == nil {
- // NOT FOUND : set Id = 0
- id := strconv.Itoa(0)
- upstream.ID = &id
- } else {
- upstream.ID = currentUpstream.ID
+ uid := id.GenID(*upstream.FullName)
+ upstream.ID = &uid
+ return true
}
- // todo padding nodes ? or sync nodes from crd ?
+ upstream.ID = currentUpstream.ID
+ return false
}
// NewRouteWorkers make routeWrokers group by service per CRD
@@ -99,7 +85,10 @@ func NewRouteWorkers(ctx context.Context,
// 3.route get the Event and trigger a padding for object,then diff,sync;
func (r *routeWorker) trigger(event Event) {
- var errNotify error
+ var (
+ op string
+ errNotify error
+ )
defer func() {
if errNotify != nil {
r.ErrorChan <- CRDStatus{Id: "", Status: "failure",
Err: errNotify}
@@ -121,16 +110,20 @@ func (r *routeWorker) trigger(event Event) {
errNotify = err
return
}
- paddingRoute(r.Route, currentRoute)
- // diff
+
+ if paddingRoute(r.Route, currentRoute) {
+ op = Create
+ } else {
+ op = Update
+ }
+
hasDiff, err := utils.HasDiff(r.Route, currentRoute)
- // sync
if err != nil {
errNotify = err
return
}
if hasDiff {
- err := r.sync()
+ err := r.sync(op)
if err != nil {
errNotify = err
return
@@ -139,12 +132,12 @@ func (r *routeWorker) trigger(event Event) {
}
// sync
-func (r *routeWorker) sync() error {
+func (r *routeWorker) sync(op string) error {
var cluster string
if r.Group != nil {
cluster = *r.Group
}
- if *r.Route.ID != strconv.Itoa(0) {
+ if op == Update {
if _, err :=
conf.Client.Cluster(cluster).Route().Update(context.TODO(), r.Route); err !=
nil {
log.Errorf("failed to update route %s: %s, ", *r.Name,
err)
return err
@@ -202,14 +195,18 @@ func SolverSingleUpstream(u *v1.Upstream, swg
ServiceWorkerGroup, wg *sync.WaitG
errNotify = err
return
} else {
- paddingUpstream(u, currentUpstream)
- if currentUpstream == nil {
+ if paddingUpstream(u, currentUpstream) {
+ op = Create
+ } else {
+ op = Update
+ }
+
+ if op == Create {
if u.FromKind != nil && *u.FromKind == WatchFromKind {
// We don't have a pre-defined upstream and the
current upstream updating from
// endpoints.
return
}
- op = Create
if _, err :=
conf.Client.Cluster(cluster).Upstream().Create(context.TODO(), u); err != nil {
log.Errorf("failed to create upstream %s: %s",
*u.FullName, err)
return
diff --git a/pkg/seven/state/diff.go b/pkg/seven/state/diff.go
deleted file mode 100644
index 2de068d..0000000
--- a/pkg/seven/state/diff.go
+++ /dev/null
@@ -1,15 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package state
diff --git a/pkg/seven/state/service_worker.go
b/pkg/seven/state/service_worker.go
index 87b654f..9ad1abe 100644
--- a/pkg/seven/state/service_worker.go
+++ b/pkg/seven/state/service_worker.go
@@ -16,7 +16,6 @@ package state
import (
"context"
- "strconv"
"sync"
"github.com/api7/ingress-controller/pkg/log"
@@ -92,7 +91,9 @@ func SolverSingleService(svc *v1.Service, rwg
RouteWorkerGroup, wg *sync.WaitGro
cluster = *svc.Group
}
currentService, _ :=
conf.Client.Cluster(cluster).Service().Get(context.TODO(), *svc.FullName)
- paddingService(svc, currentService)
+ if paddingService(svc, currentService) {
+ op = Create
+ }
// diff
hasDiff, err := utils.HasDiff(svc, currentService)
// sync
@@ -101,18 +102,14 @@ func SolverSingleService(svc *v1.Service, rwg
RouteWorkerGroup, wg *sync.WaitGro
return
}
if hasDiff {
- if *svc.ID == strconv.Itoa(0) {
- op = Create
- if s, err :=
conf.Client.Cluster(cluster).Service().Create(context.TODO(), svc); err != nil {
+ if op == Create {
+ if _, err :=
conf.Client.Cluster(cluster).Service().Create(context.TODO(), svc); err != nil {
log.Errorf("failed to create service: %s", err)
errNotify = err
return
- } else {
- *svc.ID = *s.ID
}
log.Infof("create service %s, %s", *svc.Name,
*svc.UpstreamId)
} else {
- op = Update
needToUpdate := true
if currentService.FromKind != nil &&
*(currentService.FromKind) == ApisixService { // update from ApisixUpstream
if svc.FromKind == nil || (svc.FromKind != nil
&& *(svc.FromKind) != ApisixService) {
diff --git a/pkg/seven/state/solver.go b/pkg/seven/state/solver.go
index a278aaa..46b2167 100644
--- a/pkg/seven/state/solver.go
+++ b/pkg/seven/state/solver.go
@@ -70,7 +70,7 @@ func (s *ApisixCombination) Remove() error {
return err
}
}
- paddingService(svc, svcInCache)
+ _ = paddingService(svc, svcInCache)
err =
conf.Client.Cluster(cluster).Service().Delete(context.TODO(), svc)
if err != nil {
if err == cache.ErrNotFound {
@@ -98,7 +98,7 @@ func (s *ApisixCombination) Remove() error {
return err
}
}
- paddingUpstream(ups, upsInCache)
+ _ = paddingUpstream(ups, upsInCache)
err =
conf.Client.Cluster(cluster).Upstream().Delete(context.TODO(), ups)
if err == cache.ErrNotFound {
log.Errorf("failed to remove upstream %s: %s",
*ups.FullName, err)
@@ -203,7 +203,7 @@ func (rc *RouteCompare) Sync() error {
continue
}
- paddingRoute(old, route)
+ _ = paddingRoute(old, route)
if err :=
conf.Client.Cluster(cluster).Route().Delete(context.TODO(), old); err != nil {
log.Errorf("failed to delete route %s from
APISIX: %s", *old.Name, err)
merr = multierr.Append(merr, err)
diff --git a/pkg/seven/state/sync.go b/pkg/seven/state/sync.go
deleted file mode 100644
index 2de068d..0000000
--- a/pkg/seven/state/sync.go
+++ /dev/null
@@ -1,15 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package state
diff --git a/pkg/seven/state/upstream_worker.go
b/pkg/seven/state/upstream_worker.go
deleted file mode 100644
index 33818eb..0000000
--- a/pkg/seven/state/upstream_worker.go
+++ /dev/null
@@ -1,25 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package state
-
-import (
- v1 "github.com/api7/ingress-controller/pkg/types/apisix/v1"
-)
-
-type upstreamWorker struct {
- *v1.Upstream
- Event chan Event
- Quit chan Quit
-}