add matchtag and fix tests

This commit is contained in:
mgaitonde
2023-08-18 17:41:51 -07:00
parent 645b10548a
commit cdc61f713a
8 changed files with 56 additions and 29 deletions

View File

@@ -11,6 +11,9 @@ agent:
consul_query_interval: 5m consul_query_interval: 5m
# token to authenticate client if consul requires it # token to authenticate client if consul requires it
consul_token: 00000000-0000-0000-0000-000000000000 consul_token: 00000000-0000-0000-0000-000000000000
# this tag must be present in consul for Gocast to pickup the service
# by default its enable_gocast
consul_match_tag: enable_gocast
bgp: bgp:
local_as: 12345 local_as: 12345

View File

@@ -16,6 +16,7 @@ type AgentConfig struct {
ConsulAddr string `yaml:"consul_addr"` ConsulAddr string `yaml:"consul_addr"`
ConsulQueryInterval time.Duration `yaml:"consul_query_interval"` ConsulQueryInterval time.Duration `yaml:"consul_query_interval"`
ConsulToken string `yaml:"consul_token"` ConsulToken string `yaml:"consul_token"`
ConsulMatchTag string `yaml:"consul_match_tag"`
} }
type BgpConfig struct { type BgpConfig struct {

View File

@@ -17,7 +17,7 @@ const (
consulNodeEnv = "CONSUL_NODE" consulNodeEnv = "CONSUL_NODE"
consulToken = "CONSUL_TOKEN" consulToken = "CONSUL_TOKEN"
allowStale = "CONSUL_STALE" allowStale = "CONSUL_STALE"
matchTag = "enable_gocast" defaultmatchTag = "enable_gocast"
nodeURL = "/catalog/node" nodeURL = "/catalog/node"
remoteHealthCheckurl = "/health/checks" remoteHealthCheckurl = "/health/checks"
localHealthCheckurl = "/agent/checks" localHealthCheckurl = "/agent/checks"
@@ -35,6 +35,7 @@ type ConsulMon struct {
addr string addr string
token string token string
node string node string
matchTag string
client Clienter client Clienter
} }
@@ -55,12 +56,15 @@ func contains(inp []string, elem string) bool {
return false return false
} }
func NewConsulMon(addr string, token string) (*ConsulMon, error) { func NewConsulMon(addr string, token string, matchTag string) (*ConsulMon, error) {
node := os.Getenv(consulNodeEnv) node := os.Getenv(consulNodeEnv)
if node == "" { if node == "" {
return nil, fmt.Errorf("%s env variable not set", consulNodeEnv) return nil, fmt.Errorf("%s env variable not set", consulNodeEnv)
} }
return &ConsulMon{addr: addr, token: token, node: node, client: &http.Client{Timeout: 10 * time.Second}}, nil if matchTag == "" {
matchTag = defaultmatchTag
}
return &ConsulMon{addr: addr, token: token, node: node, client: &http.Client{Timeout: 10 * time.Second}, matchTag: matchTag}, nil
} }
func getHTTPReq(httpMethod string, addr string, tokenFrmCfg string) (*http.Request, error) { func getHTTPReq(httpMethod string, addr string, tokenFrmCfg string) (*http.Request, error) {
@@ -95,10 +99,10 @@ func (c *ConsulMon) queryServices() ([]*App, error) {
defer resp.Body.Close() defer resp.Body.Close()
var consulData ConsulServiceData var consulData ConsulServiceData
if err := json.NewDecoder(resp.Body).Decode(&consulData); err != nil { if err := json.NewDecoder(resp.Body).Decode(&consulData); err != nil {
return apps, fmt.Errorf("Unable to decode consul data: %v", err) return apps, fmt.Errorf("unable to decode consul data: %v", err)
} }
for _, service := range consulData.Services { for _, service := range consulData.Services {
if !contains(service.Tags, matchTag) { if !contains(service.Tags, c.matchTag) {
continue continue
} }
var ( var (
@@ -170,7 +174,7 @@ func (c *ConsulMon) healthCheckLocal(service string) (bool, error) {
return false, nil return false, nil
} }
} }
return false, fmt.Errorf("No local healthcheck info found for service %s on node %s in consul", service, c.node) return false, fmt.Errorf("no local healthcheck info found for service %s on node %s in consul", service, c.node)
} }
// healthCheckRemote queries the consul cluster's healthcheck endpoint to perform service healthchecks // healthCheckRemote queries the consul cluster's healthcheck endpoint to perform service healthchecks
@@ -202,7 +206,7 @@ func (c *ConsulMon) healthCheckRemote(service string) (bool, error) {
return false, nil return false, nil
} }
} }
return false, fmt.Errorf("No healthcheck info found for node %s in consul", c.node) return false, fmt.Errorf("no healthcheck info found for node %s in consul", c.node)
} }
// healthCheck determines if we should use the local agent // healthCheck determines if we should use the local agent

View File

@@ -2,7 +2,7 @@ package controller
import ( import (
"bytes" "bytes"
"io/ioutil" "io"
"net/http" "net/http"
"os" "os"
"testing" "testing"
@@ -119,13 +119,13 @@ func TestQueryServices(t *testing.T) {
a := assert.New(t) a := assert.New(t)
client := &MockClient{} client := &MockClient{}
cm := &ConsulMon{ cm := &ConsulMon{
addr: "foo", node: "test", client: client, addr: "foo", node: "test", client: client, matchTag: "enable_gocast",
} }
// test valid app // test valid app
client.do = func(*http.Request) (*http.Response, error) { client.do = func(*http.Request) (*http.Response, error) {
b := bytes.NewBuffer([]byte(mockConsulData["single-app"])) b := bytes.NewBuffer([]byte(mockConsulData["single-app"]))
return &http.Response{Body: ioutil.NopCloser(b), StatusCode: http.StatusOK}, nil return &http.Response{Body: io.NopCloser(b), StatusCode: http.StatusOK}, nil
} }
apps, err := cm.queryServices() apps, err := cm.queryServices()
if err != nil { if err != nil {
@@ -140,7 +140,7 @@ func TestQueryServices(t *testing.T) {
// test no match // test no match
client.do = func(*http.Request) (*http.Response, error) { client.do = func(*http.Request) (*http.Response, error) {
b := bytes.NewBuffer([]byte(mockConsulData["single-app-no-match"])) b := bytes.NewBuffer([]byte(mockConsulData["single-app-no-match"]))
return &http.Response{Body: ioutil.NopCloser(b), StatusCode: http.StatusOK}, nil return &http.Response{Body: io.NopCloser(b), StatusCode: http.StatusOK}, nil
} }
apps, err = cm.queryServices() apps, err = cm.queryServices()
if err != nil { if err != nil {
@@ -151,7 +151,7 @@ func TestQueryServices(t *testing.T) {
// test missing vip // test missing vip
client.do = func(*http.Request) (*http.Response, error) { client.do = func(*http.Request) (*http.Response, error) {
b := bytes.NewBuffer([]byte(mockConsulData["single-app-no-vip"])) b := bytes.NewBuffer([]byte(mockConsulData["single-app-no-vip"]))
return &http.Response{Body: ioutil.NopCloser(b), StatusCode: http.StatusOK}, nil return &http.Response{Body: io.NopCloser(b), StatusCode: http.StatusOK}, nil
} }
apps, _ = cm.queryServices() apps, _ = cm.queryServices()
a.Equal(0, len(apps)) a.Equal(0, len(apps))
@@ -160,13 +160,13 @@ func TestQueryServices(t *testing.T) {
func TestHealthCheck(t *testing.T) { func TestHealthCheck(t *testing.T) {
a := assert.New(t) a := assert.New(t)
client := &MockClient{} client := &MockClient{}
cm := &ConsulMon{node: "test-node1", client: client} cm := &ConsulMon{node: "test-node1", client: client, matchTag: "enable_gocast"}
// test remote checks // test remote checks
cm.addr = "http://remote/check" cm.addr = "http://remote/check"
client.do = func(*http.Request) (*http.Response, error) { client.do = func(*http.Request) (*http.Response, error) {
b := bytes.NewBuffer([]byte(mockConsulCheckData["remote-pass"])) b := bytes.NewBuffer([]byte(mockConsulCheckData["remote-pass"]))
return &http.Response{Body: ioutil.NopCloser(b), StatusCode: http.StatusOK}, nil return &http.Response{Body: io.NopCloser(b), StatusCode: http.StatusOK}, nil
} }
check, err := cm.healthCheck("test-service") check, err := cm.healthCheck("test-service")
if err != nil { if err != nil {
@@ -175,7 +175,7 @@ func TestHealthCheck(t *testing.T) {
a.True(check) a.True(check)
client.do = func(*http.Request) (*http.Response, error) { client.do = func(*http.Request) (*http.Response, error) {
b := bytes.NewBuffer([]byte(mockConsulCheckData["remote-fail"])) b := bytes.NewBuffer([]byte(mockConsulCheckData["remote-fail"]))
return &http.Response{Body: ioutil.NopCloser(b), StatusCode: http.StatusOK}, nil return &http.Response{Body: io.NopCloser(b), StatusCode: http.StatusOK}, nil
} }
check, _ = cm.healthCheck("test-service") check, _ = cm.healthCheck("test-service")
a.False(check) a.False(check)
@@ -184,7 +184,7 @@ func TestHealthCheck(t *testing.T) {
cm.addr = "http://localhost/check" cm.addr = "http://localhost/check"
client.do = func(*http.Request) (*http.Response, error) { client.do = func(*http.Request) (*http.Response, error) {
b := bytes.NewBuffer([]byte(mockConsulCheckData["local-pass"])) b := bytes.NewBuffer([]byte(mockConsulCheckData["local-pass"]))
return &http.Response{Body: ioutil.NopCloser(b), StatusCode: http.StatusOK}, nil return &http.Response{Body: io.NopCloser(b), StatusCode: http.StatusOK}, nil
} }
check, _ = cm.healthCheck("test-service") check, _ = cm.healthCheck("test-service")
if err != nil { if err != nil {
@@ -194,7 +194,7 @@ func TestHealthCheck(t *testing.T) {
cm.addr = "http://127.0.0.1/check" cm.addr = "http://127.0.0.1/check"
client.do = func(*http.Request) (*http.Response, error) { client.do = func(*http.Request) (*http.Response, error) {
b := bytes.NewBuffer([]byte(mockConsulCheckData["local-fail"])) b := bytes.NewBuffer([]byte(mockConsulCheckData["local-fail"]))
return &http.Response{Body: ioutil.NopCloser(b), StatusCode: http.StatusOK}, nil return &http.Response{Body: io.NopCloser(b), StatusCode: http.StatusOK}, nil
} }
check, _ = cm.healthCheck("test-service") check, _ = cm.healthCheck("test-service")
a.False(check) a.False(check)

View File

@@ -84,7 +84,7 @@ func NewMonitor(config *c.Config) *MonitorMgr {
cleanups: make(map[string]chan bool), cleanups: make(map[string]chan bool),
} }
if config.Agent.ConsulAddr != "" { if config.Agent.ConsulAddr != "" {
cmon, err := NewConsulMon(config.Agent.ConsulAddr, config.Agent.ConsulToken) cmon, err := NewConsulMon(config.Agent.ConsulAddr, config.Agent.ConsulToken, config.Agent.ConsulMatchTag)
if err != nil { if err != nil {
glog.Errorf("Failed to start consul monitor: %v", err) glog.Errorf("Failed to start consul monitor: %v", err)
} else { } else {
@@ -184,7 +184,7 @@ func (m *MonitorMgr) Add(app *App) {
} }
// Remove removes an app from monitor manager, stops BGP // Remove removes an app from monitor manager, stops BGP
/// announcement and cleans up state // / announcement and cleans up state
func (m *MonitorMgr) Remove(appName string) { func (m *MonitorMgr) Remove(appName string) {
m.monMu.Lock() m.monMu.Lock()
defer m.monMu.Unlock() defer m.monMu.Unlock()

View File

@@ -27,7 +27,7 @@ func gateway(family int) (net.IP, error) {
cmdList := getCmdList(cmd) cmdList := getCmdList(cmd)
out, err := exec.Command(execCmd, cmdList...).Output() out, err := exec.Command(execCmd, cmdList...).Output()
if err != nil { if err != nil {
return nil, fmt.Errorf("Failed to execute command: %s: %v", cmd, err) return nil, fmt.Errorf("failed to execute command: %s: %v", cmd, err)
} }
return net.ParseIP(strings.TrimSpace(string(out))), nil return net.ParseIP(strings.TrimSpace(string(out))), nil
} }
@@ -41,7 +41,7 @@ func via(dest net.IP) (net.IP, error) {
cmdList := getCmdList(cmd) cmdList := getCmdList(cmd)
out, err := exec.Command(execCmd, cmdList...).Output() out, err := exec.Command(execCmd, cmdList...).Output()
if err != nil { if err != nil {
return nil, fmt.Errorf("Failed to execute command: %s: %v", cmd, err) return nil, fmt.Errorf("failed to execute command: %s: %v", cmd, err)
} }
if string(out) == "" { if string(out) == "" {
// assume the provided dest is the next hop // assume the provided dest is the next hop
@@ -63,7 +63,7 @@ func localAddress(gw net.IP) (net.IP, error) {
} }
} }
} }
return nil, fmt.Errorf("Unable to find local address") return nil, fmt.Errorf("unable to find local address")
} }
func addLoopback(name string, addr *net.IPNet) error { func addLoopback(name string, addr *net.IPNet) error {
@@ -82,7 +82,7 @@ func addLoopback(name string, addr *net.IPNet) error {
cmdList := getCmdList(cmd) cmdList := getCmdList(cmd)
_, err := exec.Command(execCmd, cmdList...).Output() _, err := exec.Command(execCmd, cmdList...).Output()
if err != nil { if err != nil {
return fmt.Errorf("Failed to Add loopback command: %s: %v", cmd, err) return fmt.Errorf("failed to Add loopback command: %s: %v", cmd, err)
} }
return nil return nil
} }
@@ -97,7 +97,7 @@ func deleteLoopback(addr *net.IPNet) error {
cmdList := getCmdList(cmd) cmdList := getCmdList(cmd)
_, err := exec.Command(execCmd, cmdList...).Output() _, err := exec.Command(execCmd, cmdList...).Output()
if err != nil { if err != nil {
return fmt.Errorf("Failed to delete loopback command: %s: %v", cmd, err) return fmt.Errorf("failed to delete loopback command: %s: %v", cmd, err)
} }
return nil return nil
} }

View File

@@ -12,9 +12,14 @@ import (
func TestGateway(t *testing.T) { func TestGateway(t *testing.T) {
execCmd = os.Args[0] execCmd = os.Args[0]
os.Setenv("test_name", "test_gateway") os.Setenv("test_name", "test_gateway")
gw, err := gateway() gw, err := gateway(4)
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, "10.1.1.1", gw.String()) assert.Equal(t, "10.1.1.1", gw.String())
os.Setenv("test_name", "test_gateway_v6")
gw, err = gateway(6)
assert.Nil(t, err)
assert.Equal(t, "2001:dead:beef::1", gw.String())
} }
func TestVia(t *testing.T) { func TestVia(t *testing.T) {
@@ -28,6 +33,11 @@ func TestVia(t *testing.T) {
ip, err = via(net.ParseIP("10.1.4.1")) ip, err = via(net.ParseIP("10.1.4.1"))
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, "10.1.4.1", ip.String()) assert.Equal(t, "10.1.4.1", ip.String())
os.Setenv("test_name", "test_via_v6")
ip, err = via(net.ParseIP("2001:dead:beef::100"))
assert.Nil(t, err)
assert.Equal(t, "2001:dead:beef::1", ip.String())
} }
func TestAddLoopback(t *testing.T) { func TestAddLoopback(t *testing.T) {
@@ -41,14 +51,23 @@ func TestAddLoopback(t *testing.T) {
_, ipnet, _ = net.ParseCIDR("1.1.1.1/32") _, ipnet, _ = net.ParseCIDR("1.1.1.1/32")
err = addLoopback("test_app", ipnet) err = addLoopback("test_app", ipnet)
assert.NotNil(t, err) assert.NotNil(t, err)
os.Setenv("test_name", "test_add_v6")
_, ipnet, _ = net.ParseCIDR("2001:dead:beef:1001::100/64")
err = addLoopback("test_app", ipnet)
assert.Nil(t, err)
} }
func TestMain(m *testing.M) { func TestMain(m *testing.M) {
switch os.Getenv("test_name") { switch os.Getenv("test_name") {
case "test_gateway": case "test_gateway":
fmt.Println("10.1.1.1") fmt.Println("10.1.1.1")
case "test_gateway_v6":
fmt.Println("2001:dead:beef::1")
case "test_via": case "test_via":
fmt.Println("10.1.2.1") fmt.Println("10.1.2.1")
case "test_via_v6":
fmt.Println("2001:dead:beef::1")
case "test_via_none": case "test_via_none":
break break
case "test_add_fail": case "test_add_fail":

BIN
gocast

Binary file not shown.