176 lines
4.7 KiB
Go
176 lines
4.7 KiB
Go
package controller
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"github.com/golang/glog"
|
|
"net/http"
|
|
"net/url"
|
|
"os"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
const (
|
|
consulNodeEnv = "CONSUL_NODE"
|
|
allowStale = "CONSUL_STALE"
|
|
matchTag = "enable_gocast"
|
|
nodeURL = "/catalog/node"
|
|
remoteHealthCheckurl = "/health/checks"
|
|
localHealthCheckurl = "/agent/checks"
|
|
)
|
|
|
|
type ConsulMon struct {
|
|
addr string
|
|
node string
|
|
client *http.Client
|
|
}
|
|
|
|
type ConsulServiceData struct {
|
|
Services map[string]struct {
|
|
ID string
|
|
Service string
|
|
Tags []string
|
|
}
|
|
}
|
|
|
|
func contains(inp []string, elem string) bool {
|
|
for _, a := range inp {
|
|
if a == elem {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func NewConsulMon(addr string) (*ConsulMon, error) {
|
|
node := os.Getenv(consulNodeEnv)
|
|
if node == "" {
|
|
return nil, fmt.Errorf("%s env variable not set", consulNodeEnv)
|
|
}
|
|
return &ConsulMon{addr: addr, node: node, client: &http.Client{Timeout: 10 * time.Second}}, nil
|
|
}
|
|
|
|
func (c *ConsulMon) queryServices() ([]*App, error) {
|
|
var apps []*App
|
|
stale := ""
|
|
if os.Getenv(allowStale) {
|
|
stale = "?stale"
|
|
}
|
|
addr := c.addr + fmt.Sprintf("%s/%s%s", nodeURL, c.node, stale)
|
|
resp, err := c.client.Get(addr)
|
|
if err != nil {
|
|
return apps, err
|
|
}
|
|
defer resp.Body.Close()
|
|
var consulData ConsulServiceData
|
|
if err := json.NewDecoder(resp.Body).Decode(&consulData); err != nil {
|
|
return apps, fmt.Errorf("Unable to decode consul data: %v", err)
|
|
}
|
|
for _, service := range consulData.Services {
|
|
if !contains(service.Tags, matchTag) {
|
|
continue
|
|
}
|
|
var (
|
|
vip string
|
|
monitors []string
|
|
nats []string
|
|
)
|
|
for _, tag := range service.Tags {
|
|
// try to find the requires tags. Only vip is mandatory
|
|
parts := strings.Split(tag, "=")
|
|
if len(parts) != 2 {
|
|
continue
|
|
}
|
|
switch parts[0] {
|
|
case "gocast_vip":
|
|
vip = parts[1]
|
|
case "gocast_monitor":
|
|
monitors = append(monitors, parts[1])
|
|
case "gocast_nat":
|
|
nats = append(nats, parts[1])
|
|
}
|
|
}
|
|
if vip == "" {
|
|
glog.Errorf("No vip Tag found in matched service :%s", service.Service)
|
|
continue
|
|
}
|
|
app, err := NewApp(service.Service, vip, monitors, nats, "consul")
|
|
if err != nil {
|
|
glog.Errorf("Unable to add consul app: %v", err)
|
|
continue
|
|
}
|
|
apps = append(apps, app)
|
|
|
|
}
|
|
return apps, nil
|
|
}
|
|
|
|
// healthCheckLocal queries a node's local consul agent to perform service healthchecks
|
|
// This is the underlying api call: https://www.consul.io/api/agent/check.html
|
|
func (c *ConsulMon) healthCheckLocal(service string) (bool, error) {
|
|
params := url.Values{}
|
|
params.Add("filter", "enable_gocast in ServiceTags")
|
|
addr := c.addr + fmt.Sprintf("%s?%s", localHealthCheckurl, params.Encode())
|
|
resp, err := c.client.Get(addr)
|
|
if err != nil {
|
|
glog.V(2).Infof("Error getting %s with %s", addr, err)
|
|
return false, err
|
|
}
|
|
defer resp.Body.Close()
|
|
var services map[string]interface{}
|
|
if err := json.NewDecoder(resp.Body).Decode(&services); err != nil {
|
|
return false, err
|
|
}
|
|
for _, sInfo := range services {
|
|
serviceInfo := sInfo.(map[string]interface{})
|
|
if serviceInfo["ServiceName"].(string) == service {
|
|
status := serviceInfo["Status"].(string)
|
|
if status == "passing" {
|
|
return true, nil
|
|
}
|
|
glog.V(2).Infof("Consul local healthcheck returned %s status", status)
|
|
return false, nil
|
|
}
|
|
}
|
|
return false, fmt.Errorf("No local healcheck info found for service %s on node %s in consul", service, c.node)
|
|
}
|
|
|
|
// healthCheckRemote queries the consul cluster's healthcheck endpoint to perform service healthchecks
|
|
// This is the underlying api call: https://www.consul.io/api/health.html
|
|
func (c *ConsulMon) healthCheckRemote(service string) (bool, error) {
|
|
addr := c.addr + fmt.Sprintf("%s/%s", remoteHealthCheckurl, service)
|
|
resp, err := c.client.Get(addr)
|
|
if err != nil {
|
|
glog.V(2).Infof("Error getting %s with %s", addr, err)
|
|
return false, err
|
|
}
|
|
defer resp.Body.Close()
|
|
var data []interface{}
|
|
if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
|
|
return false, err
|
|
}
|
|
|
|
for _, nodeInfo := range data {
|
|
n := nodeInfo.(map[string]interface{})
|
|
if n["Node"].(string) == c.node {
|
|
if n["Status"].(string) == "passing" {
|
|
return true, nil
|
|
}
|
|
glog.V(2).Infof("Consul healthcheck returned %s status", n["Status"].(string))
|
|
return false, nil
|
|
}
|
|
}
|
|
return false, fmt.Errorf("No healcheck info found for node %s in consul", c.node)
|
|
}
|
|
|
|
// healthCheck determines if we should use the local agent
|
|
// If the address contains "localhost", then it presumes that the local agent is to be used.
|
|
func (c *ConsulMon) healthCheck(service string) (bool, error) {
|
|
usingLocalAgent := strings.Contains(c.addr, "localhost")
|
|
if usingLocalAgent {
|
|
return c.healthCheckLocal(service)
|
|
}
|
|
return c.healthCheckRemote(service)
|
|
}
|