From eede0facb70f860854e70e2ff437c8c2d0509ccc Mon Sep 17 00:00:00 2001 From: Tonis Tiigi Date: Tue, 10 Sep 2019 16:18:38 -0700 Subject: [PATCH] solver: fix pipe signaling on incoming updates Signed-off-by: Tonis Tiigi --- solver/edge.go | 3 +++ solver/internal/pipe/pipe.go | 14 +++++++++----- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/solver/edge.go b/solver/edge.go index b809652c..0a076a21 100644 --- a/solver/edge.go +++ b/solver/edge.go @@ -177,6 +177,9 @@ func (e *edge) finishIncoming(req pipe.Sender) { // updateIncoming updates the current value of incoming pipe request func (e *edge) updateIncoming(req pipe.Sender) { + if debugScheduler { + logrus.Debugf("updateIncoming %s %#v desired=%s", e.edge.Vertex.Name(), e.edgeState, req.Request().Payload.(*edgeRequest).desiredState) + } req.Update(&e.edgeState) } diff --git a/solver/internal/pipe/pipe.go b/solver/internal/pipe/pipe.go index a302e3a1..81702f52 100644 --- a/solver/internal/pipe/pipe.go +++ b/solver/internal/pipe/pipe.go @@ -11,11 +11,15 @@ import ( type channel struct { OnSendCompletion func() value atomic.Value - lastValue interface{} + lastValue *wrappedValue +} + +type wrappedValue struct { + value interface{} } func (c *channel) Send(v interface{}) { - c.value.Store(v) + c.value.Store(&wrappedValue{value: v}) if c.OnSendCompletion != nil { c.OnSendCompletion() } @@ -23,11 +27,11 @@ func (c *channel) Send(v interface{}) { func (c *channel) Receive() (interface{}, bool) { v := c.value.Load() - if c.lastValue == v { + if v == nil || v.(*wrappedValue) == c.lastValue { return nil, false } - c.lastValue = v - return v, true + c.lastValue = v.(*wrappedValue) + return v.(*wrappedValue).value, true } type Pipe struct {