util: add stateful cond package
Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>docker-18.09
parent
a244195d1f
commit
bcbb3d6234
|
@ -5,6 +5,7 @@ import (
|
|||
"sync"
|
||||
|
||||
"github.com/moby/buildkit/solver-next/internal/pipe"
|
||||
"github.com/moby/buildkit/util/cond"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
@ -22,7 +23,7 @@ func NewScheduler(ef EdgeFactory) *Scheduler {
|
|||
|
||||
ef: ef,
|
||||
}
|
||||
s.cond = sync.NewCond(&s.mu)
|
||||
s.cond = cond.NewStatefulCond(&s.mu)
|
||||
|
||||
go s.loop()
|
||||
|
||||
|
@ -30,7 +31,7 @@ func NewScheduler(ef EdgeFactory) *Scheduler {
|
|||
}
|
||||
|
||||
type Scheduler struct {
|
||||
cond *sync.Cond
|
||||
cond *cond.StatefulCond
|
||||
mu sync.Mutex
|
||||
muQ sync.Mutex
|
||||
|
||||
|
@ -167,18 +168,7 @@ func (s *Scheduler) signal(e *edge) {
|
|||
s.muQ.Lock()
|
||||
if _, ok := s.waitq[e]; !ok {
|
||||
s.waitq[e] = struct{}{}
|
||||
go func() {
|
||||
s.mu.Lock()
|
||||
s.muQ.Lock()
|
||||
_, ok := s.waitq[e]
|
||||
s.muQ.Unlock()
|
||||
if !ok {
|
||||
s.mu.Unlock()
|
||||
return
|
||||
}
|
||||
s.cond.Signal()
|
||||
s.mu.Unlock()
|
||||
}()
|
||||
s.cond.Signal()
|
||||
}
|
||||
s.muQ.Unlock()
|
||||
}
|
||||
|
|
|
@ -0,0 +1,40 @@
|
|||
package cond
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
// NewStatefulCond returns a stateful version of sync.Cond . This cond will
|
||||
// never block on `Wait()` if `Signal()` has been called after the `Wait()` last
|
||||
// returned. This is useful for avoiding to take a lock on `cond.Locker` for
|
||||
// signalling.
|
||||
func NewStatefulCond(l sync.Locker) *StatefulCond {
|
||||
sc := &StatefulCond{main: l}
|
||||
sc.c = sync.NewCond(&sc.mu)
|
||||
return sc
|
||||
}
|
||||
|
||||
type StatefulCond struct {
|
||||
main sync.Locker
|
||||
mu sync.Mutex
|
||||
c *sync.Cond
|
||||
signalled bool
|
||||
}
|
||||
|
||||
func (s *StatefulCond) Wait() {
|
||||
s.main.Unlock()
|
||||
s.mu.Lock()
|
||||
if !s.signalled {
|
||||
s.c.Wait()
|
||||
}
|
||||
s.signalled = false
|
||||
s.mu.Unlock()
|
||||
s.main.Lock()
|
||||
}
|
||||
|
||||
func (s *StatefulCond) Signal() {
|
||||
s.mu.Lock()
|
||||
s.signalled = true
|
||||
s.c.Signal()
|
||||
s.mu.Unlock()
|
||||
}
|
|
@ -0,0 +1,124 @@
|
|||
package cond
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestCondInitialWaitBlocks(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var mu sync.Mutex
|
||||
|
||||
c := NewStatefulCond(&mu)
|
||||
|
||||
waited := make(chan struct{})
|
||||
|
||||
mu.Lock()
|
||||
|
||||
go func() {
|
||||
c.Wait()
|
||||
close(waited)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
case <-waited:
|
||||
require.Fail(t, "wait should have blocked")
|
||||
}
|
||||
|
||||
c.Signal()
|
||||
|
||||
select {
|
||||
case <-time.After(300 * time.Millisecond):
|
||||
require.Fail(t, "wait should have resumed")
|
||||
case <-waited:
|
||||
}
|
||||
|
||||
mu.Unlock()
|
||||
}
|
||||
|
||||
func TestInitialSignalDoesntBlock(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var mu sync.Mutex
|
||||
|
||||
c := NewStatefulCond(&mu)
|
||||
|
||||
waited := make(chan struct{})
|
||||
|
||||
c.Signal()
|
||||
|
||||
mu.Lock()
|
||||
|
||||
go func() {
|
||||
c.Wait()
|
||||
close(waited)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-time.After(300 * time.Millisecond):
|
||||
require.Fail(t, "wait should have resumed")
|
||||
case <-waited:
|
||||
}
|
||||
|
||||
waited = make(chan struct{})
|
||||
go func() {
|
||||
c.Wait()
|
||||
close(waited)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
case <-waited:
|
||||
require.Fail(t, "wait should have blocked")
|
||||
}
|
||||
|
||||
c.Signal()
|
||||
|
||||
<-waited
|
||||
|
||||
mu.Unlock()
|
||||
}
|
||||
|
||||
func TestSignalBetweenWaits(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var mu sync.Mutex
|
||||
|
||||
c := NewStatefulCond(&mu)
|
||||
|
||||
mu.Lock()
|
||||
|
||||
waited := make(chan struct{})
|
||||
|
||||
go func() {
|
||||
c.Wait()
|
||||
close(waited)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
case <-waited:
|
||||
require.Fail(t, "wait should have blocked")
|
||||
}
|
||||
|
||||
c.Signal()
|
||||
|
||||
<-waited
|
||||
|
||||
c.Signal()
|
||||
|
||||
waited = make(chan struct{})
|
||||
go func() {
|
||||
c.Wait()
|
||||
close(waited)
|
||||
}()
|
||||
|
||||
<-waited
|
||||
|
||||
mu.Unlock()
|
||||
}
|
Loading…
Reference in New Issue