13 Commits

Author SHA1 Message Date
mayuresh82
a38dc2f48f Merge pull request #12 from mayuresh82/race_fix
attempt to fix a race condition
2022-03-09 12:28:28 -08:00
Mayuresh Gaitonde
04159185c9 Fix 2021-12-15 17:10:28 -08:00
Mayuresh Gaitonde
c860f3c50e checks to Add new app 2021-12-15 16:44:13 -08:00
Mayuresh Gaitonde
62db2e5af7 attempt to fix a race condition 2021-12-15 15:36:51 -08:00
mayuresh82
d39e46096f Merge pull request #11 from mayuresh82/deadlock_fix
fix the deadlock
2021-10-20 11:23:51 -07:00
Mayuresh Gaitonde
a99f92e9a5 fix 2021-10-18 18:21:01 -07:00
Mayuresh Gaitonde
5ac02c373b dockerfile fix 2021-10-18 18:16:22 -07:00
Mayuresh Gaitonde
6fdff28716 fix the deadlock 2021-10-18 15:50:13 -07:00
mayuresh82
e32ff1d52c Merge pull request #8 from mayuresh82/vip_config
Add ability to specify vip parameters
2021-05-14 09:35:05 -07:00
Mayuresh Gaitonde
82152d030d Fix dockerfile 2021-05-13 23:54:52 -07:00
Mayuresh Gaitonde
5821c01a7b Add ability to specify vip parameters 2021-05-13 20:33:05 -07:00
mayuresh82
8ca38f4b77 Merge pull request #7 from mayuresh82/tests
Tests
2020-12-18 11:47:07 -08:00
mayuresh82
b8ec7a3391 Create LICENSE 2020-11-28 00:41:16 -08:00
12 changed files with 154 additions and 75 deletions

View File

@@ -1,20 +1,21 @@
FROM golang:alpine as builder
FROM golang:1.14-alpine as builder
RUN apk update && \
apk upgrade && \
apk add --no-cache git && \
apk add make
RUN mkdir -p /opt/gocast
RUN mkdir -p /go/src/github.com/mayuresh82
RUN cd /go/src/github.com/mayuresh82 && \
git clone https://github.com/mayuresh82/gocast
RUN mkdir -p /go/src/github.com/mayuresh82/gocast
COPY . /go/src/github.com/mayuresh82/gocast
WORKDIR /go/src/github.com/mayuresh82/gocast
RUN make
RUN cp gocast /opt/gocast/
FROM alpine:latest
RUN apk --no-cache add ca-certificates bash iptables netcat-openbsd sudo
WORKDIR /root/
COPY --from=builder /opt/gocast/gocast .
COPY --from=builder /go/src/github.com/mayuresh82/gocast .
EXPOSE 8080/tcp

21
LICENSE Normal file
View File

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2020 mayuresh82
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@@ -24,4 +24,7 @@ bgp:
apps:
- name: app1
vip: 1.1.1.1/32
vip_config:
# additional per VIP BGP communities
bgp_communities: [ aaaa:bbbb ]
monitor: port:tcp:5000

View File

@@ -26,9 +26,16 @@ type BgpConfig struct {
Origin string
}
type VipConfig struct {
// per VIP BGP communities to announce. This is in addition to the
// global config
BgpCommunities []string `yaml:"bgp_communities"`
}
type AppConfig struct {
Name string
Vip string
VipConfig VipConfig `yaml:"vip_config"`
Monitors []string
Nats []string
}

View File

@@ -6,6 +6,7 @@ import (
"strings"
"github.com/golang/glog"
"github.com/mayuresh82/gocast/config"
)
type MonitorType int
@@ -51,7 +52,8 @@ func (m Monitors) Contains(elem *Monitor) bool {
type App struct {
Name string
Vip *net.IPNet
Vip *Route
VipConfig config.VipConfig
Monitors Monitors
Nats []string
Source string
@@ -66,10 +68,15 @@ func (a *App) Equal(other *App) bool {
return false
}
}
return a.Name == other.Name && a.Vip.String() == other.Vip.String()
return a.Name == other.Name && a.Vip.Net.String() == other.Vip.Net.String()
}
func NewApp(appName, vip string, monitors []string, nats []string, source string) (*App, error) {
func (a *App) String() string {
return fmt.Sprintf("Name: %s, Vip: %s, VipConf: %v, Monitors: %v, Nats: %v, Source: %s",
a.Name, a.Vip.Net.String(), a.VipConfig, a.Monitors, a.Nats, a.Source)
}
func NewApp(appName, vip string, vipConfig config.VipConfig, monitors []string, nats []string, source string) (*App, error) {
if appName == "" {
return nil, fmt.Errorf("Invalid app name")
}
@@ -78,7 +85,8 @@ func NewApp(appName, vip string, monitors []string, nats []string, source string
if err != nil {
return nil, fmt.Errorf("Invalid VIP specified, need ip/mask")
}
app.Vip = ipnet
app.Vip = &Route{Net: ipnet, Communities: vipConfig.BgpCommunities}
app.VipConfig = vipConfig
for _, m := range monitors {
// valid monitor formats:
// "port:tcp:123" , "exec:/local/check.sh", "consul"

View File

@@ -3,32 +3,36 @@ package controller
import (
"testing"
"github.com/mayuresh82/gocast/config"
"github.com/stretchr/testify/assert"
)
func TestAppParsing(t *testing.T) {
a := assert.New(t)
app1, err := NewApp("app1", "1.1.1.1/32", []string{"port:tcp:123"}, []string{}, "")
app1, err := NewApp("app1", "1.1.1.1/32", config.VipConfig{}, []string{"port:tcp:123"}, []string{}, "")
a.Nil(err)
app2, err := NewApp("app1", "1.1.1.1/32", []string{"port:tcp:123"}, []string{}, "")
app2, err := NewApp("app1", "1.1.1.1/32", config.VipConfig{BgpCommunities: []string{"111:222"}}, []string{"port:tcp:123"}, []string{}, "")
a.Nil(err)
app3, err := NewApp("app3", "2.2.2.2/32", []string{"exec:/bin/testme"}, []string{}, "")
app3, err := NewApp("app3", "2.2.2.2/32", config.VipConfig{}, []string{"exec:/bin/testme"}, []string{}, "")
a.Nil(err)
a.Equal("1.1.1.1/32", app1.Vip.String())
a.Equal("1.1.1.1/32", app1.Vip.Net.String())
a.Equal(Monitor_PORT, app1.Monitors[0].Type)
a.Equal("123", app1.Monitors[0].Port)
a.Equal("tcp", app1.Monitors[0].Protocol)
a.Equal(config.VipConfig{}, app1.VipConfig)
a.Equal(true, app1.Equal(app2))
a.Equal("111:222", app2.Vip.Communities[0])
a.Equal(Monitor_EXEC, app3.Monitors[0].Type)
a.Equal("/bin/testme", app3.Monitors[0].Cmd)
// test errors
_, err = NewApp("app4", "4.4.4.4", []string{}, []string{}, "")
_, err = NewApp("app4", "4.4.4.4", config.VipConfig{}, []string{}, []string{}, "")
a.NotNil(err)
_, err = NewApp("app4", "4.4.4.4/32", []string{"port:abcd::1023"}, []string{}, "")
_, err = NewApp("app4", "4.4.4.4/32", config.VipConfig{}, []string{"port:abcd::1023"}, []string{}, "")
a.NotNil(err)
}

View File

@@ -14,6 +14,11 @@ import (
gobgp "github.com/osrg/gobgp/pkg/server"
)
type Route struct {
Net *net.IPNet
Communities []string
}
type Controller struct {
peerAS int
localIP, peerIP net.IP
@@ -90,14 +95,14 @@ func (c *Controller) AddPeer(peer string) error {
return c.s.AddPeer(context.Background(), &api.AddPeerRequest{Peer: n})
}
func (c *Controller) getApiPath(route *net.IPNet) *api.Path {
func (c *Controller) getApiPath(route *Route) *api.Path {
afi := api.Family_AFI_IP
if route.IP.To4() == nil {
if route.Net.IP.To4() == nil {
afi = api.Family_AFI_IP6
}
prefixlen, _ := route.Mask.Size()
prefixlen, _ := route.Net.Mask.Size()
nlri, _ := ptypes.MarshalAny(&api.IPAddressPrefix{
Prefix: route.IP.String(),
Prefix: route.Net.IP.String(),
PrefixLen: uint32(prefixlen),
})
a1, _ := ptypes.MarshalAny(&api.OriginAttribute{
@@ -107,7 +112,7 @@ func (c *Controller) getApiPath(route *net.IPNet) *api.Path {
NextHop: c.localIP.String(),
})
var communities []uint32
for _, comm := range c.communities {
for _, comm := range append(c.communities, route.Communities...) {
communities = append(communities, convertCommunity(comm))
}
a3, _ := ptypes.MarshalAny(&api.CommunitiesAttribute{
@@ -121,7 +126,7 @@ func (c *Controller) getApiPath(route *net.IPNet) *api.Path {
}
}
func (c *Controller) Announce(route *net.IPNet) error {
func (c *Controller) Announce(route *Route) error {
var found bool
err := c.s.ListPeer(context.Background(), &api.ListPeerRequest{}, func(p *api.Peer) {
if p.Conf.NeighborAddress == c.peerIP.String() {
@@ -140,7 +145,7 @@ func (c *Controller) Announce(route *net.IPNet) error {
return err
}
func (c *Controller) Withdraw(route *net.IPNet) error {
func (c *Controller) Withdraw(route *Route) error {
return c.s.DeletePath(context.Background(), &api.DeletePathRequest{Path: c.getApiPath(route)})
}

View File

@@ -93,7 +93,8 @@ func TestBgpNew(t *testing.T) {
a.FailNow(err.Error())
}
_, ipnet, _ := net.ParseCIDR("20.30.40.0/24")
if err := ctrl.Announce(ipnet); err != nil {
r := &Route{Net: ipnet}
if err := ctrl.Announce(r); err != nil {
a.FailNow(err.Error())
}

View File

@@ -10,6 +10,7 @@ import (
"time"
"github.com/golang/glog"
"github.com/mayuresh82/gocast/config"
)
const (
@@ -85,6 +86,7 @@ func (c *ConsulMon) queryServices() ([]*App, error) {
monitors []string
nats []string
)
var vipConf config.VipConfig
for _, tag := range service.Tags {
// try to find the requires tags. Only vip is mandatory
parts := strings.Split(tag, "=")
@@ -94,6 +96,8 @@ func (c *ConsulMon) queryServices() ([]*App, error) {
switch parts[0] {
case "gocast_vip":
vip = parts[1]
case "gocast_vip_communities":
vipConf.BgpCommunities = strings.Split(parts[1], ",")
case "gocast_monitor":
monitors = append(monitors, parts[1])
case "gocast_nat":
@@ -104,7 +108,7 @@ func (c *ConsulMon) queryServices() ([]*App, error) {
glog.Errorf("No vip Tag found in matched service :%s", service.Service)
continue
}
app, err := NewApp(service.Service, vip, monitors, nats, "consul")
app, err := NewApp(service.Service, vip, vipConf, monitors, nats, "consul")
if err != nil {
glog.Errorf("Unable to add consul app: %v", err)
continue

View File

@@ -6,6 +6,7 @@ import (
"net/http"
"testing"
"github.com/mayuresh82/gocast/config"
"github.com/stretchr/testify/assert"
)
@@ -15,7 +16,7 @@ var mockConsulData = map[string]string{
"ID": "test-app-1",
"Service": "test-service",
"Tags": [
"enable_gocast", "gocast_vip=1.1.1.1/32", "gocast_monitor=consul"
"enable_gocast", "gocast_vip=1.1.1.1/32", "gocast_monitor=consul", "gocast_vip_communities=111:222,333:444"
]
}
}}`,
@@ -103,7 +104,9 @@ func TestQueryServices(t *testing.T) {
a.FailNow(err.Error())
}
a.Equal(1, len(apps))
app, _ := NewApp("test-service", "1.1.1.1/32", []string{"consul"}, []string{}, "consul")
a.Equal([]string{"111:222", "333:444"}, apps[0].Vip.Communities)
app, _ := NewApp("test-service", "1.1.1.1/32", config.VipConfig{}, []string{"consul"}, []string{}, "consul")
a.True(app.Equal(apps[0]))
// test no match

View File

@@ -58,7 +58,7 @@ type appMon struct {
app *App
done chan bool
announced bool
checkOn bool
runLoopOn bool
}
// MonitorMgr manages the lifecycle of registered apps
@@ -69,7 +69,8 @@ type MonitorMgr struct {
ctrl *Controller
consul *ConsulMon
sync.Mutex
monMu sync.Mutex
clMu sync.Mutex
}
func NewMonitor(config *c.Config) *MonitorMgr {
@@ -100,7 +101,7 @@ func NewMonitor(config *c.Config) *MonitorMgr {
mon.config = config
// add apps defined in config
for _, a := range config.Apps {
app, err := NewApp(a.Name, a.Vip, a.Monitors, a.Nats, "config")
app, err := NewApp(a.Name, a.Vip, a.VipConfig, a.Monitors, a.Nats, "config")
if err != nil {
glog.Errorf("Failed to add configured app %s: %v", a.Name, err)
continue
@@ -123,7 +124,7 @@ func (m *MonitorMgr) consulMon() {
}
// remove currently running apps that are not discovered in this pass
var toRemove []string
m.Lock()
m.monMu.Lock()
for name, mon := range m.monitors {
if mon.app.Source != "consul" {
continue
@@ -140,10 +141,10 @@ func (m *MonitorMgr) consulMon() {
toRemove = append(toRemove, name)
}
}
m.monMu.Unlock()
for _, tr := range toRemove {
m.Remove(tr)
}
m.Unlock()
}
<-time.After(m.config.Agent.ConsulQueryInterval)
}
@@ -152,38 +153,51 @@ func (m *MonitorMgr) consulMon() {
// Add adds a new app into monitor manager
func (m *MonitorMgr) Add(app *App) {
// check if already running
m.Lock()
defer m.Unlock()
m.monMu.Lock()
var existing *appMon
for _, appMon := range m.monitors {
if appMon.app.Equal(app) && appMon.checkOn {
glog.V(2).Infof("App %s already exists", app.Name)
return
if appMon.app.Equal(app) {
glog.Infof("App %s already exists", app.Name)
existing = appMon
break
}
if appMon.app.Vip.String() == app.Vip.String() && appMon.app.Name != app.Name {
glog.Errorf("Error: Vip %s is already being announced by app: %s", app.Vip.String(), appMon.app.Name)
if appMon.app.Vip.Net.String() == app.Vip.Net.String() && appMon.app.Name != app.Name {
glog.Errorf("Error: Vip %s is already being announced by app: %s", app.Vip.Net.String(), appMon.app.Name)
m.monMu.Unlock()
return
}
}
m.Remove(app.Name)
m.monMu.Unlock()
// if the same app already exists but its run loop is not running,
// then just restart the run loop
if existing != nil {
if !existing.runLoopOn {
go m.runLoop(existing)
}
} else {
// else add a new app and start its run loop
appMon := &appMon{app: app, done: make(chan bool)}
m.monitors[app.Name] = appMon
go m.runLoop(appMon)
glog.Infof("Registered a new app: %v", app)
glog.Infof("Registered a new app: %v", app.String())
}
}
// Remove removes an app from monitor manager, stops BGP
/// announcement and cleans up state
func (m *MonitorMgr) Remove(appName string) {
m.monMu.Lock()
defer m.monMu.Unlock()
if a, ok := m.monitors[appName]; ok {
if a.checkOn {
a.done <- true
if a.runLoopOn {
close(a.done)
}
if a.announced {
if err := m.ctrl.Withdraw(a.app.Vip); err != nil {
glog.Errorf("Failed to withdraw route: %v", err)
}
}
if err := deleteLoopback(a.app.Vip); err != nil {
if err := deleteLoopback(a.app.Vip.Net); err != nil {
glog.Errorf("Failed to remove app: %s: %v", a.app.Name, err)
}
for _, nat := range a.app.Nats {
@@ -191,13 +205,14 @@ func (m *MonitorMgr) Remove(appName string) {
if len(parts) != 2 {
continue
}
if err := natRule("D", a.app.Vip.IP, m.ctrl.localIP, parts[0], parts[1]); err != nil {
if err := natRule("D", a.app.Vip.Net.IP, m.ctrl.localIP, parts[0], parts[1]); err != nil {
glog.Errorf("Failed to remove app: %s: %v", a.app.Name, err)
}
}
}
delete(m.monitors, appName)
}
func (m *MonitorMgr) runMonitors(app *App) bool {
for _, mon := range app.Monitors {
var check bool
@@ -223,12 +238,12 @@ func (m *MonitorMgr) runMonitors(app *App) bool {
func (m *MonitorMgr) checkCond(am *appMon) error {
app := am.app
m.Lock()
defer m.Unlock()
m.clMu.Lock()
defer m.clMu.Unlock()
if m.runMonitors(app) {
glog.V(2).Infof("All Monitors for app: %s succeeded", app.Name)
if !am.announced {
if err := addLoopback(app.Name, app.Vip); err != nil {
if err := addLoopback(app.Name, app.Vip.Net); err != nil {
return err
}
for _, nat := range app.Nats {
@@ -236,7 +251,7 @@ func (m *MonitorMgr) checkCond(am *appMon) error {
if len(parts) != 2 {
continue
}
if err := natRule("A", app.Vip.IP, m.ctrl.localIP, parts[0], parts[1]); err != nil {
if err := natRule("A", app.Vip.Net.IP, m.ctrl.localIP, parts[0], parts[1]); err != nil {
return err
}
}
@@ -245,7 +260,8 @@ func (m *MonitorMgr) checkCond(am *appMon) error {
}
am.announced = true
if exit, ok := m.cleanups[app.Name]; ok {
exit <- true
close(exit)
delete(m.cleanups, app.Name)
}
}
} else {
@@ -265,7 +281,8 @@ func (m *MonitorMgr) checkCond(am *appMon) error {
// runLoop periodically checks if an app passes healthchecks
// and needs VIP announcement
func (m *MonitorMgr) runLoop(am *appMon) {
am.checkOn = true
glog.Infof("Starting run-loop for app %s", am.app.Name)
am.runLoopOn = true
if err := m.checkCond(am); err != nil {
glog.Errorln(err)
}
@@ -278,7 +295,8 @@ func (m *MonitorMgr) runLoop(am *appMon) {
glog.Errorln(err)
}
case <-am.done:
glog.V(2).Infof("Exit run-loop for app: %s", am.app.Name)
glog.Infof("Exit run-loop for app: %s", am.app.Name)
am.runLoopOn = false
return
}
}
@@ -291,16 +309,16 @@ func (m *MonitorMgr) CloseAll() {
glog.Errorf("Failed to shut-down BGP: %v", err)
}
for _, am := range m.monitors {
if am.checkOn {
am.done <- true
if am.runLoopOn {
close(am.done)
}
deleteLoopback(am.app.Vip)
deleteLoopback(am.app.Vip.Net)
for _, nat := range am.app.Nats {
parts := strings.Split(nat, ":")
if len(parts) != 2 {
continue
}
natRule("D", am.app.Vip.IP, m.ctrl.localIP, parts[0], parts[1])
natRule("D", am.app.Vip.Net.IP, m.ctrl.localIP, parts[0], parts[1])
}
}
}
@@ -313,9 +331,8 @@ func (m *MonitorMgr) Cleanup(app string, exit chan bool) {
select {
case <-t.C:
glog.Infof("Cleaning up app %s", app)
m.Lock()
m.Remove(app)
m.Unlock()
return
case <-exit:
return
}

View File

@@ -4,9 +4,12 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/golang/glog"
"github.com/mayuresh82/gocast/controller"
"net/http"
"strings"
"github.com/golang/glog"
"github.com/mayuresh82/gocast/config"
"github.com/mayuresh82/gocast/controller"
)
// Server is the main entrypoint into the app and serves app requests
@@ -46,7 +49,11 @@ func (s *Server) Serve(ctx context.Context) {
func (s *Server) registerHandler(w http.ResponseWriter, r *http.Request) {
queries := r.URL.Query()
app, err := controller.NewApp(queries["name"][0], queries["vip"][0], queries["monitor"], queries["nat"], "http")
var vipConf config.VipConfig
if vipComm, ok := queries["vip_communities"]; ok {
vipConf.BgpCommunities = strings.Split(vipComm[0], ",")
}
app, err := controller.NewApp(queries["name"][0], queries["vip"][0], vipConf, queries["monitor"], queries["nat"], "http")
if err != nil {
http.Error(w, fmt.Sprintf("Invalid request: %v", err), http.StatusBadRequest)
return
@@ -61,9 +68,7 @@ func (s *Server) unregisterHandler(w http.ResponseWriter, r *http.Request) {
http.Error(w, "Invalid request, need app name specified", http.StatusBadRequest)
return
}
s.mon.Lock()
s.mon.Remove(appName[0])
s.mon.Unlock()
}
func (s *Server) infoHandler(w http.ResponseWriter, r *http.Request) {