Files
gocast/controller/monitor.go
Ben Roberts 567a84095e Implement support for multiple BGP peers
The BGP controller now supports announcing routes to multiple BGP peers for redundancy and resilience. If one peer fails, route announcements continue to succeed for other healthy peers.

```yaml
bgp:
  local_as: 12345
  local_ip: 192.168.1.100  # optional
  peers:
    - peer_ip: 10.10.10.1
      peer_as: 6789
      communities:        # per-peer communities (optional)
        - 100:100
    - peer_ip: 10.10.10.2
      peer_as: 6789
      communities:
        - 100:101
      multi_hop: true     # optional, defaults to false (multi-hop disabled)
  communities:            # global communities applied to all peers
    - 1000:1000
  origin: igp
```

```yaml
bgp:
  local_as: 12345
  peer_as: 6789
  peer_ip: 10.10.10.1
  communities:
    - 100:100
  origin: igp
```

Legacy configurations are automatically converted to the new format internally, ensuring backward compatibility.

Routes are announced to all configured peers. If announcement to one peer fails, the operation continues for other peers. Errors are aggregated and returned, but partial success is allowed.

Communities are merged in the following order:
1. **Global communities** (defined at `bgp.communities`)
2. **Per-peer communities** (defined at `bgp.peers[].communities`)
3. **Per-route communities** (defined at `apps[].vip_config.bgp_communities`)

Example: If global communities are `[1000:1000]`, peer communities are `[100:100]`, and route communities are `[5000:5000]`, the announced route will have all three: `[1000:1000, 100:100, 5000:5000]`.

- **Default behavior**: Multi-hop is disabled by default
- **Enable**: Set `multi_hop: true` per peer to explicitly enable multi-hop BGP

The `/info` endpoint now returns an array of peer information instead of a single peer object:

**Before:**
```json
{
  "conf": {
    "neighbor_address": "10.10.10.1",
    "peer_as": 6789
  },
  "state": {...}
}
```

**After:**
```json
[
  {
    "conf": {
      "neighbor_address": "10.10.10.1",
      "peer_as": 6789
    },
    "state": {...}
  },
  {
    "conf": {
      "neighbor_address": "10.10.10.2",
      "peer_as": 6789
    },
    "state": {...}
  }
]
```

- `config/config.go`: Added `PeerConfig` struct and `Peers` slice to `BgpConfig`
- `controller/bgp.go`: Refactored to support multiple peers with best-effort semantics
- `controller/monitor.go`: Updated `GetInfo()` to return slice of peers
- `server/server.go`: Updated info handler to return array of peers

1. **Controller struct** now stores `[]PeerConfig` instead of single peer fields
2. **Announce/Withdraw** methods loop through all peers with error aggregation
3. **getApiPath** accepts a `PeerConfig` parameter for per-peer community merging
4. **addPeer** determines multi-hop settings per peer
5. **PeerInfo** returns information for all configured peers
6. **Shutdown** gracefully shuts down all peer sessions

The implementation includes comprehensive test coverage:

1. **TestLegacyConfigConversion** - Verifies backward compatibility by testing that legacy single-peer configs are automatically converted to multi-peer format
2. **TestMultiPeerConfig** - Tests that new multi-peer configurations are properly loaded with multiple peers
3. **TestNoPeersConfigError** - Ensures proper error handling when no peers are configured
4. **TestCommunityMerging** - Validates that global, per-peer, and per-route communities are correctly merged in order
5. **TestMultiHopConfiguration** - Tests multi-hop BGP settings with various scenarios:
   - Default behavior (multi-hop disabled)
   - Explicit multi-hop disable
   - Explicit multi-hop enable
6. **TestBestEffortAnnouncement** - Verifies that announcements succeed even when individual peers may have issues
7. **TestWithdrawMultiplePeers** - Tests route withdrawal across multiple peers
8. **TestPeerInfoMultiplePeers** - Validates that peer information is correctly returned for all configured peers

- **TestBgpNew** - Full integration test with actual BGP listeners (requires root, skipped in CI)
- **TestMultiPeerAnnouncement** - Tests actual route announcements to multiple BGP listeners (requires root, skipped in CI)

Existing configurations using `peer_ip` and `peer_as` continue to work without modification.

To add a second peer for resilience:

```yaml
bgp:
  local_as: 12345
  # Keep existing config for backward compatibility, or remove these lines
  # peer_as: 6789
  # peer_ip: 10.10.10.1

  # Add new multi-peer config
  peers:
    - peer_ip: 10.10.10.1
      peer_as: 6789
    - peer_ip: 10.10.10.2  # redundant peer
      peer_as: 6789
  communities:
    - 100:100
  origin: igp
```

All operations (Announce, Withdraw, Shutdown) use best-effort error handling:
- Operations continue even if individual peers fail
- Errors are collected and returned as aggregated error messages
- Format: `"announcement errors: [peer 10.10.10.1: error message, peer 10.10.10.2: error message]"`

These changes were authored via AI LLM.

Authored-By: Claude Code (Sonnet 4.5)
2026-06-17 15:52:43 +01:00

362 lines
8.7 KiB
Go

package controller
import (
"fmt"
"net"
"os/exec"
"strings"
"sync"
"time"
"github.com/golang/glog"
c "github.com/mayuresh82/gocast/config"
api "github.com/osrg/gobgp/api"
)
// defaultMonitorInterval is the health-check period applied by NewMonitor
// when the agent configuration leaves MonitorInterval at zero.
const defaultMonitorInterval = 10 * time.Second

// defaultCleanupTimer is the cleanup delay applied by NewMonitor when the
// agent configuration leaves CleanupTimer at zero.
const defaultCleanupTimer = 15 * time.Minute
// portMonitor reports whether the given local port appears to be in use.
//
// It probes by trying to bind the wildcard address on that port: a failed
// bind is treated as "port up" (true), a successful bind is released again
// and reported as "port down" (false). Protocols other than "tcp" and "udp"
// always yield false.
//
// NOTE(review): every bind error counts as "up", not only address-in-use;
// confirm that is intended for malformed or privileged ports.
func portMonitor(protocol, port string) bool {
	addr := ":" + port

	if protocol == "tcp" {
		ln, err := net.Listen(protocol, addr)
		if err == nil {
			// Nothing was holding the port; release our probe socket.
			ln.Close()
			return false
		}
		glog.V(4).Infof("Monitor tcp port up")
		return true
	}

	if protocol == "udp" {
		pc, err := net.ListenPacket(protocol, addr)
		if err == nil {
			// Nothing was holding the port; release our probe socket.
			pc.Close()
			return false
		}
		glog.V(4).Infof("Monitor udp port up")
		return true
	}

	return false
}
// execMonitor runs cmd through "bash -c" and reports whether the check
// passed.
//
// It returns false when the command cannot be started or when waiting on it
// yields an *exec.ExitError; any other outcome of the wait is reported as
// true.
func execMonitor(cmd string) bool {
	proc := exec.Command("bash", "-c", cmd)

	startErr := proc.Start()
	if startErr != nil {
		glog.V(2).Infof("Cannot exec cmd: %s : %v", cmd, startErr)
		return false
	}

	// A comma-ok assertion on a nil error simply yields false, so a clean
	// exit falls through to the success return below.
	_, exitedWithError := proc.Wait().(*exec.ExitError)
	if exitedWithError {
		glog.V(4).Infof("Monitor cmd failed")
		return false
	}
	return true
}
// appMon maintains the state of a registered app: the app itself plus the
// bookkeeping its run loop and the monitor manager use to coordinate.
type appMon struct {
// app is the registered application being monitored.
app *App
// done is closed (by Remove or CloseAll) to make the app's run loop exit.
done chan bool
// announced records whether the app's VIP is currently announced;
// checkCond sets it after a successful Announce and clears it after Withdraw.
announced bool
// runLoopOn is set by runLoop while it is running and cleared when it exits.
runLoopOn bool
}
// MonitorMgr manages the lifecycle of registered apps: it tracks their
// monitors, drives BGP announcements through the controller and schedules
// cleanup of apps whose routes were withdrawn.
type MonitorMgr struct {
// monitors holds the per-app state, keyed by app name.
monitors map[string]*appMon
// cleanups holds, per app name, the exit channel of a pending Cleanup
// goroutine; checkCond closes it when the app is announced again.
cleanups map[string]chan bool
// config is the configuration passed to NewMonitor.
config *c.Config
// ctrl is the BGP controller used to announce and withdraw VIPs.
ctrl *Controller
// consul is the consul monitor; it stays nil when no consul address is
// configured or when the consul monitor failed to start.
consul *ConsulMon
// monMu is locked around accesses to monitors.
monMu sync.Mutex
// clMu is held for the duration of checkCond, which also touches cleanups.
clMu sync.Mutex
}
// NewMonitor builds a MonitorMgr from config.
//
// It starts the BGP controller (exiting the process via glog.Exitf if that
// fails), fills in default monitor and cleanup intervals when they are zero
// (note: this writes into the caller's config), optionally starts the consul
// discovery loop, and registers every app listed in config.Apps. Apps that
// fail to construct are logged and skipped.
func NewMonitor(config *c.Config) *MonitorMgr {
	ctrl, err := NewController(config.Bgp)
	if err != nil {
		glog.Exitf("Failed to start BGP controller: %v", err)
	}

	// Apply defaults before anything that reads them can run.
	if config.Agent.MonitorInterval == 0 {
		config.Agent.MonitorInterval = defaultMonitorInterval
	}
	if config.Agent.CleanupTimer == 0 {
		config.Agent.CleanupTimer = defaultCleanupTimer
	}

	// The manager must be fully populated (including config) before the
	// consul goroutine is started: consulMon and the run loops it spawns via
	// Add dereference m.config, so assigning it afterwards would race and
	// could dereference a nil config.
	mon := &MonitorMgr{
		ctrl:     ctrl,
		config:   config,
		monitors: make(map[string]*appMon),
		cleanups: make(map[string]chan bool),
	}

	if config.Agent.ConsulAddr != "" {
		cmon, err := NewConsulMon(config.Agent.ConsulAddr, config.Agent.ConsulToken)
		if err != nil {
			glog.Errorf("Failed to start consul monitor: %v", err)
		} else {
			mon.consul = cmon
			go mon.consulMon()
		}
	}

	// add apps defined in config
	for _, a := range config.Apps {
		app, err := NewApp(a.Name, a.Vip, a.VipConfig, a.Monitors, a.Nats, "config")
		if err != nil {
			glog.Errorf("Failed to add configured app %s: %v", a.Name, err)
			continue
		}
		mon.Add(app)
	}
	return mon
}
// consulMon loops forever, reconciling the monitor manager with consul.
//
// Each pass queries consul for services, registers every returned app via
// Add, and then removes any currently tracked app whose Source is "consul"
// but which was not part of this pass. A failed query is logged and the pass
// is skipped. Passes are separated by Agent.ConsulQueryInterval.
func (m *MonitorMgr) consulMon() {
	// The post statement runs after every pass, including ones cut short by
	// continue, so the wait between queries is always observed.
	for ; ; time.Sleep(m.config.Agent.ConsulQueryInterval) {
		discovered, err := m.consul.queryServices()
		if err != nil {
			glog.Errorf("Failed to query consul: %v", err)
			continue
		}

		seen := make(map[string]struct{}, len(discovered))
		for _, app := range discovered {
			m.Add(app)
			seen[app.Name] = struct{}{}
		}

		// Collect stale names under the lock, but remove them only after
		// releasing it because Remove takes the same lock.
		var stale []string
		m.monMu.Lock()
		for name, am := range m.monitors {
			if am.app.Source != "consul" {
				continue
			}
			if _, ok := seen[name]; ok {
				continue
			}
			glog.V(2).Infof("Removing app: %s as it was not found in consul", name)
			stale = append(stale, name)
		}
		m.monMu.Unlock()

		for _, name := range stale {
			m.Remove(name)
		}
	}
}
// Add registers app with the monitor manager and makes sure a run loop is
// active for it.
//
//   - If an equal app is already registered, its run loop is restarted only
//     when it is not currently running.
//   - If a differently named app already uses the same VIP, the request is
//     rejected with an error log.
//   - Otherwise the app is stored under its name and a new run loop is
//     started.
//
// monMu is held for the whole call so that the lookup and the insertion into
// m.monitors form one critical section; the map must not be written after the
// lock has been released, since Remove and consulMon access it concurrently.
// The run loop itself is started in its own goroutine and does not take
// monMu, so holding the lock here cannot deadlock with it.
func (m *MonitorMgr) Add(app *App) {
	m.monMu.Lock()
	defer m.monMu.Unlock()

	// check if already running
	var existing *appMon
	for _, am := range m.monitors {
		if am.app.Equal(app) {
			glog.Infof("App %s already exists", app.Name)
			existing = am
			break
		}
		if am.app.Vip.Net.String() == app.Vip.Net.String() && am.app.Name != app.Name {
			glog.Errorf("Error: Vip %s is already being announced by app: %s", app.Vip.Net.String(), am.app.Name)
			return
		}
	}

	// if the same app already exists but its run loop is not running,
	// then just restart the run loop
	if existing != nil {
		if !existing.runLoopOn {
			go m.runLoop(existing)
		}
		return
	}

	// else add a new app and start its run loop
	am := &appMon{app: app, done: make(chan bool)}
	m.monitors[app.Name] = am
	go m.runLoop(am)
	glog.Infof("Registered a new app: %v", app.String())
}
// Remove unregisters the named app from the monitor manager.
//
// For a known app it signals the run loop to stop (if running), withdraws
// the VIP when it is currently announced, deletes the loopback address and
// the app's NAT rules, and finally drops the app from the monitor map.
// Failures along the way are logged and do not abort the removal. Unknown
// names are ignored.
func (m *MonitorMgr) Remove(appName string) {
	m.monMu.Lock()
	defer m.monMu.Unlock()

	am, found := m.monitors[appName]
	if !found {
		return
	}

	if am.runLoopOn {
		close(am.done)
	}

	if am.announced {
		if err := m.ctrl.Withdraw(am.app.Vip); err != nil {
			glog.Errorf("Failed to withdraw route: %v", err)
		}
	}

	vipNet := am.app.Vip.Net
	if err := deleteLoopback(vipNet); err != nil {
		glog.Errorf("Failed to remove app: %s: %v", am.app.Name, err)
	}

	for _, nat := range am.app.Nats {
		fields := strings.Split(nat, ":")
		n := len(fields)
		if n != 2 && n != 3 {
			continue
		}
		// With two fields the second one is passed for both of the last two
		// arguments; with three fields the third is passed last.
		if err := natRule("D", vipNet.IP, m.ctrl.localIP, fields[0], fields[1], fields[n-1]); err != nil {
			glog.Errorf("Failed to remove app: %s: %v", am.app.Name, err)
		}
	}

	delete(m.monitors, appName)
}
// runMonitors executes every monitor configured for app, in order, and
// reports whether all of them passed. It stops at the first failing monitor.
//
// A consul monitor fails (instead of dereferencing a nil consul client) when
// the manager has no consul monitor, which is the case when no consul address
// is configured or the consul monitor could not be started. A monitor of an
// unrecognised type is treated as failed.
func (m *MonitorMgr) runMonitors(app *App) bool {
	for _, mon := range app.Monitors {
		var check bool
		switch mon.Type {
		case Monitor_PORT:
			check = portMonitor(mon.Protocol, mon.Port)
		case Monitor_EXEC:
			check = execMonitor(mon.Cmd)
		case Monitor_CONSUL:
			if m.consul == nil {
				glog.Errorf("Failed to perform consul healthcheck for %s: consul monitor is not available", app.Name)
				break // leaves check false, so the app is reported unhealthy
			}
			healthy, err := m.consul.healthCheck(app.Name)
			if err != nil {
				glog.Errorf("Failed to perform consul healthcheck for %s: %v", app.Name, err)
			}
			check = healthy
		}
		if !check {
			glog.V(2).Infof("%s Monitor for app: %s Failed", mon.Type.String(), app.Name)
			return false
		}
	}
	return true
}
// checkCond runs the app's monitors once and reconciles the announcement
// state with the result, all while holding clMu.
//
// Healthy and not yet announced: add the loopback, install the NAT rules,
// announce the VIP, mark the app announced and cancel any pending cleanup.
// Unhealthy and announced: withdraw the VIP, mark the app unannounced and
// schedule a cleanup. In every other case nothing changes. The first error
// encountered is returned and the remaining steps are skipped.
func (m *MonitorMgr) checkCond(am *appMon) error {
	app := am.app
	m.clMu.Lock()
	defer m.clMu.Unlock()

	if !m.runMonitors(app) {
		if !am.announced {
			return nil
		}
		if err := m.ctrl.Withdraw(app.Vip); err != nil {
			return fmt.Errorf("Failed to withdraw route: %v", err)
		}
		am.announced = false
		// Start the delayed cleanup and remember how to cancel it.
		exit := make(chan bool)
		go m.Cleanup(app.Name, exit)
		m.cleanups[app.Name] = exit
		return nil
	}

	glog.V(2).Infof("All Monitors for app: %s succeeded", app.Name)
	if am.announced {
		return nil
	}

	if err := addLoopback(app.Name, app.Vip.Net); err != nil {
		return err
	}
	for _, nat := range app.Nats {
		fields := strings.Split(nat, ":")
		n := len(fields)
		if n != 2 && n != 3 {
			continue
		}
		// With two fields the second one is passed for both of the last two
		// arguments; with three fields the third is passed last.
		if err := natRule("A", app.Vip.Net.IP, m.ctrl.localIP, fields[0], fields[1], fields[n-1]); err != nil {
			return err
		}
	}
	if err := m.ctrl.Announce(app.Vip); err != nil {
		return fmt.Errorf("Failed to announce route: %v", err)
	}
	am.announced = true

	// The app is healthy again: cancel a cleanup scheduled by an earlier
	// withdrawal, if there is one.
	if exit, ok := m.cleanups[app.Name]; ok {
		close(exit)
		delete(m.cleanups, app.Name)
	}
	return nil
}
// runLoop drives the health checks for one app until its done channel is
// closed.
//
// It evaluates the app immediately and then again on every tick of
// Agent.MonitorInterval, logging any error from checkCond. runLoopOn is set
// for the lifetime of the loop and cleared just before it returns.
func (m *MonitorMgr) runLoop(am *appMon) {
	glog.Infof("Starting run-loop for app %s", am.app.Name)
	am.runLoopOn = true

	evaluate := func() {
		if err := m.checkCond(am); err != nil {
			glog.Errorln(err)
		}
	}

	// First evaluation happens right away rather than after one interval.
	evaluate()

	ticker := time.NewTicker(m.config.Agent.MonitorInterval)
	defer ticker.Stop()
	for {
		select {
		case <-am.done:
			glog.Infof("Exit run-loop for app: %s", am.app.Name)
			am.runLoopOn = false
			return
		case <-ticker.C:
			evaluate()
		}
	}
}
// CloseAll shuts down the BGP controller and tears down the local state of
// every registered app.
//
// For each app it signals the run loop to stop (if running), deletes the
// loopback address and removes the app's NAT rules. The teardown is
// best-effort: failures are logged and the remaining apps are still
// processed. monMu is held while walking m.monitors because Add, Remove and
// the consul loop may access the map concurrently.
func (m *MonitorMgr) CloseAll() {
	glog.Infof("Shutting down all open bgp sessions")
	if err := m.ctrl.Shutdown(); err != nil {
		glog.Errorf("Failed to shut-down BGP: %v", err)
	}

	m.monMu.Lock()
	defer m.monMu.Unlock()
	for _, am := range m.monitors {
		if am.runLoopOn {
			close(am.done)
		}
		if err := deleteLoopback(am.app.Vip.Net); err != nil {
			glog.Errorf("Failed to remove app: %s: %v", am.app.Name, err)
		}
		for _, nat := range am.app.Nats {
			parts := strings.Split(nat, ":")
			n := len(parts)
			if n != 2 && n != 3 {
				continue
			}
			// With two fields the second one is passed for both of the last
			// two arguments; with three fields the third is passed last.
			if err := natRule("D", am.app.Vip.Net.IP, m.ctrl.localIP, parts[0], parts[1], parts[n-1]); err != nil {
				glog.Errorf("Failed to remove app: %s: %v", am.app.Name, err)
			}
		}
	}
}
// Cleanup removes the named app once Agent.CleanupTimer has elapsed, unless
// it is cancelled first. It fires at most once: a receive on exit (including
// the channel being closed) makes it return without removing anything.
func (m *MonitorMgr) Cleanup(app string, exit chan bool) {
	timer := time.NewTimer(m.config.Agent.CleanupTimer)
	defer timer.Stop()

	select {
	case <-exit:
		return
	case <-timer.C:
	}

	glog.Infof("Cleaning up app %s", app)
	m.Remove(app)
}
// GetInfo returns the BGP peer information reported by the controller's
// PeerInfo, together with any error from that call.
func (m *MonitorMgr) GetInfo() ([]*api.Peer, error) {
	peers, err := m.ctrl.PeerInfo()
	return peers, err
}