From 7e03d62308b6bed86e26391fc3c4c0dae0402628 Mon Sep 17 00:00:00 2001 From: Jaime Pillora Date: Sat, 13 Feb 2016 17:11:17 +1100 Subject: [PATCH] Added many more docs. Added an extra log level: warn, which is on by default. Normal operation are logs, unexcepted events are warnings. Added IDs to slave logs. --- README.md | 105 +++++++++++++----------- example/main.go | 2 +- overseer.go | 78 +++++++++++------- proc_master.go | 153 +++++++++++++++++++---------------- proc_slave.go | 50 ++++++++---- sys_posix.go | 4 +- sys_posix_mv.go | 209 ++++++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 437 insertions(+), 164 deletions(-) create mode 100644 sys_posix_mv.go diff --git a/README.md b/README.md index 25bbe8e..25890ec 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,9 @@ Monitorable, gracefully restarting, self-upgrading binaries in Go (golang) -The main goal of this project is to facilitate the creation of self-upgrading binaries which play nice with standard process managers. The secondary goal is user simplicity. :warning: This is beta software. +The main goal of this project is to facilitate the creation of self-upgrading binaries which play nice with standard process managers, secondly it should expose a simple API with reasonable defaults for maximum user enjoyment. + +:warning: *This is beta software. Do not use in production. Consider the API unstable. Please report any [issues](https://github.com/jpillora/overseer) as you encounter them.* ### Features @@ -21,6 +23,8 @@ go get github.com/jpillora/overseer ### Quick example +This program works with process managers, supports graceful, zero-down time restarts and self-upgrades its own binary. + ``` go package main @@ -44,7 +48,6 @@ func main() { URL: "http://localhost:4000/binaries/myapp", Interval: 1 * time.Second, }, - // Log: true, //display log of overseer actions }) } @@ -58,33 +61,48 @@ func prog(state overseer.State) { } ``` -```sh -$ cd example/ -$ sh example.sh -serving . on port 4000 -BUILT APP (1) -RUNNING APP -app#1 (96015cccdebcec119adad34f49b93e02552f3ad9) listening... -app#1 (96015cccdebcec119adad34f49b93e02552f3ad9) says hello -app#1 (96015cccdebcec119adad34f49b93e02552f3ad9) says hello -BUILT APP (2) -app#2 (ccc073a1c8e94fd4f2d76ebefb2bbc96790cb795) listening... -app#2 (ccc073a1c8e94fd4f2d76ebefb2bbc96790cb795) says hello -app#2 (ccc073a1c8e94fd4f2d76ebefb2bbc96790cb795) says hello -app#1 (96015cccdebcec119adad34f49b93e02552f3ad9) says hello -app#1 (96015cccdebcec119adad34f49b93e02552f3ad9) exiting... -BUILT APP (3) -app#3 (286848c2aefcd3f7321a65b5e4efae987fb17911) listening... -app#3 (286848c2aefcd3f7321a65b5e4efae987fb17911) says hello -app#3 (286848c2aefcd3f7321a65b5e4efae987fb17911) says hello -app#2 (ccc073a1c8e94fd4f2d76ebefb2bbc96790cb795) says hello -app#2 (ccc073a1c8e94fd4f2d76ebefb2bbc96790cb795) exiting... -app#3 (286848c2aefcd3f7321a65b5e4efae987fb17911) says hello -app#3 (286848c2aefcd3f7321a65b5e4efae987fb17911) exiting... -``` +**How it works** + +* `overseer` uses the main process to check for and install upgrades and a child process to run `Program`. +* The main process retrieves the files of the listeners described by `Address/es`. +* The child process is provided with these files which is converted into a `Listener/s` for the `Program` to consume. +* All child process pipes are connected back to the main process. +* All signals received on the main process are forwarded through to the child process. +* `Fetcher` runs in a goroutine and checks for updates at preconfigured interval. When `Fetcher` returns a valid binary stream (`io.Reader`), the master process saves it to a temporary location, verifies it, replaces the current binary and initiates a graceful restart. +* The `fetcher.HTTP` accepts a `URL`, it polls this URL with HEAD requests and until it detects a change. On change, we `GET` the `URL` and stream it back out to `overseer`. See also `fetcher.S3`. +* Once a binary is received, it is run with a simple echo token to confirm it is a `overseer` binary. +* Except for scheduled upgrades, the child process exiting will cause the main process to exit with the same code. So, **`overseer` is not a process manager**. + +See [Config](https://godoc.org/github.com/jpillora/overseer#Config)uration options [here](https://godoc.org/github.com/jpillora/overseer#Config) and the runtime [State](https://godoc.org/github.com/jpillora/overseer#State) available to your program [here](https://godoc.org/github.com/jpillora/overseer#State). ### More examples +* See the [example/](example/) directory and run `example.sh`, you should see the following output: + + ```sh + $ cd example/ + $ sh example.sh + serving . on port 5002 + BUILT APP (1) + RUNNING APP + app#1 (1cd8b9928d44b0a6e89df40574b8b6d20a417679) listening... + app#1 (1cd8b9928d44b0a6e89df40574b8b6d20a417679) says hello + app#1 (1cd8b9928d44b0a6e89df40574b8b6d20a417679) says hello + BUILT APP (2) + app#2 (b9b251f1be6d0cc423ef921f107cb4fc52f760b3) listening... + app#2 (b9b251f1be6d0cc423ef921f107cb4fc52f760b3) says hello + app#2 (b9b251f1be6d0cc423ef921f107cb4fc52f760b3) says hello + app#1 (1cd8b9928d44b0a6e89df40574b8b6d20a417679) says hello + app#1 (1cd8b9928d44b0a6e89df40574b8b6d20a417679) exiting... + BUILT APP (3) + app#3 (248f80ea049c835e7e3714b7169c539d3a4d6131) listening... + app#3 (248f80ea049c835e7e3714b7169c539d3a4d6131) says hello + app#3 (248f80ea049c835e7e3714b7169c539d3a4d6131) says hello + app#2 (b9b251f1be6d0cc423ef921f107cb4fc52f760b3) says hello + app#2 (b9b251f1be6d0cc423ef921f107cb4fc52f760b3) exiting... + app#3 (248f80ea049c835e7e3714b7169c539d3a4d6131) says hello + ``` + * Only use graceful restarts ```go @@ -96,7 +114,7 @@ app#3 (286848c2aefcd3f7321a65b5e4efae987fb17911) exiting... } ``` - Send `main` a `SIGUSR2` to manually trigger a restart + Send `main` a `SIGUSR2` (`Config.RestartSignal`) to manually trigger a restart * Only use auto-upgrades, no restarts @@ -113,31 +131,22 @@ app#3 (286848c2aefcd3f7321a65b5e4efae987fb17911) exiting... } ``` - Your binary will be upgraded though it will require manual restart from the user + Your binary will be upgraded though it will require manual restart from the user, suitable for creating self-upgrading command-line applications. -### Warnings +### Known issues +* The master process's `overseer.Config` cannot be changed via an upgrade, the master process must be restarted. * Currently shells out to `mv` for moving files because `mv` handles cross-partition moves unlike `os.Rename`. -* Bind `Addresses` can only be changed by restarting the main process. +* `Addresses` can only be changed by restarting the main process. * Only supported on darwin and linux. -### Documentation +### More documentation * [Core `overseer` package](https://godoc.org/github.com/jpillora/overseer) * [Common `fetcher.Interface`](https://godoc.org/github.com/jpillora/overseer/fetcher#Interface) * [HTTP fetcher type](https://godoc.org/github.com/jpillora/overseer/fetcher#HTTP) * [S3 fetcher type](https://godoc.org/github.com/jpillora/overseer/fetcher#S3) -### Architecture overview - -* `overseer` uses the main process to check for and install upgrades and a child process to run `Program` -* All child process pipes are connected back to the main process -* All signals received on the main process are forwarded through to the child process -* The provided `fetcher.Interface` will be used to `Fetch()` the latest build of the binary -* The `fetcher.HTTP` accepts a `URL`, it polls this URL with HEAD requests and until it detects a change. On change, we `GET` the `URL` and stream it back out to `overseer`. -* Once a binary is received, it is run with a simple echo token to confirm it is a `overseer` binary. -* Except for scheduled upgrades, the child process exiting will cause the main process to exit with the same code. So, **`overseer` is not a process manager**. - ### Docker 1. Compile your `overseer`able `app` to a `/path/on/docker/host/dir/app` @@ -149,16 +158,16 @@ app#3 (286848c2aefcd3f7321a65b5e4efae987fb17911) exiting... 1. For testing, swap out `-d` (daemonize) for `--rm -it` (remove on exit, input, terminal) 1. `app` can use the current working directory as storage -1. `debian` doesn't ship with TLS certs, you can mount them in with `-v /etc/ssl/certs/ca-certificates.crt:/etc/ssl/certs/ca-certificates.crt` - -### Alternatives - -* https://github.com/sanbornm/go-selfupdate -* https://github.com/inconshreveable/go-update +1. If the OS doesn't ship with TLS certs, you can mount them from the host with `-v /etc/ssl/certs/ca-certificates.crt:/etc/ssl/certs/ca-certificates.crt` ### TODO -* Log levels +* Tests! The test suite should drive an: + * HTTP client for application version/uptime testing + * HTTP server for application upgrades + * `overseer` binary process +* HTTP fetcher long-polling +* SCP fetcher (connect to a server, poll path) * Github fetcher (given a repo, poll releases) * etcd fetcher (given a cluster, watch key) * `overseer` CLI tool ([TODO](cmd/overseer/TODO.md)) diff --git a/example/main.go b/example/main.go index b1ed307..8a9c477 100644 --- a/example/main.go +++ b/example/main.go @@ -30,12 +30,12 @@ func prog(state overseer.State) { //'main()' is run in the initial process func main() { overseer.Run(overseer.Config{ - Log: false, //display log of overseer actions Program: prog, Address: ":5001", Fetcher: &fetcher.HTTP{ URL: "http://localhost:5002/myappnew", Interval: 1 * time.Second, }, + Debug: false, //display log of overseer actions }) } diff --git a/overseer.go b/overseer.go index cedccc5..09f6b01 100644 --- a/overseer.go +++ b/overseer.go @@ -2,6 +2,7 @@ package overseer import ( + "errors" "fmt" "log" "os" @@ -12,6 +13,7 @@ import ( ) const ( + envSlaveID = "GO_UPGRADE_SLAVE_ID" envIsSlave = "GO_UPGRADE_IS_SLAVE" envNumFDs = "GO_UPGRADE_NUM_FDS" envBinID = "GO_UPGRADE_BIN_ID" @@ -19,9 +21,9 @@ const ( ) type Config struct { - //Optional allows overseer to fallback to running - //running the program in the main process. - Optional bool + //Required will prevent overseer from fallback to running + //running the program in the main process on failure. + Required bool //Program's main function Program func(state State) //Program's zero-downtime socket listening address (set this or Addresses) @@ -41,31 +43,24 @@ type Config struct { //PreUpgrade runs after a binary has been retreived, user defined checks //can be run here and returning an error will cancel the upgrade. PreUpgrade func(tempBinaryPath string) error - //Log enables [overseer] logs to be sent to stdout. - Log bool + //Debug enables all [overseer] logs. + Debug bool + //NoWarn disables warning [overseer] logs. + NoWarn bool //NoRestartAfterFetch disables automatic restarts after each upgrade. NoRestartAfterFetch bool //Fetcher will be used to fetch binaries. Fetcher fetcher.Interface } -func fatalf(f string, args ...interface{}) { - log.Fatalf("[overseer] "+f, args...) -} - -func Run(c Config) { - //sanity check - if token := os.Getenv(envBinCheck); token != "" { - fmt.Fprint(os.Stdout, token) - os.Exit(0) - } +func validate(c *Config) error { //validate if c.Program == nil { - fatalf("overseer.Config.Program required") + return errors.New("overseer.Config.Program required") } if c.Address != "" { if len(c.Addresses) > 0 { - fatalf("overseer.Config.Address and Addresses cant both be set") + return errors.New("overseer.Config.Address and Addresses cant both be set") } c.Addresses = []string{c.Address} } else if len(c.Addresses) > 0 { @@ -74,28 +69,57 @@ func Run(c Config) { if c.RestartSignal == nil { c.RestartSignal = SIGUSR2 } - if c.TerminateTimeout == 0 { + if c.TerminateTimeout <= 0 { c.TerminateTimeout = 30 * time.Second } - if c.MinFetchInterval == 0 { + if c.MinFetchInterval <= 0 { c.MinFetchInterval = 1 * time.Second } - //os not supported - if !supported { - if !c.Optional { - fatalf("os (%s) not supported", runtime.GOOS) + return nil +} + +//RunErr allows manual handling of any +//overseer errors. +func RunErr(c Config) error { + return runErr(&c) +} + +//Run executes overseer, if an error is +//encounted, overseer fallsback to running +//the program directly (unless Required is set). +func Run(c Config) { + err := runErr(&c) + if err != nil { + if c.Required { + log.Fatalf("[overseer] %s", err) + } else if c.Debug || !c.NoWarn { + log.Printf("[overseer] disabled. run failed: %s", err) } c.Program(DisabledState) return } + os.Exit(0) +} + +func runErr(c *Config) error { + if err := validate(c); err != nil { + return err + } + //sanity check + if token := os.Getenv(envBinCheck); token != "" { + fmt.Fprint(os.Stdout, token) + return nil + } + //os not supported + if !supported { + return fmt.Errorf("os (%s) not supported", runtime.GOOS) + } //run either in master or slave mode if os.Getenv(envIsSlave) == "1" { sp := slave{Config: c} - sp.logf("run") - sp.run() + return sp.run() } else { mp := master{Config: c} - mp.logf("run") - mp.run() + return mp.run() } } diff --git a/proc_master.go b/proc_master.go index 3719ab2..c58432f 100644 --- a/proc_master.go +++ b/proc_master.go @@ -25,7 +25,8 @@ var tmpBinPath = filepath.Join(os.TempDir(), "overseer-"+token()) //a overseer master process type master struct { - Config + *Config + slaveID int slaveCmd *exec.Cmd slaveExtraFiles []*os.File binPath, tmpBinPath string @@ -40,30 +41,26 @@ type master struct { signalledAt time.Time } -func (mp *master) run() { +func (mp *master) run() error { + mp.debugf("run") if err := mp.checkBinary(); err != nil { - if !mp.Config.Optional { - fatalf("%s", err) - } - //run program directly - mp.logf("%s, disabling overseer.", err) - mp.Program(DisabledState) - return + return err } if mp.Config.Fetcher != nil { if err := mp.Config.Fetcher.Init(); err != nil { - mp.logf("fetcher init failed (%s)", err) + mp.warnf("fetcher init failed (%s). fetcher disabled.", err) mp.Config.Fetcher = nil } } mp.setupSignalling() - mp.retreiveFileDescriptors() + if err := mp.retreiveFileDescriptors(); err != nil { + return err + } if mp.Config.Fetcher != nil { - //TODO is required? fatalf("overseer.Config.Fetcher required") mp.fetch() go mp.fetchLoop() } - mp.forkLoop() + return mp.forkLoop() } func (mp *master) checkBinary() error { @@ -119,58 +116,59 @@ func (mp *master) handleSignal(s os.Signal) { //user initiated manual restart mp.triggerRestart() } else if s.String() == "child exited" { - // will occur on every restart + // will occur on every restart, ignore it } else //**during a restart** a SIGUSR1 signals //to the master process that, the file //descriptors have been released if mp.awaitingUSR1 && s == SIGUSR1 { - mp.logf("signaled, sockets ready") + mp.debugf("signaled, sockets ready") mp.awaitingUSR1 = false mp.descriptorsReleased <- true } else //while the slave process is running, proxy //all signals through if mp.slaveCmd != nil && mp.slaveCmd.Process != nil { - mp.logf("proxy signal (%s)", s) + mp.debugf("proxy signal (%s)", s) mp.sendSignal(s) } else //otherwise if not running, kill on CTRL+c if s == os.Interrupt { - mp.logf("interupt with no slave") + mp.debugf("interupt with no slave") os.Exit(1) } else { - mp.logf("signal discarded (%s), no slave process", s) + mp.debugf("signal discarded (%s), no slave process", s) } } func (mp *master) sendSignal(s os.Signal) { if err := mp.slaveCmd.Process.Signal(s); err != nil { - mp.logf("signal failed (%s), assuming slave process died", err) + mp.debugf("signal failed (%s), assuming slave process died unexpectedly", err) os.Exit(1) } } -func (mp *master) retreiveFileDescriptors() { +func (mp *master) retreiveFileDescriptors() error { mp.slaveExtraFiles = make([]*os.File, len(mp.Config.Addresses)) for i, addr := range mp.Config.Addresses { a, err := net.ResolveTCPAddr("tcp", addr) if err != nil { - fatalf("invalid address: %s (%s)", addr, err) + return fmt.Errorf("Invalid address %s (%s)", addr, err) } l, err := net.ListenTCP("tcp", a) if err != nil { - fatalf(err.Error()) + return err } f, err := l.File() if err != nil { - fatalf("failed to retreive fd for: %s (%s)", addr, err) + return fmt.Errorf("Failed to retreive fd for: %s (%s)", addr, err) } if err := l.Close(); err != nil { - fatalf("failed to close listener for: %s (%s)", addr, err) + return fmt.Errorf("Failed to close listener for: %s (%s)", addr, err) } mp.slaveExtraFiles[i] = f } + return nil } //fetchLoop is run in a goroutine @@ -180,6 +178,7 @@ func (mp *master) fetchLoop() { for { t0 := time.Now() mp.fetch() + //duration fetch of fetch diff := time.Now().Sub(t0) if diff < min { delay := min - diff @@ -194,67 +193,66 @@ func (mp *master) fetch() { if mp.restarting { return //skip if restarting } - - mp.logf("checking for updates...") + mp.debugf("checking for updates...") reader, err := mp.Fetcher.Fetch() if err != nil { - mp.logf("failed to get latest version: %s", err) + mp.debugf("failed to get latest version: %s", err) return } if reader == nil { - mp.logf("no updates") + mp.debugf("no updates") return //fetcher has explicitly said there are no updates } - mp.logf("streaming update...") + mp.debugf("streaming update...") //optional closer if closer, ok := reader.(io.Closer); ok { defer closer.Close() } tmpBin, err := os.OpenFile(tmpBinPath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666) if err != nil { - mp.logf("failed to open temp binary: %s", err) + mp.warnf("failed to open temp binary: %s", err) return } - // defer func() { - // tmpBin.Close() - // os.Remove(tmpBinPath) - // }() + defer func() { + tmpBin.Close() + os.Remove(tmpBinPath) + }() //tee off to sha1 hash := sha1.New() reader = io.TeeReader(reader, hash) //write to a temp file _, err = io.Copy(tmpBin, reader) if err != nil { - mp.logf("failed to write temp binary: %s", err) + mp.warnf("failed to write temp binary: %s", err) return } //compare hash newHash := hash.Sum(nil) if bytes.Equal(mp.binHash, newHash) { - mp.logf("hash match - skip") + mp.debugf("hash match - skip") return } //copy permissions if err := tmpBin.Chmod(mp.binPerms); err != nil { - mp.logf("failed to make temp binary executable: %s", err) + mp.warnf("failed to make temp binary executable: %s", err) return } if err := tmpBin.Chown(uid, gid); err != nil { - mp.logf("failed to change owner of binary: %s", err) + mp.warnf("failed to change owner of binary: %s", err) return } if _, err := tmpBin.Stat(); err != nil { - mp.logf("failed to stat temp binary: %s", err) + mp.warnf("failed to stat temp binary: %s", err) return } tmpBin.Close() if _, err := os.Stat(tmpBinPath); err != nil { - mp.logf("failed to stat temp binary by path: %s", err) + mp.warnf("failed to stat temp binary by path: %s", err) return } if mp.Config.PreUpgrade != nil { if err := mp.Config.PreUpgrade(tmpBinPath); err != nil { - mp.logf("user cancelled upgrade: %s", err) + mp.warnf("user cancelled upgrade: %s", err) return } } @@ -264,19 +262,19 @@ func (mp *master) fetch() { cmd.Env = []string{envBinCheck + "=" + tokenIn} tokenOut, err := cmd.Output() if err != nil { - mp.logf("failed to run temp binary: %s", err) + mp.warnf("failed to run temp binary: %s", err) return } if tokenIn != string(tokenOut) { - mp.logf("sanity check failed") + mp.warnf("sanity check failed") return } //overwrite! if err := move(mp.binPath, tmpBinPath); err != nil { - mp.logf("failed to overwrite binary: %s", err) + mp.warnf("failed to overwrite binary: %s", err) return } - mp.logf("upgraded binary (%x -> %x)", mp.binHash[:12], newHash[:12]) + mp.debugf("upgraded binary (%x -> %x)", mp.binHash[:12], newHash[:12]) mp.binHash = newHash //binary successfully replaced if !mp.Config.NoRestartAfterFetch { @@ -288,13 +286,13 @@ func (mp *master) fetch() { func (mp *master) triggerRestart() { if mp.restarting { - mp.logf("already graceful restarting") + mp.debugf("already graceful restarting") return //skip } else if mp.slaveCmd == nil || mp.restarting { - mp.logf("no slave process") + mp.debugf("no slave process") return //skip } - mp.logf("graceful restart triggered") + mp.debugf("graceful restart triggered") mp.restarting = true mp.awaitingUSR1 = true mp.signalledAt = time.Now() @@ -302,31 +300,37 @@ func (mp *master) triggerRestart() { select { case <-mp.restarted: //success - mp.logf("restart success") + mp.debugf("restart success") case <-time.After(mp.TerminateTimeout): - //times up process, we did ask nicely! - mp.logf("graceful timeout, forcing exit") + //times up mr. process, we did ask nicely! + mp.debugf("graceful timeout, forcing exit") mp.sendSignal(os.Kill) } } //not a real fork -func (mp *master) forkLoop() { +func (mp *master) forkLoop() error { //loop, restart command for { - mp.fork() + if err := mp.fork(); err != nil { + return err + } } } -func (mp *master) fork() { - mp.logf("starting %s", mp.binPath) +func (mp *master) fork() error { + mp.debugf("starting %s", mp.binPath) cmd := exec.Command(mp.binPath) + //mark this new process as the "active" slave process. + //this process is assumed to be holding the socket files. mp.slaveCmd = cmd - + mp.slaveID++ + //provide the slave process with some state e := os.Environ() e = append(e, envBinID+"="+hex.EncodeToString(mp.binHash)) + e = append(e, envSlaveID+"="+strconv.Itoa(mp.slaveID)) e = append(e, envIsSlave+"=1") - e = append(e, envNumFDs+"="+strconv.Itoa(len(mp.Config.Addresses))) + e = append(e, envNumFDs+"="+strconv.Itoa(len(mp.slaveExtraFiles))) cmd.Env = e //inherit master args/stdfiles cmd.Args = os.Args @@ -336,8 +340,9 @@ func (mp *master) fork() { //include socket files cmd.ExtraFiles = mp.slaveExtraFiles if err := cmd.Start(); err != nil { - fatalf("failed to fork: %s", err) + return fmt.Errorf("Failed to start slave process: %s", err) } + //was scheduled to restart, notify success if mp.restarting { mp.restartedAt = time.Now() mp.restarting = false @@ -352,10 +357,6 @@ func (mp *master) fork() { select { case err := <-cmdwait: //program exited before releasing descriptors - if mp.restarting { - //restart requested - return - } //proxy exit code out to master code := 0 if err != nil { @@ -366,20 +367,32 @@ func (mp *master) fork() { } } } - mp.logf("prog exited with %d", code) - //proxy exit with same code - os.Exit(code) + mp.debugf("prog exited with %d", code) + //if a restart wasn't requested, proxy + //through the exit code via the main process + if !mp.restarting { + os.Exit(code) + } case <-mp.descriptorsReleased: //if descriptors are released, the program //has yielded control of its sockets and - //a new instance should be started to pick - //them up. The previous cmd.Wait() will still - //be consumed though it will be discarded. + //a parallel instance of the program can be + //started safely. it should serve state.Listeners + //to ensure downtime is kept at <1sec. The previous + //cmd.Wait() will still be consumed though the + //result will be discarded. + } + return nil +} + +func (mp *master) debugf(f string, args ...interface{}) { + if mp.Config.Debug { + log.Printf("[overseer master] "+f, args...) } } -func (mp *master) logf(f string, args ...interface{}) { - if mp.Log { +func (mp *master) warnf(f string, args ...interface{}) { + if mp.Config.Debug || !mp.Config.NoWarn { log.Printf("[overseer master] "+f, args...) } } diff --git a/proc_slave.go b/proc_slave.go index e8f4ae6..74afce0 100644 --- a/proc_slave.go +++ b/proc_slave.go @@ -1,6 +1,7 @@ package overseer import ( + "fmt" "log" "net" "os" @@ -44,51 +45,61 @@ type State struct { //a overseer slave process type slave struct { - Config + *Config + id string listeners []*upListener masterPid int masterProc *os.Process state State } -func (sp *slave) run() { +func (sp *slave) run() error { + sp.id = os.Getenv(envSlaveID) + sp.debugf("run") sp.state.Enabled = true sp.state.ID = os.Getenv(envBinID) sp.state.StartedAt = time.Now() sp.state.Address = sp.Config.Address sp.state.Addresses = sp.Config.Addresses sp.state.GracefulShutdown = make(chan bool, 1) - sp.watchParent() - sp.initFileDescriptors() + if err := sp.watchParent(); err != nil { + return err + } + if err := sp.initFileDescriptors(); err != nil { + return err + } sp.watchSignal() //run program with state - sp.logf("start program") + sp.debugf("start program") sp.Config.Program(sp.state) + return nil } -func (sp *slave) watchParent() { +func (sp *slave) watchParent() error { sp.masterPid = os.Getppid() proc, err := os.FindProcess(sp.masterPid) if err != nil { - fatalf("parent process %s", err) + return fmt.Errorf("master process: %s", err) } sp.masterProc = proc go func() { + //send signal 0 to master process forever for { - //sending signal 0 should not error as long as the process is alive + //should not error as long as the process is alive if err := sp.masterProc.Signal(syscall.Signal(0)); err != nil { os.Exit(1) } time.Sleep(2 * time.Second) } }() + return nil } -func (sp *slave) initFileDescriptors() { +func (sp *slave) initFileDescriptors() error { //inspect file descriptors numFDs, err := strconv.Atoi(os.Getenv(envNumFDs)) if err != nil { - fatalf("invalid %s integer", envNumFDs) + return fmt.Errorf("invalid %s integer", envNumFDs) } sp.listeners = make([]*upListener, numFDs) sp.state.Listeners = make([]net.Listener, numFDs) @@ -96,7 +107,7 @@ func (sp *slave) initFileDescriptors() { f := os.NewFile(uintptr(3+i), "") l, err := net.FileListener(f) if err != nil { - fatalf("failed to inherit file descriptor: %d", i) + return fmt.Errorf("failed to inherit file descriptor: %d", i) } u := newUpListener(l) sp.listeners[i] = u @@ -105,6 +116,7 @@ func (sp *slave) initFileDescriptors() { if len(sp.state.Listeners) > 0 { sp.state.Listener = sp.state.Listeners[0] } + return nil } func (sp *slave) watchSignal() { @@ -113,7 +125,7 @@ func (sp *slave) watchSignal() { go func() { <-signals signal.Stop(signals) - sp.logf("graceful shutdown requested") + sp.debugf("graceful shutdown requested") //master wants to restart, sp.state.GracefulShutdown <- true //release any sockets and notify master @@ -129,14 +141,20 @@ func (sp *slave) watchSignal() { //start death-timer go func() { time.Sleep(sp.Config.TerminateTimeout) - sp.logf("timeout. forceful shutdown") + sp.debugf("timeout. forceful shutdown") os.Exit(1) }() }() } -func (sp *slave) logf(f string, args ...interface{}) { - if sp.Log { - log.Printf("[overseer slave] "+f, args...) +func (sp *slave) debugf(f string, args ...interface{}) { + if sp.Config.Debug { + log.Printf("[overseer slave#"+sp.id+"] "+f, args...) + } +} + +func (sp *slave) warnf(f string, args ...interface{}) { + if sp.Config.Debug || !sp.Config.NoWarn { + log.Printf("[overseer slave#"+sp.id+"] "+f, args...) } } diff --git a/sys_posix.go b/sys_posix.go index 54c3f70..2507574 100644 --- a/sys_posix.go +++ b/sys_posix.go @@ -22,7 +22,7 @@ var ( func move(dst, src string) error { //HACK: we're shelling out to mv because linux - //throws errors when we use Rename/Create a - //running binary. + //throws errors when crossing device boundaryes. + //TODO see sys_posix_mv.go return exec.Command("mv", src, dst).Run() } diff --git a/sys_posix_mv.go b/sys_posix_mv.go new file mode 100644 index 0000000..c9c3b96 --- /dev/null +++ b/sys_posix_mv.go @@ -0,0 +1,209 @@ +package overseer + +// TODO(@jpillora) borrowed from https://github.com/aisola/go-coreutils/blob/master/mv/mv.go +// +// mv.go (go-coreutils) 0.1 +// Copyright (C) 2014, The GO-Coreutils Developers. +// +// Written By: Abram C. Isola, Michael Murphy +// +// package main +// +// import "bufio" +// import "flag" +// import "fmt" +// import "io" +// import "os" +// import "path/filepath" +// +// const ( +// help_text string = ` +// Usage: mv [OPTION]... [PATH]... [PATH] +// or: mv [PATH] [PATH] +// or: mv [OPTION] +// move or rename files or directories +// --help display this help and exit +// --version output version information and exit +// -f, --force remove existing destination files and never prompt the user +// ` // -v, --verbose print the name of each file before moving it +// version_text = ` +// mv (go-coreutils) 0.1 +// Copyright (C) 2014, The GO-Coreutils Developers. +// This program comes with ABSOLUTELY NO WARRANTY; for details see +// LICENSE. This is free software, and you are welcome to redistribute +// it under certain conditions in LICENSE. +// ` +// ) +// +// var ( +// forceEnabled = flag.Bool("f", false, "remove existing destination files and never prompt the user") +// forceEnabledLong = flag.Bool("force", false, "remove existing destination files and never prompt the user") +// ) +// +// // The input function prints a statement to the user and accepts an input, then returns the input. +// +// func input(prompt, location string) string { +// fmt.Printf(prompt, location) +// +// reader := bufio.NewReader(os.Stdin) +// userinput, _ := reader.ReadString([]byte("\n")[0]) +// +// return userinput +// } +// +// // The fileExists function will check if the file exists. +// +// func fileExists(filep string) os.FileInfo { +// fp, err := os.Stat(filep) +// if err != nil && os.IsNotExist(err) { +// return nil +// } +// return fp +// } +// +// /* The argumentCheck function will check the number of arguments given to the program and process them +// * accordingly. */ +// +// func argumentCheck(files []string) { +// switch len(files) { +// case 0: // If there is no argument +// fmt.Println("mv: missing file operand\nTry 'mv -help' for more information") +// os.Exit(0) +// case 1: // If there is one argument +// fmt.Printf("mv: missing destination file operand after '%s'\nTry 'mv -help' for more information.\n", files[0]) +// os.Exit(0) +// case 2: // If there are two arguments +// mover(files[0], files[1]) +// default: // If there are more than two arguments +// to_file, files := files[len(files)-1], files[:len(files)-1] +// +// if fp := fileExists(to_file); fp == nil || !fp.IsDir() { +// fmt.Println("mv: when moving multiple files, last argument must be a directory") +// os.Exit(0) +// } else { +// fmt.Println(files) +// for i := 0; i < len(files); i++ { +// mover(files[i], to_file) +// } +// os.Exit(0) +// } +// } +// } +// +// /* The mover function will take two strings as an argument and move the original file/dir to +// * a new location. */ +// +// func mover(originalLocation, newLocation string) { +// fp := fileExists(newLocation) +// +// switch { +// case fileExists(originalLocation) == nil: // If the original file does not exist +// fmt.Printf("mv: cannot stat '%s': No such file or directory\n", originalLocation) +// os.Exit(0) +// case fp != nil && !*forceEnabled: // If the destination file does not exist and forceEnabled is disabled +// if fp.IsDir() { +// base := filepath.Base(originalLocation) +// if fp2 := fileExists(newLocation + "/" + base); fp2 != nil && !*forceEnabled { +// answer := input("File '%s' exists. Overwrite? (y/N): ", newLocation+"/"+base) +// if answer == "y\n" { +// try_move(originalLocation, newLocation+"/"+base) +// } else { +// os.Exit(0) +// } +// } else if fp2 != nil && *forceEnabled { +// try_move(originalLocation, newLocation+"/"+base) +// } else if fp2 == nil { +// try_move(originalLocation, newLocation+"/"+base) +// } +// } else { +// answer := input("File '%s' exists. Overwrite? (y/N): ", newLocation) +// if answer == "y\n" { +// try_move(originalLocation, newLocation) +// } else { +// os.Exit(0) +// } +// } +// default: // If the destination file exists and forceEnabled is enabled, +// try_move(originalLocation, newLocation) // or if the file does not exist, move it. +// } +// } +// +// func try_move(originalLocation, newLocation string) error { +// err := os.Rename(originalLocation, newLocation) +// switch t := err.(type) { +// case *os.LinkError: +// fmt.Printf("Cross-device move. Copying instead\n") +// return move_across_devices(originalLocation, newLocation) +// case *os.PathError: +// fmt.Printf("Path error: %q\n", t) +// return err +// case *os.SyscallError: +// fmt.Printf("Syscall error: %q\n", t) +// return err +// case nil: +// return nil +// default: +// fmt.Printf("Unkown error Type: %T Error: %q", t, t) +// return err +// } +// return nil +// } +// +// func move_across_devices(originalLocation, newLocation string) error { +// src, err := os.Open(originalLocation) +// if err != nil { +// return err +// } +// defer src.Close() +// +// dst, err := os.Create(newLocation) +// if err != nil { +// return err +// } +// defer dst.Close() +// +// size, err := io.Copy(dst, src) +// if err != nil { +// return err +// } +// +// srcStat, err := os.Stat(originalLocation) +// if err != nil { +// return err +// } +// if size != srcStat.Size() { +// os.Remove(newLocation) +// return fmt.Errorf("Error, file was not copied completely") +// } +// os.Remove(originalLocation) +// return nil +// } +// +// func main() { +// help := flag.Bool("help", false, help_text) +// version := flag.Bool("version", false, version_text) +// flag.Parse() +// +// // We only need one instance of forceEnabled +// +// if *forceEnabledLong { +// *forceEnabled = true +// } +// +// // Display help information +// +// if *help { +// fmt.Println(help_text) +// os.Exit(0) +// } +// +// // Display version information +// +// if *version { +// fmt.Println(version_text) +// os.Exit(0) +// } +// +// files := flag.Args() // Obtain a list of files. +// argumentCheck(files) // Check the number of arguments and process them. +// }