solver: refactor to a shared vertex solver model

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
docker-18.09
Tonis Tiigi 2017-08-09 18:20:33 -07:00
parent 69b7b778b6
commit 01aaf130e5
21 changed files with 1428 additions and 670 deletions

View File

@ -602,7 +602,7 @@ func ensureOriginMetadata(md *metadata.StorageItem) *metadata.StorageItem {
}
si, ok := md.Storage().Get(mutable)
if ok {
return &si
return si
}
return md
}

View File

@ -1,7 +1,7 @@
package instructioncache
import (
"strings"
"bytes"
"github.com/boltdb/bolt"
"github.com/moby/buildkit/cache"
@ -15,6 +15,8 @@ import (
const cacheKey = "buildkit.instructioncache"
const contentCacheKey = "buildkit.instructioncache.content"
const mappingBucket = "_contentMapping"
type LocalStore struct {
MetadataStore *metadata.Store
Cache cache.Accessor
@ -35,6 +37,9 @@ func (ls *LocalStore) Set(key digest.Digest, value interface{}) error {
return si.SetValue(b, v.Index, v)
})
}
func (ls *LocalStore) Probe(ctx context.Context, key digest.Digest) (bool, error) {
return ls.MetadataStore.Probe(index(key.String()))
}
func (ls *LocalStore) Lookup(ctx context.Context, key digest.Digest) (interface{}, error) {
snaps, err := ls.MetadataStore.Search(index(key.String()))
@ -60,42 +65,44 @@ func (ls *LocalStore) Lookup(ctx context.Context, key digest.Digest) (interface{
return nil, nil
}
func (ls *LocalStore) SetContentMapping(key digest.Digest, value interface{}) error {
ref, ok := value.(cache.ImmutableRef)
if !ok {
return errors.Errorf("invalid ref")
}
v, err := metadata.NewValue(ref.ID())
if err != nil {
return err
}
v.Index = contentIndex(key.String())
si, _ := ls.MetadataStore.Get(ref.ID())
return si.Update(func(b *bolt.Bucket) error {
return si.SetValue(b, v.Index, v)
func (ls *LocalStore) SetContentMapping(contentKey, regularKey digest.Digest) error {
db := ls.MetadataStore.DB()
return db.Update(func(tx *bolt.Tx) error {
b, err := tx.CreateBucketIfNotExists([]byte(mappingBucket))
if err != nil {
return err
}
return b.Put([]byte(contentKey.String()+"::"+regularKey.String()), []byte{})
})
}
func (ls *LocalStore) GetContentMapping(key digest.Digest) ([]digest.Digest, error) {
snaps, err := ls.MetadataStore.Search(contentIndex(key.String()))
if err != nil {
return nil, err
}
var out []digest.Digest
for _, s := range snaps {
for _, k := range s.Keys() {
if strings.HasPrefix(k, index("")) {
out = append(out, digest.Digest(strings.TrimPrefix(k, index("")))) // TODO: type
var dgsts []digest.Digest
db := ls.MetadataStore.DB()
if err := db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(mappingBucket))
if b == nil {
return nil
}
c := b.Cursor()
index := []byte(key.String() + "::")
k, _ := c.Seek(index)
for {
if k != nil && bytes.HasPrefix(k, index) {
dgsts = append(dgsts, digest.Digest(string(bytes.TrimPrefix(k, index))))
k, _ = c.Next()
} else {
break
}
}
return nil
}); err != nil {
return nil, err
}
return out, nil
return dgsts, nil
}
func index(k string) string {
return cacheKey + "::" + k
}
func contentIndex(k string) string {
return contentCacheKey + "::" + k
}

12
cache/manager.go vendored
View File

@ -122,7 +122,7 @@ func (cm *cacheManager) load(ctx context.Context, id string, opts ...RefOption)
}
md, _ := cm.md.Get(id)
if mutableID := getEqualMutable(&md); mutableID != "" {
if mutableID := getEqualMutable(md); mutableID != "" {
mutable, err := cm.load(ctx, mutableID)
if err != nil {
return nil, err
@ -131,7 +131,7 @@ func (cm *cacheManager) load(ctx context.Context, id string, opts ...RefOption)
cm: cm,
refs: make(map[Mountable]struct{}),
parent: mutable.parent,
md: &md,
md: md,
equalMutable: &mutableRef{cacheRecord: mutable},
}
mutable.equalImmutable = &immutableRef{cacheRecord: rec}
@ -157,7 +157,7 @@ func (cm *cacheManager) load(ctx context.Context, id string, opts ...RefOption)
cm: cm,
refs: make(map[Mountable]struct{}),
parent: parent,
md: &md,
md: md,
}
if err := initializeMetadata(rec, opts...); err != nil {
@ -202,7 +202,7 @@ func (cm *cacheManager) New(ctx context.Context, s ImmutableRef, opts ...RefOpti
cm: cm,
refs: make(map[Mountable]struct{}),
parent: parent,
md: &md,
md: md,
}
if err := initializeMetadata(rec, opts...); err != nil {
@ -379,6 +379,10 @@ type withMetadata interface {
Metadata() *metadata.StorageItem
}
func HasCachePolicyRetain(m withMetadata) bool {
return getCachePolicy(m.Metadata()) == cachePolicyRetain
}
func CachePolicyRetain(m withMetadata) error {
return queueCachePolicy(m.Metadata(), cachePolicyRetain)
}

View File

@ -1,8 +1,10 @@
package metadata
import (
"bytes"
"encoding/json"
"strings"
"sync"
"github.com/boltdb/bolt"
"github.com/pkg/errors"
@ -29,8 +31,12 @@ func NewStore(dbPath string) (*Store, error) {
return &Store{db: db}, nil
}
func (s *Store) All() ([]StorageItem, error) {
var out []StorageItem
func (s *Store) DB() *bolt.DB {
return s.db
}
func (s *Store) All() ([]*StorageItem, error) {
var out []*StorageItem
err := s.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(mainBucket))
if b == nil {
@ -52,8 +58,30 @@ func (s *Store) All() ([]StorageItem, error) {
return out, err
}
func (s *Store) Search(index string) ([]StorageItem, error) {
var out []StorageItem
func (s *Store) Probe(index string) (bool, error) {
var exists bool
err := s.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(indexBucket))
if b == nil {
return nil
}
main := tx.Bucket([]byte(mainBucket))
if main == nil {
return nil
}
search := []byte(indexKey(index, ""))
c := b.Cursor()
k, _ := c.Seek(search)
if k != nil && bytes.HasPrefix(k, search) {
exists = true
}
return nil
})
return exists, err
}
func (s *Store) Search(index string) ([]*StorageItem, error) {
var out []*StorageItem
err := s.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(indexBucket))
if b == nil {
@ -149,8 +177,8 @@ func (s *Store) Update(id string, fn func(b *bolt.Bucket) error) error {
})
}
func (s *Store) Get(id string) (StorageItem, bool) {
empty := func() StorageItem {
func (s *Store) Get(id string) (*StorageItem, bool) {
empty := func() *StorageItem {
si, _ := newStorageItem(id, nil, s)
return si
}
@ -180,10 +208,11 @@ type StorageItem struct {
values map[string]*Value
queue []func(*bolt.Bucket) error
storage *Store
mu sync.RWMutex
}
func newStorageItem(id string, b *bolt.Bucket, s *Store) (StorageItem, error) {
si := StorageItem{
func newStorageItem(id string, b *bolt.Bucket, s *Store) (*StorageItem, error) {
si := &StorageItem{
id: id,
storage: s,
values: make(map[string]*Value),
@ -230,7 +259,10 @@ func (s *StorageItem) Keys() []string {
}
func (s *StorageItem) Get(k string) *Value {
return s.values[k]
s.mu.RLock()
v := s.values[k]
s.mu.RUnlock()
return v
}
func (s *StorageItem) GetExternal(k string) ([]byte, error) {
@ -271,10 +303,14 @@ func (s *StorageItem) SetExternal(k string, dt []byte) error {
}
func (s *StorageItem) Queue(fn func(b *bolt.Bucket) error) {
s.mu.Lock()
defer s.mu.Unlock()
s.queue = append(s.queue, fn)
}
func (s *StorageItem) Commit() error {
s.mu.Lock()
defer s.mu.Unlock()
return s.Update(func(b *bolt.Bucket) error {
for _, fn := range s.queue {
if err := fn(b); err != nil {

8
cache/refs.go vendored
View File

@ -240,11 +240,11 @@ func (sr *mutableRef) commit(ctx context.Context) (ImmutableRef, error) {
parent: sr.parent,
equalMutable: sr,
refs: make(map[Mountable]struct{}),
md: &md,
md: md,
}
if descr := getDescription(sr.md); descr != "" {
if err := queueDescription(&md, descr); err != nil {
if err := queueDescription(md, descr); err != nil {
return nil, err
}
}
@ -259,8 +259,8 @@ func (sr *mutableRef) commit(ctx context.Context) (ImmutableRef, error) {
return nil, err
}
setSize(&md, sizeUnknown)
setEqualMutable(&md, sr.ID())
setSize(md, sizeUnknown)
setEqualMutable(md, sr.ID())
if err := md.Commit(); err != nil {
return nil, err
}

View File

@ -33,7 +33,7 @@ func main() {
}
func goBuildBase() llb.State {
goAlpine := llb.Image("docker.io/library/golang:1.8-alpine")
goAlpine := llb.Image("docker.io/library/golang:1.8-alpine@sha256:2287e0e274c1d2e9076c1f81d04f1a63c86b73c73603b09caada5da307a8f86d")
return goAlpine.
AddEnv("PATH", "/usr/local/go/bin:"+system.DefaultPathEnv).
AddEnv("GOPATH", "/go").
@ -76,7 +76,7 @@ func containerd(version string) llb.State {
func buildkit(opt buildOpt) llb.State {
repo := "github.com/moby/buildkit"
src := llb.Git(repo, "master")
src := llb.Git(repo, opt.buildkit)
if opt.buildkit == "local" {
src = llb.Local("buildkit-src")
}
@ -114,7 +114,7 @@ func copyFrom(src llb.State, srcPath, destPath string) llb.StateOption {
// copy copies files between 2 states using cp until there is no copyOp
func copy(src llb.State, srcPath string, dest llb.State, destPath string) llb.State {
cpImage := llb.Image("docker.io/library/alpine:latest")
cpImage := llb.Image("docker.io/library/alpine:latest@sha256:1072e499f3f655a032e88542330cf75b02e7bdf673278f701d7ba61629ee3ebe")
cp := cpImage.Run(llb.Shlexf("cp -a /src%s /dest%s", srcPath, destPath))
cp.AddMount("/src", src, llb.Readonly)
return cp.AddMount("/dest", dest)

View File

@ -1,7 +1,7 @@
ARG RUNC_VERSION=e775f0fba3ea329b8b766451c892c41a3d49594d
ARG CONTAINERD_VERSION=d1e11f17ec7b325f89608dd46c128300b8727d50
FROM golang:1.8-alpine AS gobuild-base
FROM golang:1.8-alpine@sha256:2287e0e274c1d2e9076c1f81d04f1a63c86b73c73603b09caada5da307a8f86d AS gobuild-base
RUN apk add --no-cache g++ linux-headers
RUN apk add --no-cache git make

View File

@ -9,7 +9,6 @@ import (
"github.com/moby/buildkit/client/llb"
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/util/progress"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"golang.org/x/net/context"
@ -124,10 +123,7 @@ func (b *buildOp) Run(ctx context.Context, inputs []Reference) (outputs []Refere
vv := toInternalVertex(v)
pw, _, ctx := progress.FromContext(ctx, progress.WithMetadata("parentVertex", b.v.Digest()))
defer pw.Close()
newref, err := b.s.getRef(ctx, vv, index)
newref, err := b.s.loadAndSolveChildVertex(ctx, b.v.Digest(), vv, index)
if err != nil {
return nil, err
}

View File

@ -138,12 +138,18 @@ func (e *execOp) ContentKeys(ctx context.Context, inputs [][]digest.Digest, refs
// based checksum for root
skipped := make([]int, 0)
type src struct {
index pb.InputIndex
selector string
}
skip := true
srcs := make([]string, len(refs))
srcsMap := make(map[src]struct{}, len(refs))
for _, m := range e.op.Mounts {
if m.Input != pb.Empty {
if m.Dest != pb.RootMount && m.Readonly { // could also include rw if they don't have a selector, but not sure if helps performance
srcs[int(m.Input)] = path.Join("/", m.Selector)
srcsMap[src{m.Input, path.Join("/", m.Selector)}] = struct{}{}
skip = false
} else {
skipped = append(skipped, int(m.Input))
@ -154,26 +160,35 @@ func (e *execOp) ContentKeys(ctx context.Context, inputs [][]digest.Digest, refs
return nil, nil
}
dgsts := make([]digest.Digest, len(refs))
eg, ctx := errgroup.WithContext(ctx)
for i, ref := range refs {
if srcs[i] == "" {
continue
srcs := make([]src, 0, len(srcsMap))
for s := range srcsMap {
srcs = append(srcs, s)
}
sort.Slice(srcs, func(i, j int) bool {
if srcs[i].index == srcs[j].index {
return srcs[i].selector < srcs[j].selector
}
func(i int, ref Reference) {
return srcs[i].index < srcs[j].index
})
dgsts := make([]digest.Digest, len(srcs))
eg, ctx := errgroup.WithContext(ctx)
for i, s := range srcs {
func(i int, s src, ref Reference) {
eg.Go(func() error {
ref, ok := toImmutableRef(ref)
if !ok {
return errors.Errorf("invalid reference")
}
dgst, err := contenthash.Checksum(ctx, ref, srcs[i])
dgst, err := contenthash.Checksum(ctx, ref, s.selector)
if err != nil {
return err
}
dgsts[i] = dgst
return nil
})
}(i, ref)
}(i, s, refs[int(s.index)])
}
if err := eg.Wait(); err != nil {
return nil, err

View File

@ -1,16 +1,17 @@
package solver
import (
"context"
"io"
"sync"
"time"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/util/progress"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/net/context"
)
type jobKeyT string
@ -21,24 +22,36 @@ type jobList struct {
mu sync.RWMutex
refs map[string]*job
updateCond *sync.Cond
actives map[digest.Digest]*state
}
type state struct {
jobs map[*job]struct{}
solver VertexSolver
mpw *progress.MultiWriter
}
func newJobList() *jobList {
jl := &jobList{
refs: make(map[string]*job),
refs: make(map[string]*job),
actives: make(map[digest.Digest]*state),
}
jl.updateCond = sync.NewCond(jl.mu.RLocker())
return jl
}
func (jl *jobList) new(ctx context.Context, id string, g *vertex, pr progress.Reader) (context.Context, *job, error) {
func (jl *jobList) new(ctx context.Context, id string, pr progress.Reader, cache InstructionCache) (context.Context, *job, error) {
jl.mu.Lock()
defer jl.mu.Unlock()
if _, ok := jl.refs[id]; ok {
return nil, nil, errors.Errorf("id %s exists", id)
}
j := &job{g: g, pr: progress.NewMultiReader(pr)}
pw, _, _ := progress.FromContext(ctx) // TODO: remove this
sid := session.FromContext(ctx)
j := &job{l: jl, pr: progress.NewMultiReader(pr), pw: pw, session: sid, cache: cache}
jl.refs[id] = j
jl.updateCond.Broadcast()
go func() {
@ -77,27 +90,157 @@ func (jl *jobList) get(id string) (*job, error) {
}
}
func (jl *jobList) loadAndSolveChildVertex(ctx context.Context, dgst digest.Digest, vv *vertex, index Index, f ResolveOpFunc, cache InstructionCache) (Reference, error) {
jl.mu.Lock()
st, ok := jl.actives[dgst]
if !ok {
jl.mu.Unlock()
return nil, errors.Errorf("no such parent vertex: %v", dgst)
}
var newst *state
for j := range st.jobs {
var err error
newst, err = j.loadInternal(vv, f)
if err != nil {
jl.mu.Unlock()
return nil, err
}
}
jl.mu.Unlock()
return getRef(newst.solver, ctx, vv, index, cache)
}
type job struct {
mu sync.Mutex
g *vertex
pr *progress.MultiReader
l *jobList
pr *progress.MultiReader
pw progress.Writer
session string
cache InstructionCache
}
func (j *job) load(v *vertex, f ResolveOpFunc) error {
j.l.mu.Lock()
defer j.l.mu.Unlock()
_, err := j.loadInternal(v, f)
return err
}
func (j *job) loadInternal(v *vertex, f ResolveOpFunc) (*state, error) {
for _, inp := range v.inputs {
if _, err := j.loadInternal(inp.vertex, f); err != nil {
return nil, err
}
}
dgst := v.Digest()
st, ok := j.l.actives[dgst]
if !ok {
st = &state{
jobs: map[*job]struct{}{},
mpw: progress.NewMultiWriter(progress.WithMetadata("vertex", dgst)),
}
op, err := f(v)
if err != nil {
return nil, err
}
ctx := progress.WithProgress(context.Background(), st.mpw)
ctx = session.NewContext(ctx, j.session) // TODO: support multiple
s, err := newVertexSolver(ctx, v, op, j.cache, j.getSolver)
if err != nil {
return nil, err
}
st.solver = s
j.l.actives[dgst] = st
}
if _, ok := st.jobs[j]; !ok {
j.pw.Write(v.Digest().String(), v.clientVertex)
st.mpw.Add(j.pw)
st.jobs[j] = struct{}{}
}
return st, nil
}
func (j *job) discard() {
j.l.mu.Lock()
defer j.l.mu.Unlock()
j.pw.Close()
for k, st := range j.l.actives {
if _, ok := st.jobs[j]; ok {
delete(st.jobs, j)
}
if len(st.jobs) == 0 {
go st.solver.Release()
delete(j.l.actives, k)
}
}
}
func (j *job) getSolver(dgst digest.Digest) (VertexSolver, error) {
st, ok := j.l.actives[dgst]
if !ok {
return nil, errors.Errorf("vertex %v not found", dgst)
}
return st.solver, nil
}
func (j *job) getRef(ctx context.Context, v *vertex, index Index) (Reference, error) {
s, err := j.getSolver(v.Digest())
if err != nil {
return nil, err
}
return getRef(s, ctx, v, index, j.cache)
}
func getRef(s VertexSolver, ctx context.Context, v *vertex, index Index, cache InstructionCache) (Reference, error) {
k, err := s.CacheKey(ctx, index)
if err != nil {
return nil, err
}
ref, err := cache.Lookup(ctx, k)
if err != nil {
return nil, err
}
if ref != nil {
v.notifyCompleted(ctx, true, nil)
return ref.(Reference), nil
}
ev, err := s.OutputEvaluator(index)
if err != nil {
return nil, err
}
defer ev.Cancel()
for {
r, err := ev.Next(ctx)
if err != nil {
return nil, err
}
if r.CacheKey != "" {
ref, err := cache.Lookup(ctx, r.CacheKey)
if err != nil {
return nil, err
}
if ref != nil {
v.notifyCompleted(ctx, true, nil)
return ref.(Reference), nil
}
continue
}
return r.Reference, nil
}
}
func (j *job) pipe(ctx context.Context, ch chan *client.SolveStatus) error {
pr := j.pr.Reader(ctx)
if j.g != nil {
for v := range walk(j.g) {
vv := v.(*vertex)
ss := &client.SolveStatus{
Vertexes: []*client.Vertex{&vv.clientVertex},
}
select {
case <-ctx.Done():
return ctx.Err()
case ch <- ss:
}
}
}
for {
p, err := pr.Read(ctx)
if err != nil {
@ -110,10 +253,6 @@ func (j *job) pipe(ctx context.Context, ch chan *client.SolveStatus) error {
for _, p := range p {
switch v := p.Sys.(type) {
case client.Vertex:
vtx, ok := p.Meta("parentVertex")
if ok {
v.Parent = vtx.(digest.Digest)
}
ss.Vertexes = append(ss.Vertexes, &v)
case progress.Status:
@ -151,25 +290,3 @@ func (j *job) pipe(ctx context.Context, ch chan *client.SolveStatus) error {
}
}
}
func walk(v Vertex) chan Vertex {
cache := make(map[digest.Digest]struct{})
ch := make(chan Vertex, 32)
var send func(v Vertex)
send = func(v Vertex) {
for _, v := range v.Inputs() {
send(v.Vertex)
}
if _, ok := cache[v.Digest()]; !ok {
ch <- v
cache[v.Digest()] = struct{}{}
}
}
go func() {
send(v)
close(ch)
}()
return ch
}

89
solver/sharedref.go Normal file
View File

@ -0,0 +1,89 @@
package solver
import (
"sync"
"github.com/moby/buildkit/cache"
"golang.org/x/net/context"
)
// sharedRef is a wrapper around releasable that allows you to make new
// releasable child objects
type sharedRef struct {
mu sync.Mutex
refs map[*sharedRefInstance]struct{}
main Reference
Reference
}
func newSharedRef(main Reference) *sharedRef {
mr := &sharedRef{
refs: make(map[*sharedRefInstance]struct{}),
Reference: main,
}
mr.main = mr.Clone()
return mr
}
func (mr *sharedRef) Clone() Reference {
mr.mu.Lock()
r := &sharedRefInstance{sharedRef: mr}
mr.refs[r] = struct{}{}
mr.mu.Unlock()
return r
}
func (mr *sharedRef) Release(ctx context.Context) error {
return mr.main.Release(ctx)
}
func (mr *sharedRef) Sys() Reference {
sys := mr.Reference
if s, ok := sys.(interface {
Sys() Reference
}); ok {
return s.Sys()
}
return sys
}
type sharedRefInstance struct {
*sharedRef
}
func (r *sharedRefInstance) Release(ctx context.Context) error {
r.sharedRef.mu.Lock()
defer r.sharedRef.mu.Unlock()
delete(r.sharedRef.refs, r)
if len(r.sharedRef.refs) == 0 {
return r.sharedRef.Reference.Release(ctx)
}
return nil
}
func originRef(ref Reference) Reference {
sysRef := ref
if sys, ok := ref.(interface {
Sys() Reference
}); ok {
sysRef = sys.Sys()
}
return sysRef
}
func toImmutableRef(ref Reference) (cache.ImmutableRef, bool) {
immutable, ok := originRef(ref).(cache.ImmutableRef)
if !ok {
return nil, false
}
return &immutableRef{immutable, ref.Release}, true
}
type immutableRef struct {
cache.ImmutableRef
release func(context.Context) error
}
func (ir *immutableRef) Release(ctx context.Context) error {
return ir.release(ctx)
}

49
solver/signal.go Normal file
View File

@ -0,0 +1,49 @@
package solver
import "sync"
func newSignaller() *signal {
return &signal{}
}
type signal struct {
mu sync.Mutex
ch chan struct{}
}
func (s *signal) Wait() chan struct{} {
s.mu.Lock()
if s.ch == nil {
s.ch = make(chan struct{})
}
ch := s.ch
s.mu.Unlock()
return ch
}
func (s *signal) Reset() chan struct{} {
s.mu.Lock()
ch := s.ch
select {
case <-ch:
ch = make(chan struct{})
s.ch = ch
default:
}
s.mu.Unlock()
return ch
}
func (s *signal) Signal() {
s.mu.Lock()
if s.ch == nil {
s.ch = make(chan struct{})
}
ch := s.ch
select {
case <-ch:
default:
close(ch)
}
s.mu.Unlock()
}

View File

@ -11,6 +11,7 @@ import (
"github.com/moby/buildkit/frontend"
"github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/source"
"github.com/moby/buildkit/util/bgfunc"
"github.com/moby/buildkit/util/progress"
"github.com/moby/buildkit/worker"
digest "github.com/opencontainers/go-digest"
@ -61,16 +62,16 @@ type Op interface {
}
type InstructionCache interface {
Probe(ctx context.Context, key digest.Digest) (bool, error)
Lookup(ctx context.Context, key digest.Digest) (interface{}, error) // TODO: regular ref
Set(key digest.Digest, ref interface{}) error
SetContentMapping(key digest.Digest, value interface{}) error
SetContentMapping(contentKey, key digest.Digest) error
GetContentMapping(dgst digest.Digest) ([]digest.Digest, error)
}
type Solver struct {
resolve ResolveOpFunc
jobs *jobList
activeState activeState
cache InstructionCache
imageSource source.Source
}
@ -79,40 +80,6 @@ func New(resolve ResolveOpFunc, cache InstructionCache, imageSource source.Sourc
return &Solver{resolve: resolve, jobs: newJobList(), cache: cache, imageSource: imageSource}
}
type resolveImageConfig interface {
ResolveImageConfig(ctx context.Context, ref string) ([]byte, error)
}
type llbBridge struct {
resolveImageConfig
solver func(ctx context.Context, v *vertex, i Index) (Reference, error)
}
func (s *llbBridge) Solve(ctx context.Context, dt [][]byte) (cache.ImmutableRef, error) {
v, err := LoadLLB(dt)
if err != nil {
return nil, err
}
if len(v.Inputs()) == 0 {
return nil, errors.New("required vertex needs to have inputs")
}
index := v.Inputs()[0].Index
v = v.Inputs()[0].Vertex
vv := toInternalVertex(v)
ref, err := s.solver(ctx, vv, index)
if err != nil {
return nil, err
}
immutable, ok := toImmutableRef(ref)
if !ok {
return nil, errors.Errorf("invalid reference for exporting: %T", ref)
}
return immutable, nil
}
func (s *Solver) Solve(ctx context.Context, id string, f frontend.Frontend, v Vertex, exp exporter.ExporterInstance, frontendOpt map[string]string) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@ -122,11 +89,8 @@ func (s *Solver) Solve(ctx context.Context, id string, f frontend.Frontend, v Ve
defer closeProgressWriter()
var vv *vertex
var solveVertex *vertex
var index Index
if v != nil {
origVertex := v
if len(v.Inputs()) == 0 {
return errors.New("required vertex needs to have inputs")
}
@ -135,37 +99,36 @@ func (s *Solver) Solve(ctx context.Context, id string, f frontend.Frontend, v Ve
v = v.Inputs()[0].Vertex
vv = toInternalVertex(v)
solveVertex = vv
if exp != nil {
vv = &vertex{digest: origVertex.Digest(), name: exp.Name()}
vv.inputs = []*input{{index: 0, vertex: solveVertex}}
vv.initClientVertex()
}
}
ctx, j, err := s.jobs.new(ctx, id, vv, pr)
ctx, j, err := s.jobs.new(ctx, id, pr, s.cache)
if err != nil {
return err
}
var ref Reference
var exporterOpt map[string]interface{}
if solveVertex != nil {
ref, err = s.getRef(ctx, solveVertex, index)
// solver: s.getRef,
if vv != nil {
if err := j.load(vv, s.resolve); err != nil {
j.discard()
return err
}
ref, err = j.getRef(ctx, vv, index)
} else {
ref, exporterOpt, err = f.Solve(ctx, &llbBridge{
solver: s.getRef,
job: j,
resolveOp: s.resolve,
resolveImageConfig: s.imageSource.(resolveImageConfig),
}, frontendOpt)
}
s.activeState.cancel(j)
j.discard()
if err != nil {
return err
}
defer func() {
ref.Release(context.TODO())
go ref.Release(context.TODO())
}()
immutable, ok := toImmutableRef(ref)
@ -198,327 +161,429 @@ func (s *Solver) Status(ctx context.Context, id string, statusChan chan *client.
return j.pipe(ctx, statusChan)
}
// getCacheKey return a cache key for a single output of a vertex
func (s *Solver) getCacheKey(ctx context.Context, g *vertex, inputs []digest.Digest, index Index) (dgst digest.Digest, retErr error) {
state, err := s.activeState.vertexState(ctx, g.digest, func() (Op, error) {
return s.resolve(g)
})
func (s *Solver) loadAndSolveChildVertex(ctx context.Context, dgst digest.Digest, vv *vertex, index Index) (Reference, error) {
return s.jobs.loadAndSolveChildVertex(ctx, dgst, vv, index, s.resolve, s.cache)
}
type VertexSolver interface {
CacheKey(ctx context.Context, index Index) (digest.Digest, error)
OutputEvaluator(Index) (VertexEvaluator, error)
Release() error
}
type vertexInput struct {
solver VertexSolver
ev VertexEvaluator
cacheKeys []digest.Digest
ref Reference
}
type vertexSolver struct {
inputs []*vertexInput
v *vertex
op Op
cache InstructionCache
refs []*sharedRef
f *bgfunc.F
ctx context.Context
baseKey digest.Digest
mu sync.Mutex
results []digest.Digest
signal *signal // used to notify that there are callers who need more data
}
type resolveF func(digest.Digest) (VertexSolver, error)
func newVertexSolver(ctx context.Context, v *vertex, op Op, c InstructionCache, resolve resolveF) (VertexSolver, error) {
inputs := make([]*vertexInput, len(v.inputs))
for i, in := range v.inputs {
s, err := resolve(in.vertex.digest)
if err != nil {
return nil, err
}
ev, err := s.OutputEvaluator(in.index)
if err != nil {
return nil, err
}
inputs[i] = &vertexInput{
solver: s,
ev: ev,
}
}
return &vertexSolver{
ctx: ctx,
inputs: inputs,
v: v,
op: op,
cache: c,
signal: newSignaller(),
}, nil
}
func (vs *vertexSolver) CacheKey(ctx context.Context, index Index) (digest.Digest, error) {
vs.mu.Lock()
defer vs.mu.Unlock()
if vs.baseKey == "" {
eg, ctx := errgroup.WithContext(vs.ctx)
for i := range vs.inputs {
func(i int) {
eg.Go(func() error {
k, err := vs.inputs[i].solver.CacheKey(ctx, vs.v.inputs[i].index)
if err != nil {
return err
}
vs.inputs[i].cacheKeys = append(vs.inputs[i].cacheKeys, k)
return nil
})
}(i)
}
var dgst digest.Digest
eg.Go(func() error {
var err error
dgst, err = vs.op.CacheKey(ctx)
if err != nil {
return err
}
return nil
})
if err := eg.Wait(); err != nil {
return "", err
}
vs.baseKey = dgst
}
k, err := vs.lastCacheKey()
if err != nil {
return "", err
}
pw, _, ctx := progress.FromContext(ctx, progress.WithMetadata("vertex", g.Digest()))
defer pw.Close()
return cacheKeyForIndex(k, index), nil
}
if len(g.inputs) == 0 {
g.notifyStarted(ctx)
defer func() {
g.notifyCompleted(ctx, false, retErr)
}()
func (vs *vertexSolver) lastCacheKey() (digest.Digest, error) {
return vs.currentCacheKey(true)
}
func (vs *vertexSolver) mainCacheKey() (digest.Digest, error) {
return vs.currentCacheKey(false)
}
func (vs *vertexSolver) currentCacheKey(last bool) (digest.Digest, error) {
inputKeys := make([]digest.Digest, len(vs.inputs))
for i, inp := range vs.inputs {
if len(inp.cacheKeys) == 0 {
return "", errors.Errorf("inputs not processed")
}
if last {
inputKeys[i] = inp.cacheKeys[len(inp.cacheKeys)-1]
} else {
inputKeys[i] = inp.cacheKeys[0]
}
}
dgst, err = state.GetCacheKey(ctx, func(ctx context.Context, op Op) (digest.Digest, error) {
return op.CacheKey(ctx)
})
if err != nil {
return "", err
}
dt, err := json.Marshal(struct {
Index Index
Inputs []digest.Digest
Digest digest.Digest
}{
Index: index,
Inputs: inputs,
Digest: dgst,
})
Inputs []digest.Digest
CacheKey digest.Digest
}{Inputs: inputKeys, CacheKey: vs.baseKey})
if err != nil {
return "", err
}
return digest.FromBytes(dt), nil
}
// walkVertex walks all possible cache keys and a evaluated reference for a
// single output of a vertex.
func (s *Solver) walkVertex(ctx context.Context, g *vertex, index Index, fn func(digest.Digest, Reference) (bool, error)) (retErr error) {
state, err := s.activeState.vertexState(ctx, g.digest, func() (Op, error) {
return s.resolve(g)
})
if err != nil {
return err
}
inputCacheKeysMu := sync.Mutex{}
inputCacheKeys := make([][]digest.Digest, len(g.inputs))
walkerStopped := false
inputRefs := make([]Reference, len(g.inputs))
defer func() {
for _, r := range inputRefs {
if r != nil {
go r.Release(context.TODO())
}
}
}()
if len(g.inputs) > 0 {
eg, ctx := errgroup.WithContext(ctx)
inputCtx, cancelInputCtx := context.WithCancel(ctx)
defer cancelInputCtx()
for i, in := range g.inputs {
func(i int, in *input) {
eg.Go(func() error {
var inputRef Reference
defer func() {
if inputRef != nil {
go inputRef.Release(context.TODO())
}
}()
err := s.walkVertex(inputCtx, in.vertex, in.index, func(k digest.Digest, ref Reference) (bool, error) {
if k == "" && ref == nil {
// indicator between cache key and reference
if inputRef != nil {
return true, nil
}
// TODO: might be good to block here if other inputs may
// cause cache hits to avoid duplicate work.
return false, nil
}
if ref != nil {
inputRef = ref
return true, nil
}
inputCacheKeysMu.Lock()
defer inputCacheKeysMu.Unlock()
if walkerStopped {
return walkerStopped, nil
}
// try all known combinations together with new key
inputCacheKeysCopy := append([][]digest.Digest{}, inputCacheKeys...)
inputCacheKeysCopy[i] = []digest.Digest{k}
inputCacheKeys[i] = append(inputCacheKeys[i], k)
for _, inputKeys := range combinations(inputCacheKeysCopy) {
cacheKey, err := s.getCacheKey(ctx, g, inputKeys, index)
if err != nil {
return false, err
}
stop, err := fn(cacheKey, nil)
if err != nil {
return false, err
}
if stop {
walkerStopped = true
cancelInputCtx() // parent matched, stop processing current node and its inputs
return true, nil
}
}
// if no parent matched, try looking up current node from cache
if s.cache != nil && inputRef == nil {
lookupRef, err := s.cache.Lookup(ctx, k)
if err != nil {
return false, err
}
if lookupRef != nil {
inputRef = lookupRef.(Reference)
in.vertex.recursiveMarkCached(ctx)
return true, nil
}
}
return false, nil
})
if inputRef != nil {
// make sure that the inputs for other steps don't get released on cancellation
if ref, ok := toImmutableRef(inputRef); ok {
if err := cache.CachePolicyRetain(ref); err != nil {
return err
}
if err := ref.Metadata().Commit(); err != nil {
return err
}
}
}
inputCacheKeysMu.Lock()
defer inputCacheKeysMu.Unlock()
if walkerStopped {
return nil
}
if err != nil {
return err
}
inputRefs[i] = inputRef
inputRef = nil
return nil
})
}(i, in)
}
if err := eg.Wait(); err != nil && !walkerStopped {
return err
}
} else {
cacheKey, err := s.getCacheKey(ctx, g, nil, index)
if err != nil {
return err
}
stop, err := fn(cacheKey, nil)
if err != nil {
return err
}
walkerStopped = stop
}
if walkerStopped {
return nil
}
var contentKeys []digest.Digest
if s.cache != nil {
// try to determine content based key
contentKeys, err = state.op.ContentKeys(ctx, combinations(inputCacheKeys), inputRefs)
if err != nil {
return err
}
for _, k := range contentKeys {
cks, err := s.cache.GetContentMapping(contentKeyWithIndex(k, index))
if err != nil {
return err
}
for _, k := range cks {
stop, err := fn(k, nil)
if err != nil {
return err
}
if stop {
return nil
}
}
}
}
// signal that no more cache keys are coming
stop, err := fn("", nil)
if err != nil {
return err
}
if stop {
return nil
}
pw, _, ctx := progress.FromContext(ctx, progress.WithMetadata("vertex", g.Digest()))
defer pw.Close()
g.notifyStarted(ctx)
defer func() {
g.notifyCompleted(ctx, false, retErr)
}()
ref, err := state.GetRefs(ctx, index, func(ctx context.Context, op Op) ([]Reference, error) {
refs, err := op.Run(ctx, inputRefs)
func (vs *vertexSolver) OutputEvaluator(index Index) (VertexEvaluator, error) {
if vs.f == nil {
f, err := bgfunc.New(vs.ctx, vs.run)
if err != nil {
return nil, err
}
if s.cache != nil {
mainInputKeys := firstKeys(inputCacheKeys)
for i, ref := range refs {
if ref != nil {
cacheKey, err := s.getCacheKey(ctx, g, mainInputKeys, Index(i))
if err != nil {
return nil, err
}
r := originRef(ref)
if err := s.cache.Set(cacheKey, r); err != nil {
logrus.Errorf("failed to save cache for %s: %v", cacheKey, err)
}
vs.f = f
}
c := vs.f.NewCaller()
ve := &vertexEvaluator{vertexSolver: vs, c: c, index: index}
return ve, nil
}
for _, k := range contentKeys {
if err := s.cache.SetContentMapping(contentKeyWithIndex(k, Index(i)), r); err != nil {
logrus.Errorf("failed to save content mapping: %v", err)
}
}
}
}
func (vs *vertexSolver) Release() error {
for _, inp := range vs.inputs {
if inp.ref != nil {
inp.ref.Release(context.TODO())
}
return refs, nil
})
if err != nil {
return err
}
// return reference
_, err = fn("", ref)
if err != nil {
return err
if vs.refs != nil {
for _, r := range vs.refs {
r.Release(context.TODO())
}
}
return nil
}
func (s *Solver) getRef(ctx context.Context, g *vertex, index Index) (ref Reference, retErr error) {
logrus.Debugf("> getRef %s %v %s", g.digest, index, g.name)
defer logrus.Debugf("< getRef %s %v", g.digest, index)
// run is called by the bgfunc concurrency primitive. This function may be
// called multiple times but never in parallal. Repeated calls should make an
// effort to continue from previous state. Lock vs.mu to syncronize data to the
// callers. Signal parameter can be used to notify callers that new data is
// available without returning from the function.
func (vs *vertexSolver) run(ctx context.Context, signal func()) (retErr error) {
vs.mu.Lock()
if vs.refs != nil {
vs.mu.Unlock()
return nil
}
vs.mu.Unlock()
var returnRef Reference
err := s.walkVertex(ctx, g, index, func(ck digest.Digest, ref Reference) (bool, error) {
if ref != nil {
returnRef = ref
return true, nil
wait := vs.signal.Wait()
select {
case <-ctx.Done():
return ctx.Err()
case <-wait:
}
// this is where you lookup the cache keys that were successfully probed
eg, ctx2 := errgroup.WithContext(ctx)
// process all the inputs
for i, inp := range vs.inputs {
if inp.ref == nil {
func(i int) {
eg.Go(func() error {
inp := vs.inputs[i]
defer inp.ev.Cancel()
for {
select {
case <-ctx2.Done():
return ctx2.Err()
case <-wait:
}
// check if current cache key is in cache
if len(inp.cacheKeys) > 0 {
ref, err := vs.cache.Lookup(ctx2, inp.cacheKeys[len(inp.cacheKeys)-1])
if err != nil {
return err
}
if ref != nil {
inp.ref = ref.(Reference)
return nil
}
}
// evaluate next cachekey/reference for input
res, err := inp.ev.Next(ctx2)
if err != nil {
return err
}
if res == nil { // there is no more data coming
return nil
}
if ref := res.Reference; ref != nil {
if ref, ok := toImmutableRef(ref); ok {
if !cache.HasCachePolicyRetain(ref) {
if err := cache.CachePolicyRetain(ref); err != nil {
return err
}
ref.Metadata().Commit()
}
inp.ref = ref
}
return nil
}
// Only try matching cache if the cachekey for input is present
exists, err := vs.cache.Probe(ctx2, res.CacheKey)
if err != nil {
return err
}
if exists {
vs.mu.Lock()
inp.cacheKeys = append(inp.cacheKeys, res.CacheKey)
dgst, err := vs.lastCacheKey()
if err != nil {
vs.mu.Unlock()
return err
}
vs.results = append(vs.results, dgst)
signal() // wake up callers
wait = vs.signal.Reset() // make sure we don't continue unless there are callers
vs.mu.Unlock()
}
}
})
}(i)
}
if ck == "" {
return false, nil
}
lookupRef, err := s.cache.Lookup(ctx, ck)
}
if err := eg.Wait(); err != nil {
return err
}
// Find extra cache keys by content
inputRefs := make([]Reference, len(vs.inputs))
lastInputKeys := make([]digest.Digest, len(vs.inputs))
for i := range vs.inputs {
inputRefs[i] = vs.inputs[i].ref
lastInputKeys[i] = vs.inputs[i].cacheKeys[len(vs.inputs[i].cacheKeys)-1]
}
// TODO: avoid doing this twice on cancellation+resume
contentKeys, err := vs.op.ContentKeys(ctx, [][]digest.Digest{lastInputKeys}, inputRefs)
if err != nil {
return err
}
var extraKeys []digest.Digest
for _, k := range contentKeys {
cks, err := vs.cache.GetContentMapping(k)
if err != nil {
return false, err
return err
}
if lookupRef != nil {
g.recursiveMarkCached(ctx)
returnRef = lookupRef.(Reference)
return true, nil
extraKeys = append(extraKeys, cks...)
}
if len(extraKeys) > 0 {
vs.mu.Lock()
vs.results = append(vs.results, extraKeys...)
signal()
wait = vs.signal.Reset()
vs.mu.Unlock()
}
select {
case <-ctx.Done():
return
case <-wait:
}
// no cache hit. start evaluating the node
vs.v.notifyStarted(ctx)
defer func() {
vs.v.notifyCompleted(ctx, false, retErr)
}()
refs, err := vs.op.Run(ctx, inputRefs)
if err != nil {
return err
}
sr := make([]*sharedRef, len(refs))
for i, r := range refs {
sr[i] = newSharedRef(r)
}
vs.refs = sr
// store the cacheKeys for current refs
if vs.cache != nil {
cacheKey, err := vs.mainCacheKey()
if err != nil {
return err
}
return false, nil
for i, ref := range refs {
if err != nil {
return err
}
r := originRef(ref)
if err := vs.cache.Set(cacheKeyForIndex(cacheKey, Index(i)), r); err != nil {
logrus.Errorf("failed to save cache for %s: %v", cacheKey, err)
}
}
if len(contentKeys) > 0 {
for _, ck := range contentKeys {
if err := vs.cache.SetContentMapping(ck, cacheKey); err != nil {
logrus.Errorf("failed to save content mapping: %v", err)
}
}
}
}
return nil
}
type VertexEvaluator interface {
Next(context.Context) (*VertexResult, error)
Cancel() error
}
type vertexEvaluator struct {
*vertexSolver
c *bgfunc.Caller
cursor int
index Index
}
func (ve *vertexEvaluator) Next(ctx context.Context) (*VertexResult, error) {
v, err := ve.c.Call(ctx, func() (interface{}, error) {
ve.mu.Lock()
defer ve.mu.Unlock()
if ve.refs != nil {
return &VertexResult{Reference: ve.refs[int(ve.index)].Clone()}, nil
}
if i := ve.cursor; i < len(ve.results) {
ve.cursor++
return &VertexResult{CacheKey: cacheKeyForIndex(ve.results[i], ve.index)}, nil
}
ve.signal.Signal()
return nil, nil
})
if err != nil {
return nil, err
}
return returnRef, nil
if v == nil {
return nil, nil // no more records are coming
}
return v.(*VertexResult), nil
}
func firstKeys(inp [][]digest.Digest) []digest.Digest {
var out []digest.Digest
for _, v := range inp {
out = append(out, v[0])
}
return out
func (ve *vertexEvaluator) Cancel() error {
return ve.c.Cancel()
}
func combinations(inp [][]digest.Digest) [][]digest.Digest {
var out [][]digest.Digest
if len(inp) == 0 {
return inp
}
if len(inp) == 1 {
for _, v := range inp[0] {
out = append(out, []digest.Digest{v})
}
return out
}
for _, v := range inp[0] {
for _, v2 := range combinations(inp[1:]) {
out = append(out, append([]digest.Digest{v}, v2...))
}
}
return out
type VertexResult struct {
CacheKey digest.Digest
Reference Reference
}
func contentKeyWithIndex(dgst digest.Digest, index Index) digest.Digest {
// llbBridge is an helper used by frontends
type llbBridge struct {
resolveImageConfig
job *job
resolveOp ResolveOpFunc
}
type resolveImageConfig interface {
ResolveImageConfig(ctx context.Context, ref string) ([]byte, error)
}
func (s *llbBridge) Solve(ctx context.Context, dt [][]byte) (cache.ImmutableRef, error) {
v, err := LoadLLB(dt)
if err != nil {
return nil, err
}
if len(v.Inputs()) == 0 {
return nil, errors.New("required vertex needs to have inputs")
}
index := v.Inputs()[0].Index
v = v.Inputs()[0].Vertex
vv := toInternalVertex(v)
if err := s.job.load(vv, s.resolveOp); err != nil {
return nil, err
}
ref, err := s.job.getRef(ctx, vv, index)
if err != nil {
return nil, err
}
immutable, ok := toImmutableRef(ref)
if !ok {
return nil, errors.Errorf("invalid reference for exporting: %T", ref)
}
return immutable, nil
}
func cacheKeyForIndex(dgst digest.Digest, index Index) digest.Digest {
return digest.FromBytes([]byte(fmt.Sprintf("%s.%d", dgst, index)))
}

View File

@ -1,214 +0,0 @@
package solver
import (
"sync"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/util/flightcontrol"
"github.com/moby/buildkit/util/progress"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"golang.org/x/net/context"
)
// activeState holds the references to snapshots what are currently activve
// and allows sharing them between jobs
type activeState struct {
mu sync.Mutex
states map[digest.Digest]*state
flightcontrol.Group
}
type state struct {
*activeState
key digest.Digest
jobs map[*job]struct{}
refs []*sharedRef
cacheKey digest.Digest
op Op
progressCtx context.Context
cacheCtx context.Context
}
func (s *activeState) vertexState(ctx context.Context, key digest.Digest, cb func() (Op, error)) (*state, error) {
jv := ctx.Value(jobKey)
if jv == nil {
return nil, errors.Errorf("can't get vertex state without active job")
}
j := jv.(*job)
s.mu.Lock()
defer s.mu.Unlock()
if s.states == nil {
s.states = map[digest.Digest]*state{}
}
st, ok := s.states[key]
if !ok {
op, err := cb()
if err != nil {
return nil, err
}
st = &state{key: key, jobs: map[*job]struct{}{}, op: op, activeState: s}
s.states[key] = st
}
st.jobs[j] = struct{}{}
return st, nil
}
func (s *activeState) cancel(j *job) {
s.mu.Lock()
defer s.mu.Unlock()
for k, st := range s.states {
if _, ok := st.jobs[j]; ok {
delete(st.jobs, j)
}
if len(st.jobs) == 0 {
for _, r := range st.refs {
go r.Release(context.TODO())
}
delete(s.states, k)
}
}
}
func (s *state) GetRefs(ctx context.Context, index Index, cb func(context.Context, Op) ([]Reference, error)) (Reference, error) {
_, err := s.Do(ctx, s.key.String(), func(doctx context.Context) (interface{}, error) {
if s.refs != nil {
if err := writeProgressSnapshot(s.progressCtx, ctx); err != nil {
return nil, err
}
return nil, nil
}
refs, err := cb(doctx, s.op)
if err != nil {
return nil, err
}
sharedRefs := make([]*sharedRef, 0, len(refs))
for _, r := range refs {
sharedRefs = append(sharedRefs, newSharedRef(r))
}
s.refs = sharedRefs
s.progressCtx = doctx
return nil, nil
})
if err != nil {
return nil, err
}
return s.refs[int(index)].Clone(), nil
}
func (s *state) GetCacheKey(ctx context.Context, cb func(context.Context, Op) (digest.Digest, error)) (digest.Digest, error) {
_, err := s.Do(ctx, "cache:"+s.key.String(), func(doctx context.Context) (interface{}, error) {
if s.cacheKey != "" {
if err := writeProgressSnapshot(s.cacheCtx, ctx); err != nil {
return nil, err
}
return nil, nil
}
cacheKey, err := cb(doctx, s.op)
if err != nil {
return nil, err
}
s.cacheKey = cacheKey
s.cacheCtx = doctx
return nil, nil
})
if err != nil {
return "", err
}
return s.cacheKey, nil
}
func writeProgressSnapshot(srcCtx, destCtx context.Context) error {
pw, ok, _ := progress.FromContext(destCtx)
if ok {
if srcCtx != nil {
return flightcontrol.WriteProgress(srcCtx, pw)
}
}
return nil
}
// sharedRef is a wrapper around releasable that allows you to make new
// releasable child objects
type sharedRef struct {
mu sync.Mutex
refs map[*sharedRefInstance]struct{}
main Reference
Reference
}
func newSharedRef(main Reference) *sharedRef {
mr := &sharedRef{
refs: make(map[*sharedRefInstance]struct{}),
Reference: main,
}
mr.main = mr.Clone()
return mr
}
func (mr *sharedRef) Clone() Reference {
mr.mu.Lock()
r := &sharedRefInstance{sharedRef: mr}
mr.refs[r] = struct{}{}
mr.mu.Unlock()
return r
}
func (mr *sharedRef) Release(ctx context.Context) error {
return mr.main.Release(ctx)
}
func (mr *sharedRef) Sys() Reference {
sys := mr.Reference
if s, ok := sys.(interface {
Sys() Reference
}); ok {
return s.Sys()
}
return sys
}
type sharedRefInstance struct {
*sharedRef
}
func (r *sharedRefInstance) Release(ctx context.Context) error {
r.sharedRef.mu.Lock()
defer r.sharedRef.mu.Unlock()
delete(r.sharedRef.refs, r)
if len(r.sharedRef.refs) == 0 {
return r.sharedRef.Reference.Release(ctx)
}
return nil
}
func originRef(ref Reference) Reference {
sysRef := ref
if sys, ok := ref.(interface {
Sys() Reference
}); ok {
sysRef = sys.Sys()
}
return sysRef
}
func toImmutableRef(ref Reference) (cache.ImmutableRef, bool) {
immutable, ok := originRef(ref).(cache.ImmutableRef)
if !ok {
return nil, false
}
return &immutableRef{immutable, ref.Release}, true
}
type immutableRef struct {
cache.ImmutableRef
release func(context.Context) error
}
func (ir *immutableRef) Release(ctx context.Context) error {
return ir.release(ctx)
}

View File

@ -43,6 +43,7 @@ type vertex struct {
digest digest.Digest
clientVertex client.Vertex
name string
notifyMu sync.Mutex
}
func (v *vertex) initClientVertex() {
@ -66,6 +67,7 @@ func (v *vertex) Sys() interface{} {
}
func (v *vertex) Inputs() (inputs []Input) {
inputs = make([]Input, 0, len(v.inputs))
for _, i := range v.inputs {
inputs = append(inputs, Input{i.index, i.vertex})
}
@ -81,6 +83,7 @@ func (v *vertex) inputRequiresExport(i int) bool {
}
func (v *vertex) notifyStarted(ctx context.Context) {
v.recursiveMarkCached(ctx)
pw, _, _ := progress.FromContext(ctx)
defer pw.Close()
now := time.Now()
@ -93,6 +96,7 @@ func (v *vertex) notifyCompleted(ctx context.Context, cached bool, err error) {
pw, _, _ := progress.FromContext(ctx)
defer pw.Close()
now := time.Now()
v.recursiveMarkCached(ctx)
if v.clientVertex.Started == nil {
v.clientVertex.Started = &now
}
@ -106,9 +110,12 @@ func (v *vertex) notifyCompleted(ctx context.Context, cached bool, err error) {
func (v *vertex) recursiveMarkCached(ctx context.Context) {
for _, inp := range v.inputs {
inp.vertex.recursiveMarkCached(ctx)
}
if v.clientVertex.Started == nil {
v.notifyCompleted(ctx, true, nil)
inp.vertex.notifyMu.Lock()
if inp.vertex.clientVertex.Started == nil {
inp.vertex.recursiveMarkCached(ctx)
inp.vertex.notifyCompleted(ctx, true, nil)
}
inp.vertex.notifyMu.Unlock()
}
}

173
util/bgfunc/bgfunc.go Normal file
View File

@ -0,0 +1,173 @@
package bgfunc
import (
"sync"
"golang.org/x/net/context"
)
func New(ctx context.Context, fn func(context.Context, func()) error) (*F, error) {
f := &F{mainCtx: ctx, f: fn}
f.cond = sync.NewCond(f.mu.RLocker())
return f, nil
}
type F struct {
mainCtx context.Context
mu sync.RWMutex
cond *sync.Cond
running bool
runMu sync.Mutex
f func(context.Context, func()) error
done bool
err error
cancelCtx func()
ctxErr chan error // this channel is used for the caller to wait cancellation error
semMu sync.Mutex
sem int
}
func (f *F) NewCaller() *Caller {
c := &Caller{F: f, active: true}
f.addSem()
return c
}
func (f *F) run() {
f.runMu.Lock()
if !f.running && !f.done {
f.running = true
go func() {
var err error
var nodone bool
ctxErr := make(chan error, 1)
defer func() {
// release all cancellations
f.runMu.Lock()
f.running = false
f.runMu.Unlock()
f.mu.Lock()
if !nodone {
f.done = true
f.err = err
}
f.cond.Broadcast()
ctxErr <- err
f.mu.Unlock()
}()
ctx, cancel := context.WithCancel(f.mainCtx)
f.runMu.Lock()
f.cancelCtx = cancel
f.ctxErr = ctxErr
f.runMu.Unlock()
err = f.f(ctx, func() {
f.cond.Broadcast()
})
select {
case <-ctx.Done():
nodone = true // don't set f.done
default:
}
}()
}
f.runMu.Unlock()
}
func (f *F) addSem() {
f.semMu.Lock()
f.sem++
f.semMu.Unlock()
}
func (f *F) clearSem() error {
f.semMu.Lock()
f.sem--
var err error
if f.sem == 0 && f.cancelCtx != nil {
f.cancelCtx()
err = <-f.ctxErr
}
f.semMu.Unlock()
return err
}
type Caller struct {
*F
active bool
}
func (c *Caller) Call(ctx context.Context, f func() (interface{}, error)) (interface{}, error) {
done := make(chan struct{})
defer close(done)
go func() {
select {
case <-ctx.Done():
c.F.mu.Lock()
c.F.cond.Broadcast()
c.F.mu.Unlock()
case <-done:
}
}()
c.F.mu.RLock()
for {
select {
case <-ctx.Done():
c.F.mu.RUnlock()
if err := c.Cancel(); err != nil {
return nil, err
}
return nil, ctx.Err()
default:
}
if err := c.F.err; err != nil {
c.F.mu.RUnlock()
return nil, err
}
c.ensureStarted()
v, err := f()
if err != nil {
c.F.mu.RUnlock()
return nil, err
}
if v != nil {
c.F.mu.RUnlock()
return v, nil
}
if c.F.done {
c.F.mu.RUnlock()
return nil, nil
}
c.F.cond.Wait()
}
}
func (c *Caller) Cancel() error {
if c.active {
c.active = false
return c.F.clearSem()
}
return nil
}
// called with readlock
func (c *Caller) ensureStarted() {
if c.F.done {
return
}
if !c.active {
c.active = true
c.F.addSem()
}
c.F.run()
}

277
util/bgfunc/bgfunc_test.go Normal file
View File

@ -0,0 +1,277 @@
package bgfunc
import (
"sync"
"testing"
"time"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
)
func TestBgFuncSimple(t *testing.T) {
var res string
var mu sync.Mutex
calls1 := 0
f, err := New(context.TODO(), func(ctx context.Context, signal func()) error {
calls1++
mu.Lock()
res = "ok"
signal()
mu.Unlock()
return nil
})
require.NoError(t, err)
fn := func() (interface{}, error) {
mu.Lock()
defer mu.Unlock()
if res != "" {
return res, nil
}
return nil, nil
}
c1 := f.NewCaller()
v, err := c1.Call(context.TODO(), fn)
require.NoError(t, err)
c2 := f.NewCaller()
v2, err := c2.Call(context.TODO(), fn)
require.NoError(t, err)
require.Equal(t, v.(string), "ok")
require.Equal(t, v2.(string), "ok")
require.Equal(t, calls1, 1)
}
func TestSignal(t *testing.T) {
var res []string
var mu sync.Mutex
next := make(chan struct{})
f, err := New(context.TODO(), func(ctx context.Context, signal func()) error {
mu.Lock()
res = append(res, "ok1")
signal()
mu.Unlock()
<-next
mu.Lock()
res = append(res, "ok2")
signal()
mu.Unlock()
return nil
})
require.NoError(t, err)
i := 0
fn := func() (interface{}, error) {
mu.Lock()
defer mu.Unlock()
if i < len(res) {
v := res[i]
i++
return v, nil
}
return nil, nil
}
c1 := f.NewCaller()
v, err := c1.Call(context.TODO(), fn)
require.NoError(t, err)
require.Equal(t, v.(string), "ok1")
resCh := make(chan interface{})
go func() {
v, err = c1.Call(context.TODO(), fn)
require.NoError(t, err)
resCh <- v
}()
select {
case <-resCh:
require.Fail(t, "unexpected result")
case <-time.After(50 * time.Millisecond):
close(next)
}
select {
case v := <-resCh:
require.Equal(t, v.(string), "ok2")
case <-time.After(100 * time.Millisecond):
require.Fail(t, "timeout")
}
v, err = c1.Call(context.TODO(), fn)
require.NoError(t, err)
require.Nil(t, v)
}
func TestCancellation(t *testing.T) {
var res []string
var mu sync.Mutex
next := make(chan struct{})
returned := 0
f, err := New(context.TODO(), func(ctx context.Context, signal func()) error {
defer func() {
returned++
}()
mu.Lock()
if len(res) == 0 {
res = append(res, "ok1")
}
signal()
mu.Unlock()
select {
case <-next:
case <-ctx.Done():
return ctx.Err()
}
mu.Lock()
res = append(res, "ok2")
signal()
mu.Unlock()
return nil
})
require.NoError(t, err)
i := 0
fn1 := func() (interface{}, error) {
mu.Lock()
defer mu.Unlock()
if i < len(res) {
v := res[i]
i++
return v, nil
}
return nil, nil
}
i2 := 0
fn2 := func() (interface{}, error) {
mu.Lock()
defer mu.Unlock()
if i2 < len(res) {
v := res[i2]
i2++
return v, nil
}
return nil, nil
}
c1 := f.NewCaller()
v, err := c1.Call(context.TODO(), fn1)
require.NoError(t, err)
require.Equal(t, v.(string), "ok1")
c2 := f.NewCaller()
v2, err := c2.Call(context.TODO(), fn2)
require.NoError(t, err)
require.Equal(t, v2.(string), "ok1")
c3 := f.NewCaller()
var cancelFirst func()
firstErr := make(chan error)
go func() {
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
cancelFirst = cancel
_, err := c1.Call(ctx, fn1)
firstErr <- err
}()
var cancelSecond func()
secondErr := make(chan error)
go func() {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
cancelSecond = cancel
_, err := c2.Call(ctx, fn2)
secondErr <- err
}()
select {
case err := <-firstErr:
require.Equal(t, err.Error(), context.DeadlineExceeded.Error())
c3.Cancel()
case <-secondErr:
require.Fail(t, "invalid error")
case <-time.After(100 * time.Millisecond):
require.Fail(t, "timeout")
}
require.Equal(t, returned, 0)
select {
case err := <-secondErr:
require.Equal(t, err.Error(), context.Canceled.Error())
case <-time.After(100 * time.Millisecond):
require.Fail(t, "timeout")
}
require.Equal(t, 1, returned)
close(next)
v, err = c2.Call(context.TODO(), fn2)
require.NoError(t, err)
require.Equal(t, v.(string), "ok2")
v, err = c1.Call(context.TODO(), fn1)
require.NoError(t, err)
require.Equal(t, v.(string), "ok2")
v, err = c2.Call(context.TODO(), fn2)
require.NoError(t, err)
require.Equal(t, v, nil)
}
func TestError(t *testing.T) {
// function returns an error in the middle of processing
var res string
var mu sync.Mutex
next := make(chan struct{})
returned := 0
f, err := New(context.TODO(), func(ctx context.Context, signal func()) error {
defer func() {
returned++
}()
mu.Lock()
res = "ok1"
signal()
mu.Unlock()
select {
case <-next:
case <-ctx.Done():
return ctx.Err()
}
return errors.Errorf("invalid")
})
fn1 := func() (interface{}, error) {
mu.Lock()
defer mu.Unlock()
if res != "" {
defer func() {
res = ""
}()
return res, nil
}
return nil, nil
}
c1 := f.NewCaller()
v, err := c1.Call(context.TODO(), fn1)
require.NoError(t, err)
require.NoError(t, err)
require.Equal(t, v.(string), "ok1")
close(next)
_, err = c1.Call(context.TODO(), fn1)
require.Error(t, err)
require.Equal(t, err.Error(), "invalid")
}

View File

@ -177,7 +177,18 @@ func (c *call) Value(key interface{}) interface{} {
}
c.mu.Lock()
defer c.mu.Unlock()
for _, ctx := range append([]context.Context{c.progressCtx}, c.ctxs...) {
ctx := c.progressCtx
select {
case <-ctx.Done():
default:
if v := ctx.Value(key); v != nil {
return v
}
}
if len(c.ctxs) > 0 {
ctx = c.ctxs[0]
select {
case <-ctx.Done():
default:
@ -186,6 +197,7 @@ func (c *call) Value(key interface{}) interface{} {
}
}
}
return nil
}

View File

@ -0,0 +1,105 @@
package progress
import (
"sort"
"sync"
"time"
)
type rawProgressWriter interface {
WriteRawProgress(*Progress) error
Close() error
}
type MultiWriter struct {
mu sync.Mutex
items []*Progress
writers map[rawProgressWriter]struct{}
done bool
meta map[string]interface{}
}
func NewMultiWriter(opts ...WriterOption) *MultiWriter {
mw := &MultiWriter{
writers: map[rawProgressWriter]struct{}{},
meta: map[string]interface{}{},
}
for _, o := range opts {
o(mw)
}
return mw
}
func (ps *MultiWriter) Add(pw Writer) {
rw, ok := pw.(rawProgressWriter)
if !ok {
return
}
ps.mu.Lock()
plist := make([]*Progress, 0, len(ps.items))
for _, p := range ps.items {
plist = append(plist, p)
}
sort.Slice(plist, func(i, j int) bool {
return plist[i].Timestamp.Before(plist[j].Timestamp)
})
for _, p := range plist {
rw.WriteRawProgress(p)
}
ps.writers[rw] = struct{}{}
ps.mu.Unlock()
}
func (ps *MultiWriter) Delete(pw Writer) {
rw, ok := pw.(rawProgressWriter)
if !ok {
return
}
ps.mu.Lock()
delete(ps.writers, rw)
ps.mu.Unlock()
}
func (ps *MultiWriter) Write(id string, v interface{}) error {
p := &Progress{
ID: id,
Timestamp: time.Now(),
Sys: v,
meta: ps.meta,
}
return ps.WriteRawProgress(p)
}
func (ps *MultiWriter) WriteRawProgress(p *Progress) error {
meta := p.meta
if len(ps.meta) > 0 {
meta = map[string]interface{}{}
for k, v := range p.meta {
meta[k] = v
}
for k, v := range ps.meta {
if _, ok := meta[k]; !ok {
meta[k] = v
}
}
}
p.meta = meta
return ps.writeRawProgress(p)
}
func (ps *MultiWriter) writeRawProgress(p *Progress) error {
ps.mu.Lock()
defer ps.mu.Unlock()
ps.items = append(ps.items, p)
for w := range ps.writers {
if err := w.WriteRawProgress(p); err != nil {
return err
}
}
return nil
}
func (ps *MultiWriter) Close() error {
return nil
}

View File

@ -20,8 +20,12 @@ var contextKey = contextKeyT("buildkit/util/progress")
// FromContext returns a progress writer from a context.
func FromContext(ctx context.Context, opts ...WriterOption) (Writer, bool, context.Context) {
pw, ok := ctx.Value(contextKey).(*progressWriter)
v := ctx.Value(contextKey)
pw, ok := v.(*progressWriter)
if !ok {
if pw, ok := v.(*MultiWriter); ok {
return pw, true, ctx
}
return &noOpWriter{}, false, ctx
}
pw = newWriter(pw)
@ -39,15 +43,22 @@ type WriterOption func(Writer)
// function to signal that no new writes will happen to this context.
func NewContext(ctx context.Context) (Reader, context.Context, func()) {
pr, pw, cancel := pipe()
ctx = context.WithValue(ctx, contextKey, pw)
ctx = WithProgress(ctx, pw)
return pr, ctx, cancel
}
func WithProgress(ctx context.Context, pw Writer) context.Context {
return context.WithValue(ctx, contextKey, pw)
}
func WithMetadata(key string, val interface{}) WriterOption {
return func(w Writer) {
if pw, ok := w.(*progressWriter); ok {
pw.meta[key] = val
}
if pw, ok := w.(*MultiWriter); ok {
pw.meta[key] = val
}
}
}
@ -211,8 +222,8 @@ func (pw *progressWriter) WriteRawProgress(p *Progress) error {
func (pw *progressWriter) writeRawProgress(p *Progress) error {
pw.reader.mu.Lock()
pw.reader.dirty[p.ID] = p
pw.reader.mu.Unlock()
pw.reader.cond.Broadcast()
pw.reader.mu.Unlock()
return nil
}

View File

@ -5,6 +5,7 @@ package oci
import (
"context"
"path"
"sync"
"github.com/containerd/containerd"
"github.com/containerd/containerd/mount"
@ -108,9 +109,17 @@ func (s *submounts) subMount(m mount.Mount, subPath string) (mount.Mount, error)
}
func (s *submounts) cleanup() {
var wg sync.WaitGroup
wg.Add(len(s.m))
for _, m := range s.m {
m.unmount()
func(m mountRef) {
go func() {
m.unmount()
wg.Done()
}()
}(m)
}
wg.Wait()
}
func sub(m mount.Mount, subPath string) mount.Mount {