somewhat working, though still buggy

master
Jaime Pillora 2016-02-08 03:05:31 +11:00
parent 9ef850b1bc
commit 8f42cc8800
5 changed files with 110 additions and 37 deletions

View File

@ -10,19 +10,17 @@ import (
"github.com/jpillora/go-upgrade/fetcher"
)
var FOO = "" //set manually or with with ldflags
var VAR = "" //set manually or with with ldflags
//convert your 'main()' into a 'prog(state)'
func prog(state upgrade.State) {
log.Printf("app (%s) listening...", state.ID)
http.Handle("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Foo", FOO)
w.Header().Set("Header-Time", time.Now().String())
w.WriteHeader(200)
time.Sleep(30 * time.Second)
fmt.Fprintf(w, "Body-Time: %s (Foo: %s)", time.Now(), FOO)
time.Sleep(10 * time.Second)
fmt.Fprintf(w, "Var is: %s", VAR)
}))
http.Serve(state.Listener, nil)
err := http.Serve(state.Listener, nil)
log.Printf("app (%s) exiting: %v", state.ID, err)
}
//then create another 'main' which runs the upgrades
@ -32,7 +30,7 @@ func main() {
Address: "0.0.0.0:3000",
Fetcher: &fetcher.HTTP{
URL: "http://localhost:4000/myapp2",
Interval: 5 * time.Second,
Interval: 1 * time.Second,
},
Logging: true, //display log of go-upgrade actions
})

View File

@ -7,10 +7,19 @@ import (
"time"
)
func newUpListener(l net.Listener) *upListener {
return &upListener{
Listener: l,
closeByForce: make(chan bool),
}
}
//gracefully closing net.Listener
type upListener struct {
net.Listener
wg sync.WaitGroup
closeError error
closeByForce chan bool
wg sync.WaitGroup
}
func (l *upListener) Accept() (net.Conn, error) {
@ -21,13 +30,47 @@ func (l *upListener) Accept() (net.Conn, error) {
conn.SetKeepAlive(true) // see http.tcpKeepAliveListener
conn.SetKeepAlivePeriod(3 * time.Minute) // see http.tcpKeepAliveListener
uconn := upConn{
Conn: conn,
wg: &l.wg,
Conn: conn,
wg: &l.wg,
closed: make(chan bool),
}
go func() {
//connection watcher
select {
case <-l.closeByForce:
uconn.Close()
case <-uconn.closed:
//closed manually
}
}()
l.wg.Add(1)
return uconn, nil
}
func (l *upListener) release(timeout time.Duration) {
//stop accepting connections - release fd
l.closeError = l.Listener.Close()
//start timer, close by force if deadline not met
waited := make(chan bool)
go func() {
l.wg.Wait()
waited <- true
}()
go func() {
select {
case <-time.After(timeout):
close(l.closeByForce)
case <-waited:
//no need to force close
}
}()
}
func (l *upListener) Close() error {
l.wg.Wait()
return l.closeError
}
func (l *upListener) File() *os.File {
// returns a dup(2) - FD_CLOEXEC flag *not* set
tl := l.Listener.(*net.TCPListener)
@ -37,13 +80,15 @@ func (l *upListener) File() *os.File {
type upConn struct {
net.Conn
wg *sync.WaitGroup
wg *sync.WaitGroup
closed chan bool
}
func (uconn upConn) Close() error {
err := uconn.Conn.Close()
if err == nil {
uconn.wg.Done()
uconn.closed <- true
}
return err
}

View File

@ -1,9 +0,0 @@
package upgrade
import "testing"
func TestGraceful(t *testing.T) {
}

View File

@ -42,6 +42,7 @@ func (mp *master) run() {
mp.readBinary()
mp.setupSignalling()
mp.retreiveFileDescriptors()
mp.fetch()
go mp.fetchLoop()
mp.forkLoop()
}
@ -85,7 +86,7 @@ func (mp *master) readBinary() {
}
if err != nil {
if mp.Config.Optional {
log.Print(err)
mp.logf("%s, disabling go-upgrade. ", err)
} else {
fatalf("%s", err)
}
@ -119,7 +120,10 @@ func (mp *master) setupSignalling() {
if mp.slaveCmd != nil && mp.slaveCmd.Process != nil {
mp.logf("proxy signal (%s)", s)
mp.slaveCmd.Process.Signal(s)
if err := mp.slaveCmd.Process.Signal(s); err != nil {
mp.logf("proxy signal failed (%s)", err)
os.Exit(1)
}
} else if s == syscall.SIGINT {
mp.logf("interupt with no slave")
os.Exit(1)
@ -153,9 +157,18 @@ func (mp *master) retreiveFileDescriptors() {
}
func (mp *master) fetchLoop() {
minDelay := time.Second
time.Sleep(minDelay)
for {
t0 := time.Now()
mp.fetch()
time.Sleep(time.Second) //fetches should be throttled by the fetcher!
diff := time.Now().Sub(t0)
if diff < minDelay {
delay := minDelay - diff
//ensures at least minDelay
//should be throttled by the fetcher!
time.Sleep(delay)
}
}
}
@ -224,7 +237,7 @@ func (mp *master) fetch() {
//binary successfully replaced, perform graceful restart
mp.restarting = true
mp.signalledAt = time.Now()
mp.signals <- syscall.SIGTERM //ask nicely to terminate
mp.signals <- mp.Config.Signal //ask nicely to terminate
select {
case <-mp.restarted:
//success
@ -296,10 +309,10 @@ func (mp *master) fork() {
//proxy exit with same code
os.Exit(code)
case <-mp.descriptorsReleased:
log.Printf("descriptors released")
//if descriptors are released, the program
//is yielding control of the socket and
//should restart
//has yielded control of its sockets and
//a new instance should be started to pick
//them up
}
}

View File

@ -6,6 +6,7 @@ import (
"os"
"os/signal"
"strconv"
"syscall"
"time"
)
@ -38,6 +39,8 @@ type State struct {
type slave struct {
Config
listeners []*upListener
ppid int
pproc *os.Process
state State
}
@ -45,14 +48,32 @@ func (sp *slave) run() {
sp.state.Enabled = true
sp.state.ID = os.Getenv(envBinID)
sp.state.StartedAt = time.Now()
sp.watchParent()
sp.initFileDescriptors()
//find parent
sp.watchSignal()
//run program with state
sp.logf("start program")
sp.Config.Program(sp.state)
}
func (sp *slave) watchParent() {
sp.ppid = os.Getppid()
proc, err := os.FindProcess(sp.ppid)
if err != nil {
fatalf("parent process %s", err)
}
sp.pproc = proc
go func() {
for {
//sending signal 0 should not error as long as the process is alive
if err := sp.pproc.Signal(syscall.Signal(0)); err != nil {
os.Exit(1)
}
time.Sleep(2 * time.Second)
}
}()
}
func (sp *slave) initFileDescriptors() {
//inspect file descriptors
numFDs, err := strconv.Atoi(os.Getenv(envNumFDs))
@ -67,7 +88,7 @@ func (sp *slave) initFileDescriptors() {
if err != nil {
fatalf("failed to inherit file descriptor: %d", i)
}
u := &upListener{Listener: l}
u := newUpListener(l)
sp.listeners[i] = u
sp.state.Listeners[i] = u
}
@ -81,13 +102,18 @@ func (sp *slave) watchSignal() {
signal.Notify(signals, sp.Config.Signal)
go func() {
<-signals
//do graceful shutdown
//stop listening
signal.Stop(signals)
sp.logf("graceful shutdown requested")
//master wants to restart,
//perform graceful shutdown:
for _, l := range sp.listeners {
l.release(sp.Config.TerminateTimeout)
}
sp.logf("released")
//signal released fds
//listeners should be waiting on connections and close
sp.pproc.Signal(syscall.SIGUSR1)
sp.logf("notify USR1")
//listeners should be waiting on connections to close...
}()
}