Merge pull request #447 from tonistiigi/concurrent-cache-mount
llbsolver: allow concurrent cache mount accessdocker-18.09
commit
4b8dc5b08b
|
@ -10,6 +10,7 @@ import (
|
|||
"runtime"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
"github.com/containerd/containerd/mount"
|
||||
|
@ -165,15 +166,23 @@ func (e *execOp) getMountDeps() ([]dep, error) {
|
|||
}
|
||||
|
||||
func (e *execOp) getRefCacheDir(ctx context.Context, ref cache.ImmutableRef, id string, m *pb.Mount) (cache.MutableRef, error) {
|
||||
makeMutable := func(cache.ImmutableRef) (cache.MutableRef, error) {
|
||||
desc := fmt.Sprintf("cached mount %s from exec %s", m.Dest, strings.Join(e.op.Meta.Args, " "))
|
||||
return e.cm.New(ctx, ref, cache.WithDescription(desc), cache.CachePolicyRetain)
|
||||
}
|
||||
|
||||
key := "cache-dir:" + id
|
||||
if ref != nil {
|
||||
key += ":" + ref.ID()
|
||||
}
|
||||
|
||||
return sharedCacheRefs.get(key, func() (cache.MutableRef, error) {
|
||||
return e.getRefCacheDirNoCache(ctx, key, ref, id, m)
|
||||
})
|
||||
}
|
||||
|
||||
func (e *execOp) getRefCacheDirNoCache(ctx context.Context, key string, ref cache.ImmutableRef, id string, m *pb.Mount) (cache.MutableRef, error) {
|
||||
makeMutable := func(cache.ImmutableRef) (cache.MutableRef, error) {
|
||||
desc := fmt.Sprintf("cached mount %s from exec %s", m.Dest, strings.Join(e.op.Meta.Args, " "))
|
||||
return e.cm.New(ctx, ref, cache.WithDescription(desc), cache.CachePolicyRetain)
|
||||
}
|
||||
|
||||
sis, err := e.md.Search(key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -408,3 +417,71 @@ func (m *tmpfsMount) Mount() ([]mount.Mount, error) {
|
|||
func (m *tmpfsMount) Release() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
var sharedCacheRefs = &cacheRefs{}
|
||||
|
||||
type cacheRefs struct {
|
||||
mu sync.Mutex
|
||||
shares map[string]*cacheRefShare
|
||||
}
|
||||
|
||||
func (r *cacheRefs) get(key string, fn func() (cache.MutableRef, error)) (cache.MutableRef, error) {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
|
||||
if r.shares == nil {
|
||||
r.shares = map[string]*cacheRefShare{}
|
||||
}
|
||||
|
||||
share, ok := r.shares[key]
|
||||
if ok {
|
||||
return share.clone(), nil
|
||||
}
|
||||
|
||||
mref, err := fn()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
share = &cacheRefShare{MutableRef: mref, main: r, key: key, refs: map[*cacheRef]struct{}{}}
|
||||
r.shares[key] = share
|
||||
|
||||
return share.clone(), nil
|
||||
}
|
||||
|
||||
type cacheRefShare struct {
|
||||
cache.MutableRef
|
||||
mu sync.Mutex
|
||||
refs map[*cacheRef]struct{}
|
||||
main *cacheRefs
|
||||
key string
|
||||
}
|
||||
|
||||
func (r *cacheRefShare) clone() cache.MutableRef {
|
||||
cacheRef := &cacheRef{cacheRefShare: r}
|
||||
r.mu.Lock()
|
||||
r.refs[cacheRef] = struct{}{}
|
||||
r.mu.Unlock()
|
||||
return cacheRef
|
||||
}
|
||||
|
||||
func (r *cacheRefShare) release(ctx context.Context) error {
|
||||
r.main.mu.Lock()
|
||||
defer r.main.mu.Unlock()
|
||||
delete(r.main.shares, r.key)
|
||||
return r.MutableRef.Release(ctx)
|
||||
}
|
||||
|
||||
type cacheRef struct {
|
||||
*cacheRefShare
|
||||
}
|
||||
|
||||
func (r *cacheRef) Release(ctx context.Context) error {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
delete(r.refs, r)
|
||||
if len(r.refs) == 0 {
|
||||
return r.release(ctx)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue