solver: fix pipe signaling on incoming updates

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
v0.7
Tonis Tiigi 2019-09-10 16:18:38 -07:00
parent f6fe51ed9a
commit eede0facb7
2 changed files with 12 additions and 5 deletions

View File

@ -177,6 +177,9 @@ func (e *edge) finishIncoming(req pipe.Sender) {
// updateIncoming updates the current value of incoming pipe request
func (e *edge) updateIncoming(req pipe.Sender) {
if debugScheduler {
logrus.Debugf("updateIncoming %s %#v desired=%s", e.edge.Vertex.Name(), e.edgeState, req.Request().Payload.(*edgeRequest).desiredState)
}
req.Update(&e.edgeState)
}

View File

@ -11,11 +11,15 @@ import (
type channel struct {
OnSendCompletion func()
value atomic.Value
lastValue interface{}
lastValue *wrappedValue
}
type wrappedValue struct {
value interface{}
}
func (c *channel) Send(v interface{}) {
c.value.Store(v)
c.value.Store(&wrappedValue{value: v})
if c.OnSendCompletion != nil {
c.OnSendCompletion()
}
@ -23,11 +27,11 @@ func (c *channel) Send(v interface{}) {
func (c *channel) Receive() (interface{}, bool) {
v := c.value.Load()
if c.lastValue == v {
if v == nil || v.(*wrappedValue) == c.lastValue {
return nil, false
}
c.lastValue = v
return v, true
c.lastValue = v.(*wrappedValue)
return v.(*wrappedValue).value, true
}
type Pipe struct {