ADd consul support , multiple monitors, config file
This commit is contained in:
@@ -3,6 +3,7 @@ package controller
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/golang/glog"
|
||||
c "github.com/mayuresh82/gocast/config"
|
||||
api "github.com/osrg/gobgp/api"
|
||||
"net"
|
||||
"os/exec"
|
||||
@@ -10,6 +11,11 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultMonitorInterval = 10 * time.Second
|
||||
defaultCleanupTimer = 15 * time.Minute
|
||||
)
|
||||
|
||||
func portMonitor(protocol, port string) bool {
|
||||
switch protocol {
|
||||
case "tcp":
|
||||
@@ -18,14 +24,14 @@ func portMonitor(protocol, port string) bool {
|
||||
glog.V(4).Infof("Monitor tcp port up")
|
||||
return true
|
||||
}
|
||||
defer conn.Close()
|
||||
conn.Close()
|
||||
case "udp":
|
||||
conn, err := net.ListenPacket(protocol, ":"+port)
|
||||
if err != nil {
|
||||
glog.V(4).Infof("Monitor udp port up")
|
||||
return true
|
||||
}
|
||||
defer conn.Close()
|
||||
conn.Close()
|
||||
}
|
||||
return false
|
||||
}
|
||||
@@ -53,74 +59,154 @@ type appMon struct {
|
||||
}
|
||||
|
||||
type MonitorMgr struct {
|
||||
monitors map[string]*appMon
|
||||
cleanups map[string]chan bool
|
||||
c *Controller
|
||||
monitorInterval time.Duration
|
||||
cleanupTimer time.Duration
|
||||
monitors map[string]*appMon
|
||||
cleanups map[string]chan bool
|
||||
config *c.Config
|
||||
ctrl *Controller
|
||||
consul *ConsulMon
|
||||
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
func NewMonitor(localAS, peerAS int, monitorInterval time.Duration, peerIP string, cleanup time.Duration) *MonitorMgr {
|
||||
c, err := NewController(localAS, peerAS, peerIP)
|
||||
func NewMonitor(config *c.Config) *MonitorMgr {
|
||||
ctrl, err := NewController(config)
|
||||
if err != nil {
|
||||
glog.Exitf("Failed to start BGP controller: %v", err)
|
||||
}
|
||||
return &MonitorMgr{
|
||||
c: c,
|
||||
monitors: make(map[string]*appMon),
|
||||
cleanups: make(map[string]chan bool),
|
||||
monitorInterval: monitorInterval,
|
||||
cleanupTimer: cleanup,
|
||||
mon := &MonitorMgr{
|
||||
ctrl: ctrl,
|
||||
monitors: make(map[string]*appMon),
|
||||
cleanups: make(map[string]chan bool),
|
||||
}
|
||||
if config.Agent.ConsulAddr != "" {
|
||||
cmon, err := NewConsulMon(config.Agent.ConsulAddr)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to start consul monitor: %v", err)
|
||||
} else {
|
||||
mon.consul = cmon
|
||||
go mon.consulMon()
|
||||
}
|
||||
}
|
||||
if config.Agent.MonitorInterval == 0 {
|
||||
config.Agent.MonitorInterval = defaultMonitorInterval
|
||||
}
|
||||
if config.Agent.CleanupTimer == 0 {
|
||||
config.Agent.CleanupTimer = defaultCleanupTimer
|
||||
}
|
||||
mon.config = config
|
||||
return mon
|
||||
}
|
||||
|
||||
func (m *MonitorMgr) consulMon() {
|
||||
for {
|
||||
apps, err := m.consul.queryServices()
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to query consul: %v", err)
|
||||
} else {
|
||||
for _, app := range apps {
|
||||
m.Add(app)
|
||||
}
|
||||
// remove currently running apps that are not discovered in this pass
|
||||
var toRemove []string
|
||||
m.Lock()
|
||||
for name := range m.monitors {
|
||||
var found bool
|
||||
for _, app := range apps {
|
||||
if name == app.Name {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
glog.V(2).Infof("Removing app: %s as it was not found in consul", name)
|
||||
toRemove = append(toRemove, name)
|
||||
}
|
||||
}
|
||||
for _, tr := range toRemove {
|
||||
m.Remove(tr)
|
||||
}
|
||||
m.Unlock()
|
||||
}
|
||||
<-time.After(m.config.Agent.ConsulQueryInterval)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *MonitorMgr) Add(app *App) {
|
||||
// stop and start a new one if one already running
|
||||
// check if already running
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
for _, appMon := range m.monitors {
|
||||
if appMon.app.Equal(app) && appMon.checkOn {
|
||||
return
|
||||
}
|
||||
if appMon.app.Vip.String() == app.Vip.String() && appMon.app.Name != app.Name {
|
||||
glog.Errorf("Error: Vip %s is already being announced by app: %s", app.Vip.String(), appMon.app.Name)
|
||||
return
|
||||
}
|
||||
}
|
||||
m.Remove(app.Name)
|
||||
appMon := &appMon{app: app, done: make(chan bool)}
|
||||
m.Lock()
|
||||
m.monitors[app.Name] = appMon
|
||||
m.Unlock()
|
||||
go m.runLoop(appMon)
|
||||
glog.Infof("Registered a new app: %v", app)
|
||||
}
|
||||
|
||||
func (m *MonitorMgr) Remove(appName string) {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
if a, ok := m.monitors[appName]; ok {
|
||||
if a.checkOn {
|
||||
a.done <- true
|
||||
}
|
||||
if a.announced {
|
||||
if err := m.c.Withdraw(a.app.Vip); err != nil {
|
||||
if err := m.ctrl.Withdraw(a.app.Vip); err != nil {
|
||||
glog.Errorf("Failed to withdraw route: %v", err)
|
||||
}
|
||||
}
|
||||
deleteLoopback(appName)
|
||||
deleteLoopback(a.app.Vip)
|
||||
if ok, mon := a.app.needsNatRule(); ok {
|
||||
natRule("D", a.app.Vip.IP, m.ctrl.localIP, mon.Port, mon.Protocol)
|
||||
}
|
||||
}
|
||||
delete(m.monitors, appName)
|
||||
}
|
||||
func (m *MonitorMgr) runMonitors(app *App) bool {
|
||||
var check bool
|
||||
for _, mon := range app.Monitors {
|
||||
switch mon.Type {
|
||||
case Monitor_PORT:
|
||||
check = portMonitor(mon.Protocol, mon.Port)
|
||||
case Monitor_EXEC:
|
||||
check = execMonitor(mon.Cmd)
|
||||
case Monitor_CONSUL:
|
||||
c, err := m.consul.healthCheck(app.Name)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to perform consul healthcheck for %s: %v", app.Name, err)
|
||||
}
|
||||
check = c
|
||||
}
|
||||
if !check {
|
||||
glog.V(2).Infof("%s Monitor for app: %s Failed", mon.Type.String(), app.Name)
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (m *MonitorMgr) checkCond(am *appMon) error {
|
||||
var cond bool
|
||||
app := am.app
|
||||
switch app.Monitor.Type {
|
||||
case Monitor_PORT:
|
||||
cond = portMonitor(app.Monitor.Protocol, app.Monitor.Port)
|
||||
case Monitor_EXEC:
|
||||
cond = execMonitor(app.Monitor.Cmd)
|
||||
}
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
if cond {
|
||||
glog.V(2).Infof("%s Monitor for app: %s succeeded", app.Monitor.Type.String(), app.Name)
|
||||
if m.runMonitors(app) {
|
||||
glog.V(2).Infof("All Monitors for app: %s succeeded", app.Name)
|
||||
if !am.announced {
|
||||
if err := addLoopback(app.Name, app.Vip); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := m.c.Announce(app.Vip); err != nil {
|
||||
if ok, mon := app.needsNatRule(); ok {
|
||||
if err := natRule("A", app.Vip.IP, m.ctrl.localIP, mon.Port, mon.Protocol); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if err := m.ctrl.Announce(app.Vip); err != nil {
|
||||
return fmt.Errorf("Failed to announce route: %v", err)
|
||||
}
|
||||
am.announced = true
|
||||
@@ -129,9 +215,8 @@ func (m *MonitorMgr) checkCond(am *appMon) error {
|
||||
}
|
||||
}
|
||||
} else {
|
||||
glog.V(2).Infof("%s Monitor for app: %s Failed", app.Monitor.Type.String(), app.Name)
|
||||
if am.announced {
|
||||
if err := m.c.Withdraw(app.Vip); err != nil {
|
||||
if err := m.ctrl.Withdraw(app.Vip); err != nil {
|
||||
return fmt.Errorf("Failed to withdraw route: %v", err)
|
||||
}
|
||||
am.announced = false
|
||||
@@ -148,7 +233,7 @@ func (m *MonitorMgr) runLoop(am *appMon) {
|
||||
if err := m.checkCond(am); err != nil {
|
||||
glog.Errorln(err)
|
||||
}
|
||||
t := time.NewTicker(m.monitorInterval)
|
||||
t := time.NewTicker(m.config.Agent.MonitorInterval)
|
||||
defer t.Stop()
|
||||
for {
|
||||
select {
|
||||
@@ -165,25 +250,30 @@ func (m *MonitorMgr) runLoop(am *appMon) {
|
||||
|
||||
func (m *MonitorMgr) CloseAll() {
|
||||
glog.Infof("Shutting down all open bgp sessions")
|
||||
if err := m.c.Shutdown(); err != nil {
|
||||
if err := m.ctrl.Shutdown(); err != nil {
|
||||
glog.Errorf("Failed to shut-down BGP: %v", err)
|
||||
}
|
||||
for name, am := range m.monitors {
|
||||
for _, am := range m.monitors {
|
||||
if am.checkOn {
|
||||
am.done <- true
|
||||
}
|
||||
deleteLoopback(name)
|
||||
deleteLoopback(am.app.Vip)
|
||||
if ok, mon := am.app.needsNatRule(); ok {
|
||||
natRule("D", am.app.Vip.IP, m.ctrl.localIP, mon.Port, mon.Protocol)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *MonitorMgr) Cleanup(app string, exit chan bool) {
|
||||
t := time.NewTimer(m.cleanupTimer)
|
||||
t := time.NewTimer(m.config.Agent.CleanupTimer)
|
||||
defer t.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-t.C:
|
||||
glog.Infof("Cleaning up app %s", app)
|
||||
m.Lock()
|
||||
m.Remove(app)
|
||||
m.Unlock()
|
||||
case <-exit:
|
||||
return
|
||||
}
|
||||
@@ -191,5 +281,5 @@ func (m *MonitorMgr) Cleanup(app string, exit chan bool) {
|
||||
}
|
||||
|
||||
func (m *MonitorMgr) GetInfo() (*api.Peer, error) {
|
||||
return m.c.PeerInfo()
|
||||
return m.ctrl.PeerInfo()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user