Added many more docs. Added an extra log level: warn, which is on by default. Normal operation are logs, unexcepted events are warnings. Added IDs to slave logs.

master
Jaime Pillora 2016-02-13 17:11:17 +11:00
parent 41359f8610
commit 7e03d62308
7 changed files with 437 additions and 164 deletions

105
README.md
View File

@ -4,7 +4,9 @@
Monitorable, gracefully restarting, self-upgrading binaries in Go (golang) Monitorable, gracefully restarting, self-upgrading binaries in Go (golang)
The main goal of this project is to facilitate the creation of self-upgrading binaries which play nice with standard process managers. The secondary goal is user simplicity. :warning: This is beta software. The main goal of this project is to facilitate the creation of self-upgrading binaries which play nice with standard process managers, secondly it should expose a simple API with reasonable defaults for maximum user enjoyment.
:warning: *This is beta software. Do not use in production. Consider the API unstable. Please report any [issues](https://github.com/jpillora/overseer) as you encounter them.*
### Features ### Features
@ -21,6 +23,8 @@ go get github.com/jpillora/overseer
### Quick example ### Quick example
This program works with process managers, supports graceful, zero-down time restarts and self-upgrades its own binary.
``` go ``` go
package main package main
@ -44,7 +48,6 @@ func main() {
URL: "http://localhost:4000/binaries/myapp", URL: "http://localhost:4000/binaries/myapp",
Interval: 1 * time.Second, Interval: 1 * time.Second,
}, },
// Log: true, //display log of overseer actions
}) })
} }
@ -58,33 +61,48 @@ func prog(state overseer.State) {
} }
``` ```
```sh **How it works**
$ cd example/
$ sh example.sh * `overseer` uses the main process to check for and install upgrades and a child process to run `Program`.
serving . on port 4000 * The main process retrieves the files of the listeners described by `Address/es`.
BUILT APP (1) * The child process is provided with these files which is converted into a `Listener/s` for the `Program` to consume.
RUNNING APP * All child process pipes are connected back to the main process.
app#1 (96015cccdebcec119adad34f49b93e02552f3ad9) listening... * All signals received on the main process are forwarded through to the child process.
app#1 (96015cccdebcec119adad34f49b93e02552f3ad9) says hello * `Fetcher` runs in a goroutine and checks for updates at preconfigured interval. When `Fetcher` returns a valid binary stream (`io.Reader`), the master process saves it to a temporary location, verifies it, replaces the current binary and initiates a graceful restart.
app#1 (96015cccdebcec119adad34f49b93e02552f3ad9) says hello * The `fetcher.HTTP` accepts a `URL`, it polls this URL with HEAD requests and until it detects a change. On change, we `GET` the `URL` and stream it back out to `overseer`. See also `fetcher.S3`.
BUILT APP (2) * Once a binary is received, it is run with a simple echo token to confirm it is a `overseer` binary.
app#2 (ccc073a1c8e94fd4f2d76ebefb2bbc96790cb795) listening... * Except for scheduled upgrades, the child process exiting will cause the main process to exit with the same code. So, **`overseer` is not a process manager**.
app#2 (ccc073a1c8e94fd4f2d76ebefb2bbc96790cb795) says hello
app#2 (ccc073a1c8e94fd4f2d76ebefb2bbc96790cb795) says hello See [Config](https://godoc.org/github.com/jpillora/overseer#Config)uration options [here](https://godoc.org/github.com/jpillora/overseer#Config) and the runtime [State](https://godoc.org/github.com/jpillora/overseer#State) available to your program [here](https://godoc.org/github.com/jpillora/overseer#State).
app#1 (96015cccdebcec119adad34f49b93e02552f3ad9) says hello
app#1 (96015cccdebcec119adad34f49b93e02552f3ad9) exiting...
BUILT APP (3)
app#3 (286848c2aefcd3f7321a65b5e4efae987fb17911) listening...
app#3 (286848c2aefcd3f7321a65b5e4efae987fb17911) says hello
app#3 (286848c2aefcd3f7321a65b5e4efae987fb17911) says hello
app#2 (ccc073a1c8e94fd4f2d76ebefb2bbc96790cb795) says hello
app#2 (ccc073a1c8e94fd4f2d76ebefb2bbc96790cb795) exiting...
app#3 (286848c2aefcd3f7321a65b5e4efae987fb17911) says hello
app#3 (286848c2aefcd3f7321a65b5e4efae987fb17911) exiting...
```
### More examples ### More examples
* See the [example/](example/) directory and run `example.sh`, you should see the following output:
```sh
$ cd example/
$ sh example.sh
serving . on port 5002
BUILT APP (1)
RUNNING APP
app#1 (1cd8b9928d44b0a6e89df40574b8b6d20a417679) listening...
app#1 (1cd8b9928d44b0a6e89df40574b8b6d20a417679) says hello
app#1 (1cd8b9928d44b0a6e89df40574b8b6d20a417679) says hello
BUILT APP (2)
app#2 (b9b251f1be6d0cc423ef921f107cb4fc52f760b3) listening...
app#2 (b9b251f1be6d0cc423ef921f107cb4fc52f760b3) says hello
app#2 (b9b251f1be6d0cc423ef921f107cb4fc52f760b3) says hello
app#1 (1cd8b9928d44b0a6e89df40574b8b6d20a417679) says hello
app#1 (1cd8b9928d44b0a6e89df40574b8b6d20a417679) exiting...
BUILT APP (3)
app#3 (248f80ea049c835e7e3714b7169c539d3a4d6131) listening...
app#3 (248f80ea049c835e7e3714b7169c539d3a4d6131) says hello
app#3 (248f80ea049c835e7e3714b7169c539d3a4d6131) says hello
app#2 (b9b251f1be6d0cc423ef921f107cb4fc52f760b3) says hello
app#2 (b9b251f1be6d0cc423ef921f107cb4fc52f760b3) exiting...
app#3 (248f80ea049c835e7e3714b7169c539d3a4d6131) says hello
```
* Only use graceful restarts * Only use graceful restarts
```go ```go
@ -96,7 +114,7 @@ app#3 (286848c2aefcd3f7321a65b5e4efae987fb17911) exiting...
} }
``` ```
Send `main` a `SIGUSR2` to manually trigger a restart Send `main` a `SIGUSR2` (`Config.RestartSignal`) to manually trigger a restart
* Only use auto-upgrades, no restarts * Only use auto-upgrades, no restarts
@ -113,31 +131,22 @@ app#3 (286848c2aefcd3f7321a65b5e4efae987fb17911) exiting...
} }
``` ```
Your binary will be upgraded though it will require manual restart from the user Your binary will be upgraded though it will require manual restart from the user, suitable for creating self-upgrading command-line applications.
### Warnings ### Known issues
* The master process's `overseer.Config` cannot be changed via an upgrade, the master process must be restarted.
* Currently shells out to `mv` for moving files because `mv` handles cross-partition moves unlike `os.Rename`. * Currently shells out to `mv` for moving files because `mv` handles cross-partition moves unlike `os.Rename`.
* Bind `Addresses` can only be changed by restarting the main process. * `Addresses` can only be changed by restarting the main process.
* Only supported on darwin and linux. * Only supported on darwin and linux.
### Documentation ### More documentation
* [Core `overseer` package](https://godoc.org/github.com/jpillora/overseer) * [Core `overseer` package](https://godoc.org/github.com/jpillora/overseer)
* [Common `fetcher.Interface`](https://godoc.org/github.com/jpillora/overseer/fetcher#Interface) * [Common `fetcher.Interface`](https://godoc.org/github.com/jpillora/overseer/fetcher#Interface)
* [HTTP fetcher type](https://godoc.org/github.com/jpillora/overseer/fetcher#HTTP) * [HTTP fetcher type](https://godoc.org/github.com/jpillora/overseer/fetcher#HTTP)
* [S3 fetcher type](https://godoc.org/github.com/jpillora/overseer/fetcher#S3) * [S3 fetcher type](https://godoc.org/github.com/jpillora/overseer/fetcher#S3)
### Architecture overview
* `overseer` uses the main process to check for and install upgrades and a child process to run `Program`
* All child process pipes are connected back to the main process
* All signals received on the main process are forwarded through to the child process
* The provided `fetcher.Interface` will be used to `Fetch()` the latest build of the binary
* The `fetcher.HTTP` accepts a `URL`, it polls this URL with HEAD requests and until it detects a change. On change, we `GET` the `URL` and stream it back out to `overseer`.
* Once a binary is received, it is run with a simple echo token to confirm it is a `overseer` binary.
* Except for scheduled upgrades, the child process exiting will cause the main process to exit with the same code. So, **`overseer` is not a process manager**.
### Docker ### Docker
1. Compile your `overseer`able `app` to a `/path/on/docker/host/dir/app` 1. Compile your `overseer`able `app` to a `/path/on/docker/host/dir/app`
@ -149,16 +158,16 @@ app#3 (286848c2aefcd3f7321a65b5e4efae987fb17911) exiting...
1. For testing, swap out `-d` (daemonize) for `--rm -it` (remove on exit, input, terminal) 1. For testing, swap out `-d` (daemonize) for `--rm -it` (remove on exit, input, terminal)
1. `app` can use the current working directory as storage 1. `app` can use the current working directory as storage
1. `debian` doesn't ship with TLS certs, you can mount them in with `-v /etc/ssl/certs/ca-certificates.crt:/etc/ssl/certs/ca-certificates.crt` 1. If the OS doesn't ship with TLS certs, you can mount them from the host with `-v /etc/ssl/certs/ca-certificates.crt:/etc/ssl/certs/ca-certificates.crt`
### Alternatives
* https://github.com/sanbornm/go-selfupdate
* https://github.com/inconshreveable/go-update
### TODO ### TODO
* Log levels * Tests! The test suite should drive an:
* HTTP client for application version/uptime testing
* HTTP server for application upgrades
* `overseer` binary process
* HTTP fetcher long-polling
* SCP fetcher (connect to a server, poll path)
* Github fetcher (given a repo, poll releases) * Github fetcher (given a repo, poll releases)
* etcd fetcher (given a cluster, watch key) * etcd fetcher (given a cluster, watch key)
* `overseer` CLI tool ([TODO](cmd/overseer/TODO.md)) * `overseer` CLI tool ([TODO](cmd/overseer/TODO.md))

View File

@ -30,12 +30,12 @@ func prog(state overseer.State) {
//'main()' is run in the initial process //'main()' is run in the initial process
func main() { func main() {
overseer.Run(overseer.Config{ overseer.Run(overseer.Config{
Log: false, //display log of overseer actions
Program: prog, Program: prog,
Address: ":5001", Address: ":5001",
Fetcher: &fetcher.HTTP{ Fetcher: &fetcher.HTTP{
URL: "http://localhost:5002/myappnew", URL: "http://localhost:5002/myappnew",
Interval: 1 * time.Second, Interval: 1 * time.Second,
}, },
Debug: false, //display log of overseer actions
}) })
} }

View File

@ -2,6 +2,7 @@
package overseer package overseer
import ( import (
"errors"
"fmt" "fmt"
"log" "log"
"os" "os"
@ -12,6 +13,7 @@ import (
) )
const ( const (
envSlaveID = "GO_UPGRADE_SLAVE_ID"
envIsSlave = "GO_UPGRADE_IS_SLAVE" envIsSlave = "GO_UPGRADE_IS_SLAVE"
envNumFDs = "GO_UPGRADE_NUM_FDS" envNumFDs = "GO_UPGRADE_NUM_FDS"
envBinID = "GO_UPGRADE_BIN_ID" envBinID = "GO_UPGRADE_BIN_ID"
@ -19,9 +21,9 @@ const (
) )
type Config struct { type Config struct {
//Optional allows overseer to fallback to running //Required will prevent overseer from fallback to running
//running the program in the main process. //running the program in the main process on failure.
Optional bool Required bool
//Program's main function //Program's main function
Program func(state State) Program func(state State)
//Program's zero-downtime socket listening address (set this or Addresses) //Program's zero-downtime socket listening address (set this or Addresses)
@ -41,31 +43,24 @@ type Config struct {
//PreUpgrade runs after a binary has been retreived, user defined checks //PreUpgrade runs after a binary has been retreived, user defined checks
//can be run here and returning an error will cancel the upgrade. //can be run here and returning an error will cancel the upgrade.
PreUpgrade func(tempBinaryPath string) error PreUpgrade func(tempBinaryPath string) error
//Log enables [overseer] logs to be sent to stdout. //Debug enables all [overseer] logs.
Log bool Debug bool
//NoWarn disables warning [overseer] logs.
NoWarn bool
//NoRestartAfterFetch disables automatic restarts after each upgrade. //NoRestartAfterFetch disables automatic restarts after each upgrade.
NoRestartAfterFetch bool NoRestartAfterFetch bool
//Fetcher will be used to fetch binaries. //Fetcher will be used to fetch binaries.
Fetcher fetcher.Interface Fetcher fetcher.Interface
} }
func fatalf(f string, args ...interface{}) { func validate(c *Config) error {
log.Fatalf("[overseer] "+f, args...)
}
func Run(c Config) {
//sanity check
if token := os.Getenv(envBinCheck); token != "" {
fmt.Fprint(os.Stdout, token)
os.Exit(0)
}
//validate //validate
if c.Program == nil { if c.Program == nil {
fatalf("overseer.Config.Program required") return errors.New("overseer.Config.Program required")
} }
if c.Address != "" { if c.Address != "" {
if len(c.Addresses) > 0 { if len(c.Addresses) > 0 {
fatalf("overseer.Config.Address and Addresses cant both be set") return errors.New("overseer.Config.Address and Addresses cant both be set")
} }
c.Addresses = []string{c.Address} c.Addresses = []string{c.Address}
} else if len(c.Addresses) > 0 { } else if len(c.Addresses) > 0 {
@ -74,28 +69,57 @@ func Run(c Config) {
if c.RestartSignal == nil { if c.RestartSignal == nil {
c.RestartSignal = SIGUSR2 c.RestartSignal = SIGUSR2
} }
if c.TerminateTimeout == 0 { if c.TerminateTimeout <= 0 {
c.TerminateTimeout = 30 * time.Second c.TerminateTimeout = 30 * time.Second
} }
if c.MinFetchInterval == 0 { if c.MinFetchInterval <= 0 {
c.MinFetchInterval = 1 * time.Second c.MinFetchInterval = 1 * time.Second
} }
//os not supported return nil
if !supported { }
if !c.Optional {
fatalf("os (%s) not supported", runtime.GOOS) //RunErr allows manual handling of any
//overseer errors.
func RunErr(c Config) error {
return runErr(&c)
}
//Run executes overseer, if an error is
//encounted, overseer fallsback to running
//the program directly (unless Required is set).
func Run(c Config) {
err := runErr(&c)
if err != nil {
if c.Required {
log.Fatalf("[overseer] %s", err)
} else if c.Debug || !c.NoWarn {
log.Printf("[overseer] disabled. run failed: %s", err)
} }
c.Program(DisabledState) c.Program(DisabledState)
return return
} }
os.Exit(0)
}
func runErr(c *Config) error {
if err := validate(c); err != nil {
return err
}
//sanity check
if token := os.Getenv(envBinCheck); token != "" {
fmt.Fprint(os.Stdout, token)
return nil
}
//os not supported
if !supported {
return fmt.Errorf("os (%s) not supported", runtime.GOOS)
}
//run either in master or slave mode //run either in master or slave mode
if os.Getenv(envIsSlave) == "1" { if os.Getenv(envIsSlave) == "1" {
sp := slave{Config: c} sp := slave{Config: c}
sp.logf("run") return sp.run()
sp.run()
} else { } else {
mp := master{Config: c} mp := master{Config: c}
mp.logf("run") return mp.run()
mp.run()
} }
} }

View File

@ -25,7 +25,8 @@ var tmpBinPath = filepath.Join(os.TempDir(), "overseer-"+token())
//a overseer master process //a overseer master process
type master struct { type master struct {
Config *Config
slaveID int
slaveCmd *exec.Cmd slaveCmd *exec.Cmd
slaveExtraFiles []*os.File slaveExtraFiles []*os.File
binPath, tmpBinPath string binPath, tmpBinPath string
@ -40,30 +41,26 @@ type master struct {
signalledAt time.Time signalledAt time.Time
} }
func (mp *master) run() { func (mp *master) run() error {
mp.debugf("run")
if err := mp.checkBinary(); err != nil { if err := mp.checkBinary(); err != nil {
if !mp.Config.Optional { return err
fatalf("%s", err)
}
//run program directly
mp.logf("%s, disabling overseer.", err)
mp.Program(DisabledState)
return
} }
if mp.Config.Fetcher != nil { if mp.Config.Fetcher != nil {
if err := mp.Config.Fetcher.Init(); err != nil { if err := mp.Config.Fetcher.Init(); err != nil {
mp.logf("fetcher init failed (%s)", err) mp.warnf("fetcher init failed (%s). fetcher disabled.", err)
mp.Config.Fetcher = nil mp.Config.Fetcher = nil
} }
} }
mp.setupSignalling() mp.setupSignalling()
mp.retreiveFileDescriptors() if err := mp.retreiveFileDescriptors(); err != nil {
return err
}
if mp.Config.Fetcher != nil { if mp.Config.Fetcher != nil {
//TODO is required? fatalf("overseer.Config.Fetcher required")
mp.fetch() mp.fetch()
go mp.fetchLoop() go mp.fetchLoop()
} }
mp.forkLoop() return mp.forkLoop()
} }
func (mp *master) checkBinary() error { func (mp *master) checkBinary() error {
@ -119,58 +116,59 @@ func (mp *master) handleSignal(s os.Signal) {
//user initiated manual restart //user initiated manual restart
mp.triggerRestart() mp.triggerRestart()
} else if s.String() == "child exited" { } else if s.String() == "child exited" {
// will occur on every restart // will occur on every restart, ignore it
} else } else
//**during a restart** a SIGUSR1 signals //**during a restart** a SIGUSR1 signals
//to the master process that, the file //to the master process that, the file
//descriptors have been released //descriptors have been released
if mp.awaitingUSR1 && s == SIGUSR1 { if mp.awaitingUSR1 && s == SIGUSR1 {
mp.logf("signaled, sockets ready") mp.debugf("signaled, sockets ready")
mp.awaitingUSR1 = false mp.awaitingUSR1 = false
mp.descriptorsReleased <- true mp.descriptorsReleased <- true
} else } else
//while the slave process is running, proxy //while the slave process is running, proxy
//all signals through //all signals through
if mp.slaveCmd != nil && mp.slaveCmd.Process != nil { if mp.slaveCmd != nil && mp.slaveCmd.Process != nil {
mp.logf("proxy signal (%s)", s) mp.debugf("proxy signal (%s)", s)
mp.sendSignal(s) mp.sendSignal(s)
} else } else
//otherwise if not running, kill on CTRL+c //otherwise if not running, kill on CTRL+c
if s == os.Interrupt { if s == os.Interrupt {
mp.logf("interupt with no slave") mp.debugf("interupt with no slave")
os.Exit(1) os.Exit(1)
} else { } else {
mp.logf("signal discarded (%s), no slave process", s) mp.debugf("signal discarded (%s), no slave process", s)
} }
} }
func (mp *master) sendSignal(s os.Signal) { func (mp *master) sendSignal(s os.Signal) {
if err := mp.slaveCmd.Process.Signal(s); err != nil { if err := mp.slaveCmd.Process.Signal(s); err != nil {
mp.logf("signal failed (%s), assuming slave process died", err) mp.debugf("signal failed (%s), assuming slave process died unexpectedly", err)
os.Exit(1) os.Exit(1)
} }
} }
func (mp *master) retreiveFileDescriptors() { func (mp *master) retreiveFileDescriptors() error {
mp.slaveExtraFiles = make([]*os.File, len(mp.Config.Addresses)) mp.slaveExtraFiles = make([]*os.File, len(mp.Config.Addresses))
for i, addr := range mp.Config.Addresses { for i, addr := range mp.Config.Addresses {
a, err := net.ResolveTCPAddr("tcp", addr) a, err := net.ResolveTCPAddr("tcp", addr)
if err != nil { if err != nil {
fatalf("invalid address: %s (%s)", addr, err) return fmt.Errorf("Invalid address %s (%s)", addr, err)
} }
l, err := net.ListenTCP("tcp", a) l, err := net.ListenTCP("tcp", a)
if err != nil { if err != nil {
fatalf(err.Error()) return err
} }
f, err := l.File() f, err := l.File()
if err != nil { if err != nil {
fatalf("failed to retreive fd for: %s (%s)", addr, err) return fmt.Errorf("Failed to retreive fd for: %s (%s)", addr, err)
} }
if err := l.Close(); err != nil { if err := l.Close(); err != nil {
fatalf("failed to close listener for: %s (%s)", addr, err) return fmt.Errorf("Failed to close listener for: %s (%s)", addr, err)
} }
mp.slaveExtraFiles[i] = f mp.slaveExtraFiles[i] = f
} }
return nil
} }
//fetchLoop is run in a goroutine //fetchLoop is run in a goroutine
@ -180,6 +178,7 @@ func (mp *master) fetchLoop() {
for { for {
t0 := time.Now() t0 := time.Now()
mp.fetch() mp.fetch()
//duration fetch of fetch
diff := time.Now().Sub(t0) diff := time.Now().Sub(t0)
if diff < min { if diff < min {
delay := min - diff delay := min - diff
@ -194,67 +193,66 @@ func (mp *master) fetch() {
if mp.restarting { if mp.restarting {
return //skip if restarting return //skip if restarting
} }
mp.debugf("checking for updates...")
mp.logf("checking for updates...")
reader, err := mp.Fetcher.Fetch() reader, err := mp.Fetcher.Fetch()
if err != nil { if err != nil {
mp.logf("failed to get latest version: %s", err) mp.debugf("failed to get latest version: %s", err)
return return
} }
if reader == nil { if reader == nil {
mp.logf("no updates") mp.debugf("no updates")
return //fetcher has explicitly said there are no updates return //fetcher has explicitly said there are no updates
} }
mp.logf("streaming update...") mp.debugf("streaming update...")
//optional closer //optional closer
if closer, ok := reader.(io.Closer); ok { if closer, ok := reader.(io.Closer); ok {
defer closer.Close() defer closer.Close()
} }
tmpBin, err := os.OpenFile(tmpBinPath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666) tmpBin, err := os.OpenFile(tmpBinPath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666)
if err != nil { if err != nil {
mp.logf("failed to open temp binary: %s", err) mp.warnf("failed to open temp binary: %s", err)
return return
} }
// defer func() { defer func() {
// tmpBin.Close() tmpBin.Close()
// os.Remove(tmpBinPath) os.Remove(tmpBinPath)
// }() }()
//tee off to sha1 //tee off to sha1
hash := sha1.New() hash := sha1.New()
reader = io.TeeReader(reader, hash) reader = io.TeeReader(reader, hash)
//write to a temp file //write to a temp file
_, err = io.Copy(tmpBin, reader) _, err = io.Copy(tmpBin, reader)
if err != nil { if err != nil {
mp.logf("failed to write temp binary: %s", err) mp.warnf("failed to write temp binary: %s", err)
return return
} }
//compare hash //compare hash
newHash := hash.Sum(nil) newHash := hash.Sum(nil)
if bytes.Equal(mp.binHash, newHash) { if bytes.Equal(mp.binHash, newHash) {
mp.logf("hash match - skip") mp.debugf("hash match - skip")
return return
} }
//copy permissions //copy permissions
if err := tmpBin.Chmod(mp.binPerms); err != nil { if err := tmpBin.Chmod(mp.binPerms); err != nil {
mp.logf("failed to make temp binary executable: %s", err) mp.warnf("failed to make temp binary executable: %s", err)
return return
} }
if err := tmpBin.Chown(uid, gid); err != nil { if err := tmpBin.Chown(uid, gid); err != nil {
mp.logf("failed to change owner of binary: %s", err) mp.warnf("failed to change owner of binary: %s", err)
return return
} }
if _, err := tmpBin.Stat(); err != nil { if _, err := tmpBin.Stat(); err != nil {
mp.logf("failed to stat temp binary: %s", err) mp.warnf("failed to stat temp binary: %s", err)
return return
} }
tmpBin.Close() tmpBin.Close()
if _, err := os.Stat(tmpBinPath); err != nil { if _, err := os.Stat(tmpBinPath); err != nil {
mp.logf("failed to stat temp binary by path: %s", err) mp.warnf("failed to stat temp binary by path: %s", err)
return return
} }
if mp.Config.PreUpgrade != nil { if mp.Config.PreUpgrade != nil {
if err := mp.Config.PreUpgrade(tmpBinPath); err != nil { if err := mp.Config.PreUpgrade(tmpBinPath); err != nil {
mp.logf("user cancelled upgrade: %s", err) mp.warnf("user cancelled upgrade: %s", err)
return return
} }
} }
@ -264,19 +262,19 @@ func (mp *master) fetch() {
cmd.Env = []string{envBinCheck + "=" + tokenIn} cmd.Env = []string{envBinCheck + "=" + tokenIn}
tokenOut, err := cmd.Output() tokenOut, err := cmd.Output()
if err != nil { if err != nil {
mp.logf("failed to run temp binary: %s", err) mp.warnf("failed to run temp binary: %s", err)
return return
} }
if tokenIn != string(tokenOut) { if tokenIn != string(tokenOut) {
mp.logf("sanity check failed") mp.warnf("sanity check failed")
return return
} }
//overwrite! //overwrite!
if err := move(mp.binPath, tmpBinPath); err != nil { if err := move(mp.binPath, tmpBinPath); err != nil {
mp.logf("failed to overwrite binary: %s", err) mp.warnf("failed to overwrite binary: %s", err)
return return
} }
mp.logf("upgraded binary (%x -> %x)", mp.binHash[:12], newHash[:12]) mp.debugf("upgraded binary (%x -> %x)", mp.binHash[:12], newHash[:12])
mp.binHash = newHash mp.binHash = newHash
//binary successfully replaced //binary successfully replaced
if !mp.Config.NoRestartAfterFetch { if !mp.Config.NoRestartAfterFetch {
@ -288,13 +286,13 @@ func (mp *master) fetch() {
func (mp *master) triggerRestart() { func (mp *master) triggerRestart() {
if mp.restarting { if mp.restarting {
mp.logf("already graceful restarting") mp.debugf("already graceful restarting")
return //skip return //skip
} else if mp.slaveCmd == nil || mp.restarting { } else if mp.slaveCmd == nil || mp.restarting {
mp.logf("no slave process") mp.debugf("no slave process")
return //skip return //skip
} }
mp.logf("graceful restart triggered") mp.debugf("graceful restart triggered")
mp.restarting = true mp.restarting = true
mp.awaitingUSR1 = true mp.awaitingUSR1 = true
mp.signalledAt = time.Now() mp.signalledAt = time.Now()
@ -302,31 +300,37 @@ func (mp *master) triggerRestart() {
select { select {
case <-mp.restarted: case <-mp.restarted:
//success //success
mp.logf("restart success") mp.debugf("restart success")
case <-time.After(mp.TerminateTimeout): case <-time.After(mp.TerminateTimeout):
//times up process, we did ask nicely! //times up mr. process, we did ask nicely!
mp.logf("graceful timeout, forcing exit") mp.debugf("graceful timeout, forcing exit")
mp.sendSignal(os.Kill) mp.sendSignal(os.Kill)
} }
} }
//not a real fork //not a real fork
func (mp *master) forkLoop() { func (mp *master) forkLoop() error {
//loop, restart command //loop, restart command
for { for {
mp.fork() if err := mp.fork(); err != nil {
return err
}
} }
} }
func (mp *master) fork() { func (mp *master) fork() error {
mp.logf("starting %s", mp.binPath) mp.debugf("starting %s", mp.binPath)
cmd := exec.Command(mp.binPath) cmd := exec.Command(mp.binPath)
//mark this new process as the "active" slave process.
//this process is assumed to be holding the socket files.
mp.slaveCmd = cmd mp.slaveCmd = cmd
mp.slaveID++
//provide the slave process with some state
e := os.Environ() e := os.Environ()
e = append(e, envBinID+"="+hex.EncodeToString(mp.binHash)) e = append(e, envBinID+"="+hex.EncodeToString(mp.binHash))
e = append(e, envSlaveID+"="+strconv.Itoa(mp.slaveID))
e = append(e, envIsSlave+"=1") e = append(e, envIsSlave+"=1")
e = append(e, envNumFDs+"="+strconv.Itoa(len(mp.Config.Addresses))) e = append(e, envNumFDs+"="+strconv.Itoa(len(mp.slaveExtraFiles)))
cmd.Env = e cmd.Env = e
//inherit master args/stdfiles //inherit master args/stdfiles
cmd.Args = os.Args cmd.Args = os.Args
@ -336,8 +340,9 @@ func (mp *master) fork() {
//include socket files //include socket files
cmd.ExtraFiles = mp.slaveExtraFiles cmd.ExtraFiles = mp.slaveExtraFiles
if err := cmd.Start(); err != nil { if err := cmd.Start(); err != nil {
fatalf("failed to fork: %s", err) return fmt.Errorf("Failed to start slave process: %s", err)
} }
//was scheduled to restart, notify success
if mp.restarting { if mp.restarting {
mp.restartedAt = time.Now() mp.restartedAt = time.Now()
mp.restarting = false mp.restarting = false
@ -352,10 +357,6 @@ func (mp *master) fork() {
select { select {
case err := <-cmdwait: case err := <-cmdwait:
//program exited before releasing descriptors //program exited before releasing descriptors
if mp.restarting {
//restart requested
return
}
//proxy exit code out to master //proxy exit code out to master
code := 0 code := 0
if err != nil { if err != nil {
@ -366,20 +367,32 @@ func (mp *master) fork() {
} }
} }
} }
mp.logf("prog exited with %d", code) mp.debugf("prog exited with %d", code)
//proxy exit with same code //if a restart wasn't requested, proxy
os.Exit(code) //through the exit code via the main process
if !mp.restarting {
os.Exit(code)
}
case <-mp.descriptorsReleased: case <-mp.descriptorsReleased:
//if descriptors are released, the program //if descriptors are released, the program
//has yielded control of its sockets and //has yielded control of its sockets and
//a new instance should be started to pick //a parallel instance of the program can be
//them up. The previous cmd.Wait() will still //started safely. it should serve state.Listeners
//be consumed though it will be discarded. //to ensure downtime is kept at <1sec. The previous
//cmd.Wait() will still be consumed though the
//result will be discarded.
}
return nil
}
func (mp *master) debugf(f string, args ...interface{}) {
if mp.Config.Debug {
log.Printf("[overseer master] "+f, args...)
} }
} }
func (mp *master) logf(f string, args ...interface{}) { func (mp *master) warnf(f string, args ...interface{}) {
if mp.Log { if mp.Config.Debug || !mp.Config.NoWarn {
log.Printf("[overseer master] "+f, args...) log.Printf("[overseer master] "+f, args...)
} }
} }

View File

@ -1,6 +1,7 @@
package overseer package overseer
import ( import (
"fmt"
"log" "log"
"net" "net"
"os" "os"
@ -44,51 +45,61 @@ type State struct {
//a overseer slave process //a overseer slave process
type slave struct { type slave struct {
Config *Config
id string
listeners []*upListener listeners []*upListener
masterPid int masterPid int
masterProc *os.Process masterProc *os.Process
state State state State
} }
func (sp *slave) run() { func (sp *slave) run() error {
sp.id = os.Getenv(envSlaveID)
sp.debugf("run")
sp.state.Enabled = true sp.state.Enabled = true
sp.state.ID = os.Getenv(envBinID) sp.state.ID = os.Getenv(envBinID)
sp.state.StartedAt = time.Now() sp.state.StartedAt = time.Now()
sp.state.Address = sp.Config.Address sp.state.Address = sp.Config.Address
sp.state.Addresses = sp.Config.Addresses sp.state.Addresses = sp.Config.Addresses
sp.state.GracefulShutdown = make(chan bool, 1) sp.state.GracefulShutdown = make(chan bool, 1)
sp.watchParent() if err := sp.watchParent(); err != nil {
sp.initFileDescriptors() return err
}
if err := sp.initFileDescriptors(); err != nil {
return err
}
sp.watchSignal() sp.watchSignal()
//run program with state //run program with state
sp.logf("start program") sp.debugf("start program")
sp.Config.Program(sp.state) sp.Config.Program(sp.state)
return nil
} }
func (sp *slave) watchParent() { func (sp *slave) watchParent() error {
sp.masterPid = os.Getppid() sp.masterPid = os.Getppid()
proc, err := os.FindProcess(sp.masterPid) proc, err := os.FindProcess(sp.masterPid)
if err != nil { if err != nil {
fatalf("parent process %s", err) return fmt.Errorf("master process: %s", err)
} }
sp.masterProc = proc sp.masterProc = proc
go func() { go func() {
//send signal 0 to master process forever
for { for {
//sending signal 0 should not error as long as the process is alive //should not error as long as the process is alive
if err := sp.masterProc.Signal(syscall.Signal(0)); err != nil { if err := sp.masterProc.Signal(syscall.Signal(0)); err != nil {
os.Exit(1) os.Exit(1)
} }
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
} }
}() }()
return nil
} }
func (sp *slave) initFileDescriptors() { func (sp *slave) initFileDescriptors() error {
//inspect file descriptors //inspect file descriptors
numFDs, err := strconv.Atoi(os.Getenv(envNumFDs)) numFDs, err := strconv.Atoi(os.Getenv(envNumFDs))
if err != nil { if err != nil {
fatalf("invalid %s integer", envNumFDs) return fmt.Errorf("invalid %s integer", envNumFDs)
} }
sp.listeners = make([]*upListener, numFDs) sp.listeners = make([]*upListener, numFDs)
sp.state.Listeners = make([]net.Listener, numFDs) sp.state.Listeners = make([]net.Listener, numFDs)
@ -96,7 +107,7 @@ func (sp *slave) initFileDescriptors() {
f := os.NewFile(uintptr(3+i), "") f := os.NewFile(uintptr(3+i), "")
l, err := net.FileListener(f) l, err := net.FileListener(f)
if err != nil { if err != nil {
fatalf("failed to inherit file descriptor: %d", i) return fmt.Errorf("failed to inherit file descriptor: %d", i)
} }
u := newUpListener(l) u := newUpListener(l)
sp.listeners[i] = u sp.listeners[i] = u
@ -105,6 +116,7 @@ func (sp *slave) initFileDescriptors() {
if len(sp.state.Listeners) > 0 { if len(sp.state.Listeners) > 0 {
sp.state.Listener = sp.state.Listeners[0] sp.state.Listener = sp.state.Listeners[0]
} }
return nil
} }
func (sp *slave) watchSignal() { func (sp *slave) watchSignal() {
@ -113,7 +125,7 @@ func (sp *slave) watchSignal() {
go func() { go func() {
<-signals <-signals
signal.Stop(signals) signal.Stop(signals)
sp.logf("graceful shutdown requested") sp.debugf("graceful shutdown requested")
//master wants to restart, //master wants to restart,
sp.state.GracefulShutdown <- true sp.state.GracefulShutdown <- true
//release any sockets and notify master //release any sockets and notify master
@ -129,14 +141,20 @@ func (sp *slave) watchSignal() {
//start death-timer //start death-timer
go func() { go func() {
time.Sleep(sp.Config.TerminateTimeout) time.Sleep(sp.Config.TerminateTimeout)
sp.logf("timeout. forceful shutdown") sp.debugf("timeout. forceful shutdown")
os.Exit(1) os.Exit(1)
}() }()
}() }()
} }
func (sp *slave) logf(f string, args ...interface{}) { func (sp *slave) debugf(f string, args ...interface{}) {
if sp.Log { if sp.Config.Debug {
log.Printf("[overseer slave] "+f, args...) log.Printf("[overseer slave#"+sp.id+"] "+f, args...)
}
}
func (sp *slave) warnf(f string, args ...interface{}) {
if sp.Config.Debug || !sp.Config.NoWarn {
log.Printf("[overseer slave#"+sp.id+"] "+f, args...)
} }
} }

View File

@ -22,7 +22,7 @@ var (
func move(dst, src string) error { func move(dst, src string) error {
//HACK: we're shelling out to mv because linux //HACK: we're shelling out to mv because linux
//throws errors when we use Rename/Create a //throws errors when crossing device boundaryes.
//running binary. //TODO see sys_posix_mv.go
return exec.Command("mv", src, dst).Run() return exec.Command("mv", src, dst).Run()
} }

209
sys_posix_mv.go Normal file
View File

@ -0,0 +1,209 @@
package overseer
// TODO(@jpillora) borrowed from https://github.com/aisola/go-coreutils/blob/master/mv/mv.go
//
// mv.go (go-coreutils) 0.1
// Copyright (C) 2014, The GO-Coreutils Developers.
//
// Written By: Abram C. Isola, Michael Murphy
//
// package main
//
// import "bufio"
// import "flag"
// import "fmt"
// import "io"
// import "os"
// import "path/filepath"
//
// const (
// help_text string = `
// Usage: mv [OPTION]... [PATH]... [PATH]
// or: mv [PATH] [PATH]
// or: mv [OPTION]
// move or rename files or directories
// --help display this help and exit
// --version output version information and exit
// -f, --force remove existing destination files and never prompt the user
// ` // -v, --verbose print the name of each file before moving it
// version_text = `
// mv (go-coreutils) 0.1
// Copyright (C) 2014, The GO-Coreutils Developers.
// This program comes with ABSOLUTELY NO WARRANTY; for details see
// LICENSE. This is free software, and you are welcome to redistribute
// it under certain conditions in LICENSE.
// `
// )
//
// var (
// forceEnabled = flag.Bool("f", false, "remove existing destination files and never prompt the user")
// forceEnabledLong = flag.Bool("force", false, "remove existing destination files and never prompt the user")
// )
//
// // The input function prints a statement to the user and accepts an input, then returns the input.
//
// func input(prompt, location string) string {
// fmt.Printf(prompt, location)
//
// reader := bufio.NewReader(os.Stdin)
// userinput, _ := reader.ReadString([]byte("\n")[0])
//
// return userinput
// }
//
// // The fileExists function will check if the file exists.
//
// func fileExists(filep string) os.FileInfo {
// fp, err := os.Stat(filep)
// if err != nil && os.IsNotExist(err) {
// return nil
// }
// return fp
// }
//
// /* The argumentCheck function will check the number of arguments given to the program and process them
// * accordingly. */
//
// func argumentCheck(files []string) {
// switch len(files) {
// case 0: // If there is no argument
// fmt.Println("mv: missing file operand\nTry 'mv -help' for more information")
// os.Exit(0)
// case 1: // If there is one argument
// fmt.Printf("mv: missing destination file operand after '%s'\nTry 'mv -help' for more information.\n", files[0])
// os.Exit(0)
// case 2: // If there are two arguments
// mover(files[0], files[1])
// default: // If there are more than two arguments
// to_file, files := files[len(files)-1], files[:len(files)-1]
//
// if fp := fileExists(to_file); fp == nil || !fp.IsDir() {
// fmt.Println("mv: when moving multiple files, last argument must be a directory")
// os.Exit(0)
// } else {
// fmt.Println(files)
// for i := 0; i < len(files); i++ {
// mover(files[i], to_file)
// }
// os.Exit(0)
// }
// }
// }
//
// /* The mover function will take two strings as an argument and move the original file/dir to
// * a new location. */
//
// func mover(originalLocation, newLocation string) {
// fp := fileExists(newLocation)
//
// switch {
// case fileExists(originalLocation) == nil: // If the original file does not exist
// fmt.Printf("mv: cannot stat '%s': No such file or directory\n", originalLocation)
// os.Exit(0)
// case fp != nil && !*forceEnabled: // If the destination file does not exist and forceEnabled is disabled
// if fp.IsDir() {
// base := filepath.Base(originalLocation)
// if fp2 := fileExists(newLocation + "/" + base); fp2 != nil && !*forceEnabled {
// answer := input("File '%s' exists. Overwrite? (y/N): ", newLocation+"/"+base)
// if answer == "y\n" {
// try_move(originalLocation, newLocation+"/"+base)
// } else {
// os.Exit(0)
// }
// } else if fp2 != nil && *forceEnabled {
// try_move(originalLocation, newLocation+"/"+base)
// } else if fp2 == nil {
// try_move(originalLocation, newLocation+"/"+base)
// }
// } else {
// answer := input("File '%s' exists. Overwrite? (y/N): ", newLocation)
// if answer == "y\n" {
// try_move(originalLocation, newLocation)
// } else {
// os.Exit(0)
// }
// }
// default: // If the destination file exists and forceEnabled is enabled,
// try_move(originalLocation, newLocation) // or if the file does not exist, move it.
// }
// }
//
// func try_move(originalLocation, newLocation string) error {
// err := os.Rename(originalLocation, newLocation)
// switch t := err.(type) {
// case *os.LinkError:
// fmt.Printf("Cross-device move. Copying instead\n")
// return move_across_devices(originalLocation, newLocation)
// case *os.PathError:
// fmt.Printf("Path error: %q\n", t)
// return err
// case *os.SyscallError:
// fmt.Printf("Syscall error: %q\n", t)
// return err
// case nil:
// return nil
// default:
// fmt.Printf("Unkown error Type: %T Error: %q", t, t)
// return err
// }
// return nil
// }
//
// func move_across_devices(originalLocation, newLocation string) error {
// src, err := os.Open(originalLocation)
// if err != nil {
// return err
// }
// defer src.Close()
//
// dst, err := os.Create(newLocation)
// if err != nil {
// return err
// }
// defer dst.Close()
//
// size, err := io.Copy(dst, src)
// if err != nil {
// return err
// }
//
// srcStat, err := os.Stat(originalLocation)
// if err != nil {
// return err
// }
// if size != srcStat.Size() {
// os.Remove(newLocation)
// return fmt.Errorf("Error, file was not copied completely")
// }
// os.Remove(originalLocation)
// return nil
// }
//
// func main() {
// help := flag.Bool("help", false, help_text)
// version := flag.Bool("version", false, version_text)
// flag.Parse()
//
// // We only need one instance of forceEnabled
//
// if *forceEnabledLong {
// *forceEnabled = true
// }
//
// // Display help information
//
// if *help {
// fmt.Println(help_text)
// os.Exit(0)
// }
//
// // Display version information
//
// if *version {
// fmt.Println(version_text)
// os.Exit(0)
// }
//
// files := flag.Args() // Obtain a list of files.
// argumentCheck(files) // Check the number of arguments and process them.
// }