Merge pull request #338 from tonistiigi/cachekey-refactor

solver-next: cachekey refactor
docker-18.09
Akihiro Suda 2018-04-17 12:53:21 +09:00 committed by GitHub
commit 4d4f369838
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 2055 additions and 1148 deletions

View File

@ -3,9 +3,11 @@ package boltdbcachestorage
import (
"bytes"
"encoding/json"
"fmt"
"github.com/boltdb/bolt"
solver "github.com/moby/buildkit/solver-next"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
)
@ -35,6 +37,7 @@ func NewStore(dbPath string) (*Store, error) {
}); err != nil {
return nil, err
}
db.NoSync = true
return &Store{db: db}, nil
}
@ -327,7 +330,94 @@ func (s *Store) WalkLinks(id string, link solver.CacheInfoLink, fn func(id strin
return nil
}
// HasLink reports whether the links bucket for id contains an entry for the
// given link/target pair. A missing bucket or any storage error is treated
// as "no link".
func (s *Store) HasLink(id string, link solver.CacheInfoLink, target string) bool {
	found := false
	err := s.db.View(func(tx *bolt.Tx) error {
		links := tx.Bucket([]byte(linksBucket))
		if links == nil {
			return nil
		}
		idBucket := links.Bucket([]byte(id))
		if idBucket == nil {
			return nil
		}
		// Link entries are keyed as "<json(link)>@<target>".
		dt, err := json.Marshal(link)
		if err != nil {
			return err
		}
		key := bytes.Join([][]byte{dt, []byte(target)}, []byte("@"))
		found = idBucket.Get(key) != nil
		return nil
	})
	if err != nil {
		return false
	}
	return found
}
// WalkBacklinks calls fn once for every link that points *to* id. It uses
// the backlinks index to find candidate source records, then scans each
// source's forward links and keeps only entries whose target is id. The
// reported link digest is rewritten to a content-addressed combination of
// the original digest and output index, with Output reset to 0. Callbacks
// run after the read transaction has closed, so fn may safely re-enter the
// store.
func (s *Store) WalkBacklinks(id string, fn func(id string, link solver.CacheInfoLink) error) error {
	var outIDs []string
	var outLinks []solver.CacheInfoLink
	if err := s.db.View(func(tx *bolt.Tx) error {
		links := tx.Bucket([]byte(linksBucket))
		if links == nil {
			return nil
		}
		backLinks := tx.Bucket([]byte(backlinksBucket))
		if backLinks == nil {
			return nil
		}
		b := backLinks.Bucket([]byte(id))
		if b == nil {
			return nil
		}
		if err := b.ForEach(func(bid, v []byte) error {
			// Use a distinct variable for the source record's forward-link
			// bucket; the original code reassigned `b`, the bucket being
			// iterated, which is fragile and misleading.
			lb := links.Bucket(bid)
			if lb == nil {
				return nil
			}
			if err := lb.ForEach(func(k, v []byte) error {
				// Keys have the form "<json(link)>@<target-id>".
				parts := bytes.Split(k, []byte("@"))
				if len(parts) == 2 {
					if string(parts[1]) != id {
						return nil
					}
					var l solver.CacheInfoLink
					if err := json.Unmarshal(parts[0], &l); err != nil {
						return err
					}
					l.Digest = digest.FromBytes([]byte(fmt.Sprintf("%s@%d", l.Digest, l.Output)))
					l.Output = 0
					outIDs = append(outIDs, string(bid))
					outLinks = append(outLinks, l)
				}
				return nil
			}); err != nil {
				return err
			}
			return nil
		}); err != nil {
			return err
		}
		return nil
	}); err != nil {
		return err
	}
	for i := range outIDs {
		if err := fn(outIDs[i], outLinks[i]); err != nil {
			return err
		}
	}
	return nil
}
// isEmptyBucket reports whether b is nil or holds no keys.
func isEmptyBucket(b *bolt.Bucket) bool {
	if b == nil {
		return true
	}
	firstKey, _ := b.Cursor().First()
	return firstKey == nil
}

View File

@ -1,565 +0,0 @@
package solver
import (
"context"
"fmt"
"sort"
"strings"
"sync"
"github.com/moby/buildkit/identity"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)
// internalMemoryKeyT is an unexported key type used to stash the backend
// record ID on a CacheKey via SetValue/GetValue without colliding with
// other value users.
type internalMemoryKeyT string

var internalMemoryKey = internalMemoryKeyT("buildkit/memory-cache-id")

// NoSelector is the digest of nil content.
// NOTE(review): appears to act as a sentinel selector for content-based
// dependency links — confirm against callers.
var NoSelector = digest.FromBytes(nil)

// NewInMemoryCacheManager returns a CacheManager backed by fully in-memory
// key and result storage, under a freshly generated instance ID.
func NewInMemoryCacheManager() CacheManager {
	return NewCacheManager(identity.NewID(), NewInMemoryCacheStorage(), NewInMemoryResultStorage())
}
// NewCacheManager creates a CacheManager with the given instance id on top
// of the provided key and result storage backends. On startup it releases
// backend records whose results no longer exist in the result store.
func NewCacheManager(id string, storage CacheKeyStorage, results CacheResultStorage) CacheManager {
	cm := &inMemoryCacheManager{
		id:      id,
		backend: storage,
		results: results,
	}
	// Best-effort startup cleanup: errors from the walk are deliberately
	// ignored so a storage hiccup does not prevent manager construction.
	storage.Walk(func(id string) error {
		return storage.WalkResults(id, func(cr CacheResult) error {
			if !results.Exists(cr.ID) {
				storage.Release(cr.ID)
			}
			return nil
		})
	})
	return cm
}
// inMemoryCacheKey is the manager-internal representation of a cache key:
// a digest/output pair bound to a backend record id, with resolved deps.
type inMemoryCacheKey struct {
	manager     *inMemoryCacheManager
	cacheResult CacheResult
	deps        []CacheKeyWithSelector // only []*inMemoryCacheKey
	digest      digest.Digest
	output      Index
	id          string
	CacheKey
}

// Deps returns the dependency keys this key was created from.
func (ck *inMemoryCacheKey) Deps() []CacheKeyWithSelector {
	return ck.deps
}

// Digest returns the operation digest of this key.
func (ck *inMemoryCacheKey) Digest() digest.Digest {
	return ck.digest
}

// Output returns the output index of this key.
func (ck *inMemoryCacheKey) Output() Index {
	return ck.output
}

// withExporter wraps ck into an ExportableCacheKey whose exporter can write
// the key (with the given result and deps) into an export record map.
func withExporter(ck *inMemoryCacheKey, cacheResult *CacheResult, deps []CacheKeyWithSelector) ExportableCacheKey {
	return ExportableCacheKey{ck, &cacheExporter{
		inMemoryCacheKey: ck,
		cacheResult:      cacheResult,
		deps:             deps,
	}}
}

// cacheExporter implements Export for an inMemoryCacheKey, optionally pinned
// to a specific cache result.
type cacheExporter struct {
	*inMemoryCacheKey
	cacheResult *CacheResult
	deps        []CacheKeyWithSelector
}
// Export writes this cache key (and, recursively, its exportable deps) into
// the export record map m. If no result is attached, the best stored result
// is looked up; a result without a pre-built remote is converted via the
// converter callback. The record is keyed by the digest of the first remote
// descriptor when available, otherwise by the digest of the internal id.
func (ce *cacheExporter) Export(ctx context.Context, m map[digest.Digest]*ExportRecord, converter func(context.Context, Result) (*Remote, error)) (*ExportRecord, error) {
	var res Result
	if ce.cacheResult == nil {
		cr, err := ce.inMemoryCacheKey.manager.getBestResult(ce.inMemoryCacheKey.id)
		if err != nil {
			return nil, err
		}
		ce.cacheResult = cr
	}
	var remote *Remote
	var err error
	if ce.cacheResult != nil {
		remote, err = ce.inMemoryCacheKey.manager.results.LoadRemote(ctx, *ce.cacheResult)
		if err != nil {
			return nil, err
		}
		if remote == nil {
			// No pre-built remote; load the concrete result so it can be
			// converted below.
			res, err = ce.inMemoryCacheKey.manager.results.Load(ctx, *ce.cacheResult)
			if err != nil {
				return nil, err
			}
		}
	}
	if res != nil && remote == nil {
		remote, err = converter(ctx, res)
		if err != nil {
			return nil, err
		}
	}
	cacheID := digest.FromBytes([]byte(ce.inMemoryCacheKey.id))
	if remote != nil && len(remote.Descriptors) > 0 && remote.Descriptors[0].Digest != "" {
		cacheID = remote.Descriptors[0].Digest
	}
	rec, ok := m[cacheID]
	if !ok {
		rec = &ExportRecord{
			Digest: cacheID,
			Remote: remote,
			Links:  make(map[CacheLink]struct{}),
		}
		m[cacheID] = rec
	}
	if len(ce.Deps()) == 0 {
		// Root key: record a base link with no source.
		rec.Links[CacheLink{
			Output: ce.Output(),
			Base:   ce.Digest(),
		}] = struct{}{}
	}
	for i, dep := range ce.deps {
		if dep.CacheKey.Exporter == nil {
			continue
		}
		r, err := dep.CacheKey.Export(ctx, m, converter)
		if err != nil {
			return nil, err
		}
		link := CacheLink{
			Source:   r.Digest,
			Input:    Index(i),
			Output:   ce.Output(),
			Base:     ce.Digest(),
			Selector: dep.Selector,
		}
		rec.Links[link] = struct{}{}
	}
	return rec, nil
}
// inMemoryCacheManager implements CacheManager on top of pluggable key and
// result storage backends. mu serializes access to backend state.
type inMemoryCacheManager struct {
	mu sync.RWMutex
	id string

	backend CacheKeyStorage
	results CacheResultStorage
}

// ID returns the instance identifier of this manager.
func (c *inMemoryCacheManager) ID() string {
	return c.id
}

// toInMemoryCacheKey builds an inMemoryCacheKey for an existing backend
// record id and tags the embedded key with the id so later resolutions can
// recover the record without a lookup.
func (c *inMemoryCacheManager) toInMemoryCacheKey(id string, dgst digest.Digest, output Index, deps []CacheKeyWithSelector) *inMemoryCacheKey {
	ck := &inMemoryCacheKey{
		id:       id,
		output:   output,
		digest:   dgst,
		manager:  c,
		CacheKey: NewCacheKey("", 0, nil),
		deps:     deps,
	}
	ck.SetValue(internalMemoryKey, id)
	return ck
}
// getBestResult returns the most recently created result recorded for id,
// or nil when no results exist.
func (c *inMemoryCacheManager) getBestResult(id string) (*CacheResult, error) {
	var all []*CacheResult
	err := c.backend.WalkResults(id, func(res CacheResult) error {
		res := res
		all = append(all, &res)
		return nil
	})
	if err != nil {
		return nil, err
	}
	if len(all) == 0 {
		return nil, nil
	}
	// Newest first.
	sort.Slice(all, func(i, j int) bool {
		return all[i].CreatedAt.After(all[j].CreatedAt)
	})
	return all[0], nil
}
// Query returns cache records for an operation (dgst/output) whose
// dependency at position input matches one of deps. With no deps the lookup
// falls back to the deterministic root id derived from dgst and output.
// Side effect: results reachable through one dependency get linked to the
// remaining dependencies as well (cache carry-over).
func (c *inMemoryCacheManager) Query(deps []CacheKeyWithSelector, input Index, dgst digest.Digest, output Index) ([]*CacheRecord, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	type dep struct {
		results     map[string]struct{}
		key         CacheKeyWithSelector
		internalKey *inMemoryCacheKey
	}
	// formatDeps rebuilds a dep slice of length index+1 for returned keys;
	// multiple alternatives are folded into a synthetic ""-digest key.
	formatDeps := func(deps []dep, index int) []CacheKeyWithSelector {
		keys := make([]CacheKeyWithSelector, index+1)
		if len(deps) == 1 {
			keys[index] = deps[0].key
		} else {
			k2 := make([]CacheKeyWithSelector, 0, len(deps))
			for _, d := range deps {
				k2 = append(k2, d.key)
			}
			keys[index] = CacheKeyWithSelector{CacheKey: ExportableCacheKey{CacheKey: NewCacheKey("", 0, k2)}}
		}
		return keys
	}
	allDeps := make([]dep, 0, len(deps))
	for _, d := range deps {
		for _, k := range c.getAllKeys(d) {
			d := dep{key: k, results: map[string]struct{}{}}
			internalKey, err := c.getInternalKey(k.CacheKey, false)
			if err != nil {
				if errors.Cause(err) == ErrNotFound {
					// NOTE(review): this dep is appended again right below,
					// so an unresolved key lands in allDeps twice — looks
					// unintentional; confirm before relying on it.
					allDeps = append(allDeps, d)
				} else {
					return nil, err
				}
			} else {
				d.internalKey = internalKey
			}
			allDeps = append(allDeps, d)
		}
	}
	// Collect every result id reachable from any resolved dependency.
	allRes := map[string]struct{}{}
	for _, d := range allDeps {
		if d.internalKey != nil {
			if err := c.backend.WalkLinks(d.internalKey.id, CacheInfoLink{input, output, dgst, d.key.Selector}, func(id string) error {
				d.results[id] = struct{}{}
				allRes[id] = struct{}{}
				return nil
			}); err != nil {
				return nil, err
			}
		}
	}
	if len(deps) == 0 {
		// Root operation: record id is derived from digest and output.
		allRes[digest.FromBytes([]byte(fmt.Sprintf("%s@%d", dgst, output))).String()] = struct{}{}
	}
	outs := make([]*CacheRecord, 0, len(allRes))
	for res := range allRes {
		// Ensure every queried dependency is linked to this result.
		for _, d := range allDeps {
			if d.internalKey == nil {
				internalKey, err := c.getInternalKey(d.key.CacheKey, true)
				if err != nil {
					return nil, err
				}
				// NOTE(review): d is a copy of the slice element; this
				// assignment does not update the entry stored in allDeps.
				d.internalKey = internalKey
			}
			if _, ok := d.results[res]; !ok {
				if err := c.backend.AddLink(d.internalKey.id, CacheInfoLink{
					Input:    input,
					Output:   output,
					Digest:   dgst,
					Selector: d.key.Selector,
				}, res); err != nil {
					return nil, err
				}
			}
		}
		hadResults := false
		fdeps := formatDeps(allDeps, int(input))
		k := c.toInMemoryCacheKey(res, dgst, output, fdeps)
		// TODO: invoke this only once per input
		if err := c.backend.WalkResults(res, func(r CacheResult) error {
			if c.results.Exists(r.ID) {
				outs = append(outs, &CacheRecord{
					ID:           res + "@" + r.ID,
					CacheKey:     withExporter(k, &r, fdeps),
					CacheManager: c,
					Loadable:     true,
					CreatedAt:    r.CreatedAt,
				})
				hadResults = true
			} else {
				// Result data is gone; drop the stale backend entry.
				c.backend.Release(r.ID)
			}
			return nil
		}); err != nil {
			return nil, err
		}
		if !hadResults {
			if len(deps) == 0 {
				if !c.backend.Exists(res) {
					continue
				}
			}
			// Record exists but has no loadable result.
			outs = append(outs, &CacheRecord{
				ID:           res,
				CacheKey:     withExporter(k, nil, fdeps),
				CacheManager: c,
				Loadable:     false,
			})
		}
	}
	return outs, nil
}
// Load materializes the result referenced by rec. The record ID must have
// the form "<keyID>@<resultID>" as produced by Query for loadable records.
func (c *inMemoryCacheManager) Load(ctx context.Context, rec *CacheRecord) (Result, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	keyParts := strings.Split(rec.ID, "@")
	if len(keyParts) != 2 {
		return nil, errors.Errorf("invalid cache record ID")
	}
	ck, err := c.getInternalKey(rec.CacheKey, false)
	if err != nil {
		return nil, err
	}
	res, err := c.backend.Load(ck.id, keyParts[1])
	if err != nil {
		return nil, err
	}
	return c.results.Load(ctx, res)
}
// Save stores result r under cache key k, creating the backend record if it
// does not exist, and returns an exportable key carrying the new result.
func (c *inMemoryCacheManager) Save(k CacheKey, r Result) (ExportableCacheKey, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	empty := ExportableCacheKey{}
	ck, err := c.getInternalKey(k, true)
	if err != nil {
		return empty, err
	}
	res, err := c.results.Save(r)
	if err != nil {
		return empty, err
	}
	if err := c.backend.AddResult(ck.id, res); err != nil {
		return empty, err
	}
	return withExporter(ck, &res, ck.Deps()), nil
}
// getInternalKeys resolves d into internal keys. A key with an empty digest
// is a synthetic combination: each of its deps is resolved individually, and
// (when not creating) unresolvable deps are skipped rather than failing the
// whole resolution.
func (c *inMemoryCacheManager) getInternalKeys(d CacheKeyWithSelector, createIfNotExist bool) ([]CacheKeyWithSelector, error) {
	keys := make([]CacheKeyWithSelector, 0, 1)
	if d.CacheKey.Digest() == "" {
		for _, d := range d.CacheKey.Deps() {
			k, err := c.getInternalKey(d.CacheKey, createIfNotExist)
			if err != nil {
				if !createIfNotExist && errors.Cause(err) == ErrNotFound {
					continue
				}
				return nil, err
			}
			keys = append(keys, CacheKeyWithSelector{Selector: d.Selector, CacheKey: ExportableCacheKey{CacheKey: k, Exporter: d.CacheKey.Exporter}})
		}
	} else {
		k, err := c.getInternalKey(d.CacheKey, createIfNotExist)
		if err != nil {
			return nil, err
		}
		keys = append(keys, CacheKeyWithSelector{Selector: d.Selector, CacheKey: ExportableCacheKey{CacheKey: k, Exporter: d.CacheKey.Exporter}})
	}
	return keys, nil
}
// getAllKeys flattens d into concrete alternatives: an empty-digest key is a
// synthetic combination whose deps are the real candidates; otherwise d
// itself is the single candidate.
func (c *inMemoryCacheManager) getAllKeys(d CacheKeyWithSelector) []CacheKeyWithSelector {
	if d.CacheKey.Digest() != "" {
		return []CacheKeyWithSelector{d}
	}
	return append([]CacheKeyWithSelector{}, d.CacheKey.Deps()...)
}
// getInternalKey maps an arbitrary CacheKey onto this manager's internal
// representation. Exportable and already-internal keys are unwrapped
// directly; otherwise all dependencies are resolved, their existing links
// are intersected to find a matching backend record, and — when allowed —
// a new record plus links is created if no match exists.
func (c *inMemoryCacheManager) getInternalKey(k CacheKey, createIfNotExist bool) (*inMemoryCacheKey, error) {
	if ck, ok := k.(ExportableCacheKey); ok {
		k = ck.CacheKey
	}
	if ck, ok := k.(*inMemoryCacheKey); ok {
		return ck, nil
	}
	internalV := k.GetValue(internalMemoryKey)
	if internalV != nil {
		// Key was resolved before; reuse the stored record id.
		return c.toInMemoryCacheKey(internalV.(string), k.Digest(), k.Output(), k.Deps()), nil
	}
	matches := make(map[string]struct{})
	deps := make([][]CacheKeyWithSelector, 0, len(k.Deps()))
	for i, inp := range k.Deps() {
		allKeys := c.getAllKeys(inp)
		cks := make([]CacheKeyWithSelector, 0, len(allKeys))
		for _, k := range allKeys {
			internalKey, err := c.getInternalKey(k.CacheKey, createIfNotExist)
			if err == nil {
				cks = append(cks, CacheKeyWithSelector{Selector: k.Selector, CacheKey: ExportableCacheKey{CacheKey: internalKey, Exporter: k.CacheKey.Exporter}})
			}
		}
		if len(cks) == 0 {
			return nil, errors.WithStack(ErrNotFound)
		}
		// Seed matches from input 0, then narrow: a record id survives only
		// if it is linked from the candidate keys of the later inputs too.
		if i == 0 || len(matches) > 0 {
			for _, ck := range cks {
				internalCk := ck.CacheKey.CacheKey.(*inMemoryCacheKey)
				m2 := make(map[string]struct{})
				if err := c.backend.WalkLinks(internalCk.id, CacheInfoLink{
					Input:    Index(i),
					Output:   Index(k.Output()),
					Digest:   k.Digest(),
					Selector: ck.Selector,
				}, func(id string) error {
					if i == 0 {
						matches[id] = struct{}{}
					} else {
						m2[id] = struct{}{}
					}
					return nil
				}); err != nil {
					return nil, err
				}
				if i != 0 {
					for id := range matches {
						if _, ok := m2[id]; !ok {
							delete(matches, id)
						}
					}
				}
			}
		}
		deps = append(deps, cks)
	}
	var internalKey string
	if len(matches) == 0 && len(k.Deps()) > 0 {
		if createIfNotExist {
			internalKey = identity.NewID()
		} else {
			return nil, errors.WithStack(ErrNotFound)
		}
	} else {
		// Any surviving match is equivalent; take an arbitrary one.
		for k := range matches {
			internalKey = k
			break
		}
		if len(k.Deps()) == 0 {
			// Root keys get a deterministic id from digest and output.
			internalKey = digest.FromBytes([]byte(fmt.Sprintf("%s@%d", k.Digest(), k.Output()))).String()
		}
		return c.toInMemoryCacheKey(internalKey, k.Digest(), k.Output(), k.Deps()), nil
	}
	// Newly created record: link it from every resolved dependency key.
	for i, dep := range deps {
		for _, ck := range dep {
			internalCk := ck.CacheKey.CacheKey.(*inMemoryCacheKey)
			err := c.backend.AddLink(internalCk.id, CacheInfoLink{
				Input:    Index(i),
				Output:   k.Output(),
				Digest:   k.Digest(),
				Selector: ck.Selector,
			}, internalKey)
			if err != nil {
				return nil, err
			}
		}
	}
	return c.toInMemoryCacheKey(internalKey, k.Digest(), k.Output(), k.Deps()), nil
}
// newCombinedCacheManager layers several cache managers; main receives all
// saves and takes precedence when queries collide.
func newCombinedCacheManager(cms []CacheManager, main CacheManager) CacheManager {
	return &combinedCacheManager{cms: cms, main: main}
}

// combinedCacheManager fans queries out to all managers and routes
// Save/Load to the appropriate one.
type combinedCacheManager struct {
	cms    []CacheManager
	main   CacheManager
	id     string
	idOnce sync.Once
}
// ID returns a stable identifier derived from the IDs of all underlying
// managers; it is computed once and cached.
func (cm *combinedCacheManager) ID() string {
	cm.idOnce.Do(func() {
		ids := make([]string, 0, len(cm.cms))
		for _, c := range cm.cms {
			ids = append(ids, c.ID())
		}
		cm.id = digest.FromBytes([]byte(strings.Join(ids, ","))).String()
	})
	return cm.id
}
// Query fans the request out to every underlying manager concurrently and
// merges records by ID. Records from the main manager overwrite collisions
// and are marked with a higher priority.
func (cm *combinedCacheManager) Query(inp []CacheKeyWithSelector, inputIndex Index, dgst digest.Digest, outputIndex Index) ([]*CacheRecord, error) {
	eg, _ := errgroup.WithContext(context.TODO())
	res := make(map[string]*CacheRecord, len(cm.cms))
	var mu sync.Mutex
	// Wrapper function pins i and c per iteration (pre-Go1.22 loop capture).
	for i, c := range cm.cms {
		func(i int, c CacheManager) {
			eg.Go(func() error {
				recs, err := c.Query(inp, inputIndex, dgst, outputIndex)
				if err != nil {
					return err
				}
				mu.Lock()
				for _, r := range recs {
					if _, ok := res[r.ID]; !ok || c == cm.main {
						r.CacheManager = c
						if c == cm.main {
							r.Priority = 1
						}
						res[r.ID] = r
					}
				}
				mu.Unlock()
				return nil
			})
		}(i, c)
	}
	if err := eg.Wait(); err != nil {
		return nil, err
	}
	out := make([]*CacheRecord, 0, len(res))
	for _, r := range res {
		out = append(out, r)
	}
	return out, nil
}
// Load delegates to the manager that produced the record.
func (cm *combinedCacheManager) Load(ctx context.Context, rec *CacheRecord) (Result, error) {
	return rec.CacheManager.Load(ctx, rec)
}

// Save always stores into the main manager.
func (cm *combinedCacheManager) Save(key CacheKey, s Result) (ExportableCacheKey, error) {
	return cm.main.Save(key, s)
}

View File

@ -9,23 +9,46 @@ import (
"github.com/stretchr/testify/require"
)
func depKeys(cks ...CacheKey) []CacheKeyWithSelector {
func depKeys(cks ...ExportableCacheKey) []CacheKeyWithSelector {
var keys []CacheKeyWithSelector
for _, ck := range cks {
keys = append(keys, CacheKeyWithSelector{CacheKey: ExportableCacheKey{CacheKey: ck}})
keys = append(keys, CacheKeyWithSelector{CacheKey: ck})
}
return keys
}
func testCacheKey(dgst digest.Digest, output Index, deps ...ExportableCacheKey) *CacheKey {
k := NewCacheKey(dgst, output)
k.deps = make([][]CacheKeyWithSelector, len(deps))
for i, dep := range deps {
k.deps[i] = depKeys(dep)
}
return k
}
func testCacheKeyWithDeps(dgst digest.Digest, output Index, deps [][]CacheKeyWithSelector) *CacheKey {
k := NewCacheKey(dgst, output)
k.deps = deps
return k
}
func expKey(k *CacheKey) ExportableCacheKey {
return ExportableCacheKey{CacheKey: k, Exporter: &exporter{k: k}}
}
func TestInMemoryCache(t *testing.T) {
ctx := context.TODO()
m := NewInMemoryCacheManager()
cacheFoo, err := m.Save(NewCacheKey(dgst("foo"), 0, nil), testResult("result0"))
cacheFoo, err := m.Save(NewCacheKey(dgst("foo"), 0), testResult("result0"))
require.NoError(t, err)
matches, err := m.Query(nil, 0, dgst("foo"), 0)
keys, err := m.Query(nil, 0, dgst("foo"), 0)
require.NoError(t, err)
require.Equal(t, len(keys), 1)
matches, err := m.Records(keys[0])
require.NoError(t, err)
require.Equal(t, len(matches), 1)
@ -34,10 +57,14 @@ func TestInMemoryCache(t *testing.T) {
require.Equal(t, "result0", unwrap(res))
// another record
cacheBar, err := m.Save(NewCacheKey(dgst("bar"), 0, nil), testResult("result1"))
cacheBar, err := m.Save(NewCacheKey(dgst("bar"), 0), testResult("result1"))
require.NoError(t, err)
matches, err = m.Query(nil, 0, dgst("bar"), 0)
keys, err = m.Query(nil, 0, dgst("bar"), 0)
require.NoError(t, err)
require.Equal(t, len(keys), 1)
matches, err = m.Records(keys[0])
require.NoError(t, err)
require.Equal(t, len(matches), 1)
@ -46,30 +73,32 @@ func TestInMemoryCache(t *testing.T) {
require.Equal(t, "result1", unwrap(res))
// invalid request
matches, err = m.Query(nil, 0, dgst("baz"), 0)
keys, err = m.Query(nil, 0, dgst("baz"), 0)
require.NoError(t, err)
require.Equal(t, len(matches), 0)
require.Equal(t, len(keys), 0)
// second level
k := NewCacheKey(dgst("baz"), Index(1), []CacheKeyWithSelector{
{CacheKey: cacheFoo}, {CacheKey: cacheBar},
})
k := testCacheKey(dgst("baz"), Index(1), *cacheFoo, *cacheBar)
cacheBaz, err := m.Save(k, testResult("result2"))
require.NoError(t, err)
matches, err = m.Query(nil, 0, dgst("baz"), 0)
keys, err = m.Query(nil, 0, dgst("baz"), 0)
require.NoError(t, err)
require.Equal(t, len(matches), 0)
require.Equal(t, len(keys), 0)
matches, err = m.Query(depKeys(cacheFoo), 0, dgst("baz"), 0)
keys, err = m.Query(depKeys(*cacheFoo), 0, dgst("baz"), 0)
require.NoError(t, err)
require.Equal(t, len(matches), 0)
require.Equal(t, len(keys), 0)
matches, err = m.Query(depKeys(cacheFoo), 1, dgst("baz"), Index(1))
keys, err = m.Query(depKeys(*cacheFoo), 1, dgst("baz"), Index(1))
require.NoError(t, err)
require.Equal(t, len(matches), 0)
require.Equal(t, len(keys), 0)
matches, err = m.Query(depKeys(cacheFoo), 0, dgst("baz"), Index(1))
keys, err = m.Query(depKeys(*cacheFoo), 0, dgst("baz"), Index(1))
require.NoError(t, err)
require.Equal(t, len(keys), 1)
matches, err = m.Records(keys[0])
require.NoError(t, err)
require.Equal(t, len(matches), 1)
@ -77,49 +106,47 @@ func TestInMemoryCache(t *testing.T) {
require.NoError(t, err)
require.Equal(t, "result2", unwrap(res))
matches2, err := m.Query(depKeys(cacheBar), 1, dgst("baz"), Index(1))
keys2, err := m.Query(depKeys(*cacheBar), 1, dgst("baz"), Index(1))
require.NoError(t, err)
require.Equal(t, len(matches2), 1)
require.Equal(t, len(keys2), 1)
require.Equal(t, matches[0].ID, matches2[0].ID)
require.Equal(t, keys[0].ID, keys2[0].ID)
k = NewCacheKey(dgst("baz"), Index(1), []CacheKeyWithSelector{
{CacheKey: cacheFoo},
})
k = testCacheKey(dgst("baz"), Index(1), *cacheFoo)
_, err = m.Save(k, testResult("result3"))
require.NoError(t, err)
matches, err = m.Query(depKeys(cacheFoo), 0, dgst("baz"), Index(1))
keys, err = m.Query(depKeys(*cacheFoo), 0, dgst("baz"), Index(1))
require.NoError(t, err)
require.Equal(t, len(keys), 1)
matches, err = m.Records(keys[0])
require.NoError(t, err)
require.Equal(t, len(matches), 2)
// combination save
k2 := NewCacheKey("", 0, []CacheKeyWithSelector{
{CacheKey: cacheFoo}, {CacheKey: cacheBaz},
})
k = NewCacheKey(dgst("bax"), 0, []CacheKeyWithSelector{
{CacheKey: ExportableCacheKey{CacheKey: k2}}, {CacheKey: cacheBar},
k = testCacheKeyWithDeps(dgst("bax"), 0, [][]CacheKeyWithSelector{
{{CacheKey: *cacheFoo}, {CacheKey: *cacheBaz}},
{{CacheKey: *cacheBar}},
})
_, err = m.Save(k, testResult("result4"))
require.NoError(t, err)
// foo, bar, baz should all point to result4
matches, err = m.Query(depKeys(cacheFoo), 0, dgst("bax"), 0)
keys, err = m.Query(depKeys(*cacheFoo), 0, dgst("bax"), 0)
require.NoError(t, err)
require.Equal(t, len(matches), 1)
require.Equal(t, len(keys), 1)
id := matches[0].ID
id := keys[0].ID
matches, err = m.Query(depKeys(cacheBar), 1, dgst("bax"), 0)
keys, err = m.Query(depKeys(*cacheBar), 1, dgst("bax"), 0)
require.NoError(t, err)
require.Equal(t, len(matches), 1)
require.Equal(t, matches[0].ID, id)
require.Equal(t, len(keys), 1)
require.Equal(t, keys[0].ID, id)
matches, err = m.Query(depKeys(cacheBaz), 0, dgst("bax"), 0)
keys, err = m.Query(depKeys(*cacheBaz), 0, dgst("bax"), 0)
require.NoError(t, err)
require.Equal(t, len(matches), 1)
require.Equal(t, matches[0].ID, id)
require.Equal(t, len(keys), 1)
require.Equal(t, keys[0].ID, id)
}
func TestInMemoryCacheSelector(t *testing.T) {
@ -127,23 +154,27 @@ func TestInMemoryCacheSelector(t *testing.T) {
m := NewInMemoryCacheManager()
cacheFoo, err := m.Save(NewCacheKey(dgst("foo"), 0, nil), testResult("result0"))
cacheFoo, err := m.Save(NewCacheKey(dgst("foo"), 0), testResult("result0"))
require.NoError(t, err)
_, err = m.Save(NewCacheKey(dgst("bar"), 0, []CacheKeyWithSelector{
{CacheKey: cacheFoo, Selector: dgst("sel0")},
_, err = m.Save(testCacheKeyWithDeps(dgst("bar"), 0, [][]CacheKeyWithSelector{
{{CacheKey: *cacheFoo, Selector: dgst("sel0")}},
}), testResult("result1"))
require.NoError(t, err)
matches, err := m.Query(depKeys(cacheFoo), 0, dgst("bar"), 0)
keys, err := m.Query(depKeys(*cacheFoo), 0, dgst("bar"), 0)
require.NoError(t, err)
require.Equal(t, len(matches), 0)
require.Equal(t, len(keys), 0)
matches, err = m.Query([]CacheKeyWithSelector{{Selector: "sel-invalid", CacheKey: ExportableCacheKey{CacheKey: cacheFoo}}}, 0, dgst("bar"), 0)
keys, err = m.Query([]CacheKeyWithSelector{{Selector: "sel-invalid", CacheKey: *cacheFoo}}, 0, dgst("bar"), 0)
require.NoError(t, err)
require.Equal(t, len(matches), 0)
require.Equal(t, len(keys), 0)
matches, err = m.Query([]CacheKeyWithSelector{{Selector: dgst("sel0"), CacheKey: ExportableCacheKey{CacheKey: cacheFoo}}}, 0, dgst("bar"), 0)
keys, err = m.Query([]CacheKeyWithSelector{{Selector: dgst("sel0"), CacheKey: *cacheFoo}}, 0, dgst("bar"), 0)
require.NoError(t, err)
require.Equal(t, len(keys), 1)
matches, err := m.Records(keys[0])
require.NoError(t, err)
require.Equal(t, len(matches), 1)
@ -157,44 +188,51 @@ func TestInMemoryCacheSelectorNested(t *testing.T) {
m := NewInMemoryCacheManager()
cacheFoo, err := m.Save(NewCacheKey(dgst("foo"), 0, nil), testResult("result0"))
cacheFoo, err := m.Save(NewCacheKey(dgst("foo"), 0), testResult("result0"))
require.NoError(t, err)
k2 := NewCacheKey("", 0, []CacheKeyWithSelector{
{CacheKey: cacheFoo, Selector: dgst("sel0")},
{CacheKey: ExportableCacheKey{CacheKey: NewCacheKey(dgst("second"), 0, nil)}},
})
_, err = m.Save(NewCacheKey(dgst("bar"), 0, []CacheKeyWithSelector{
{CacheKey: ExportableCacheKey{CacheKey: k2}},
_, err = m.Save(testCacheKeyWithDeps(dgst("bar"), 0, [][]CacheKeyWithSelector{
{{CacheKey: *cacheFoo, Selector: dgst("sel0")}, {CacheKey: expKey(NewCacheKey(dgst("second"), 0))}},
}), testResult("result1"))
require.NoError(t, err)
matches, err := m.Query([]CacheKeyWithSelector{{Selector: dgst("sel0"), CacheKey: ExportableCacheKey{CacheKey: cacheFoo}}}, 0, dgst("bar"), 0)
keys, err := m.Query(
[]CacheKeyWithSelector{{Selector: dgst("sel0"), CacheKey: *cacheFoo}},
0, dgst("bar"), 0)
require.NoError(t, err)
require.Equal(t, len(keys), 1)
matches, err := m.Records(keys[0])
require.NoError(t, err)
require.Equal(t, len(matches), 1)
res, err := m.Load(ctx, matches[0])
require.NoError(t, err)
require.Equal(t, "result1", unwrap(res))
matches, err = m.Query(depKeys(cacheFoo), 0, dgst("bar"), 0)
keys, err = m.Query(depKeys(*cacheFoo), 0, dgst("bar"), 0)
require.NoError(t, err)
require.Equal(t, len(matches), 0)
require.Equal(t, len(keys), 0)
matches, err = m.Query([]CacheKeyWithSelector{{Selector: dgst("bar"), CacheKey: ExportableCacheKey{CacheKey: cacheFoo}}}, 0, dgst("bar"), 0)
keys, err = m.Query([]CacheKeyWithSelector{{Selector: dgst("bar"), CacheKey: *cacheFoo}}, 0, dgst("bar"), 0)
require.NoError(t, err)
require.Equal(t, len(matches), 0)
require.Equal(t, len(keys), 0)
matches, err = m.Query(depKeys(NewCacheKey(dgst("second"), 0, nil)), 0, dgst("bar"), 0)
keys, err = m.Query(depKeys(expKey(NewCacheKey(dgst("second"), 0))), 0, dgst("bar"), 0)
require.NoError(t, err)
require.Equal(t, len(keys), 1)
matches, err = m.Records(keys[0])
require.NoError(t, err)
require.Equal(t, len(matches), 1)
res, err = m.Load(ctx, matches[0])
require.NoError(t, err)
require.Equal(t, "result1", unwrap(res))
matches, err = m.Query(depKeys(NewCacheKey(dgst("second"), 0, nil)), 0, dgst("bar"), 0)
keys, err = m.Query(depKeys(expKey(NewCacheKey(dgst("second"), 0))), 0, dgst("bar"), 0)
require.NoError(t, err)
require.Equal(t, len(matches), 1)
require.Equal(t, len(keys), 1)
}
func TestInMemoryCacheReleaseParent(t *testing.T) {
@ -203,44 +241,48 @@ func TestInMemoryCacheReleaseParent(t *testing.T) {
m := NewCacheManager(identity.NewID(), storage, results)
res0 := testResult("result0")
cacheFoo, err := m.Save(NewCacheKey(dgst("foo"), 0, nil), res0)
cacheFoo, err := m.Save(NewCacheKey(dgst("foo"), 0), res0)
require.NoError(t, err)
res1 := testResult("result1")
_, err = m.Save(NewCacheKey(dgst("bar"), 0, []CacheKeyWithSelector{
{CacheKey: cacheFoo},
}), res1)
_, err = m.Save(testCacheKey(dgst("bar"), 0, *cacheFoo), res1)
require.NoError(t, err)
matches, err := m.Query(nil, 0, dgst("foo"), 0)
keys, err := m.Query(nil, 0, dgst("foo"), 0)
require.NoError(t, err)
require.Equal(t, len(keys), 1)
matches, err := m.Records(keys[0])
require.NoError(t, err)
require.Equal(t, len(matches), 1)
require.True(t, matches[0].Loadable)
err = storage.Release(res0.ID())
require.NoError(t, err)
// foo becomes unloadable
matches, err = m.Query(nil, 0, dgst("foo"), 0)
keys, err = m.Query(nil, 0, dgst("foo"), 0)
require.NoError(t, err)
require.Equal(t, len(keys), 1)
matches, err = m.Records(keys[0])
require.NoError(t, err)
require.Equal(t, len(matches), 0)
keys, err = m.Query(depKeys(expKey(keys[0])), 0, dgst("bar"), 0)
require.NoError(t, err)
require.Equal(t, len(keys), 1)
matches, err = m.Records(keys[0])
require.NoError(t, err)
require.Equal(t, len(matches), 1)
require.False(t, matches[0].Loadable)
matches, err = m.Query(depKeys(matches[0].CacheKey), 0, dgst("bar"), 0)
require.NoError(t, err)
require.Equal(t, len(matches), 1)
require.True(t, matches[0].Loadable)
// releasing bar releases both foo and bar
err = storage.Release(res1.ID())
require.NoError(t, err)
matches, err = m.Query(nil, 0, dgst("foo"), 0)
keys, err = m.Query(nil, 0, dgst("foo"), 0)
require.NoError(t, err)
require.Equal(t, len(matches), 0)
require.Equal(t, len(keys), 0)
}
// TestInMemoryCacheRestoreOfflineDeletion deletes a result while the
@ -251,13 +293,11 @@ func TestInMemoryCacheRestoreOfflineDeletion(t *testing.T) {
m := NewCacheManager(identity.NewID(), storage, results)
res0 := testResult("result0")
cacheFoo, err := m.Save(NewCacheKey(dgst("foo"), 0, nil), res0)
cacheFoo, err := m.Save(NewCacheKey(dgst("foo"), 0), res0)
require.NoError(t, err)
res1 := testResult("result1")
_, err = m.Save(NewCacheKey(dgst("bar"), 0, []CacheKeyWithSelector{
{CacheKey: cacheFoo},
}), res1)
_, err = m.Save(testCacheKey(dgst("bar"), 0, *cacheFoo), res1)
require.NoError(t, err)
results2 := NewInMemoryResultStorage()
@ -266,16 +306,21 @@ func TestInMemoryCacheRestoreOfflineDeletion(t *testing.T) {
m = NewCacheManager(identity.NewID(), storage, results2)
matches, err := m.Query(nil, 0, dgst("foo"), 0)
keys, err := m.Query(nil, 0, dgst("foo"), 0)
require.NoError(t, err)
require.Equal(t, len(keys), 1)
matches, err := m.Records(keys[0])
require.NoError(t, err)
require.Equal(t, len(matches), 0)
keys, err = m.Query(depKeys(expKey(keys[0])), 0, dgst("bar"), 0)
require.NoError(t, err)
require.Equal(t, len(keys), 1)
matches, err = m.Records(keys[0])
require.NoError(t, err)
require.Equal(t, len(matches), 1)
require.False(t, matches[0].Loadable)
matches, err = m.Query(depKeys(matches[0].CacheKey), 0, dgst("bar"), 0)
require.NoError(t, err)
require.Equal(t, len(matches), 1)
require.True(t, matches[0].Loadable)
}
func TestCarryOverFromSublink(t *testing.T) {
@ -283,32 +328,29 @@ func TestCarryOverFromSublink(t *testing.T) {
results := NewInMemoryResultStorage()
m := NewCacheManager(identity.NewID(), storage, results)
cacheFoo, err := m.Save(NewCacheKey(dgst("foo"), 0, nil), testResult("resultFoo"))
cacheFoo, err := m.Save(NewCacheKey(dgst("foo"), 0), testResult("resultFoo"))
require.NoError(t, err)
k := NewCacheKey("", 0, []CacheKeyWithSelector{
{CacheKey: cacheFoo, Selector: dgst("sel0")},
{CacheKey: ExportableCacheKey{CacheKey: NewCacheKey(dgst("content0"), 0, nil)}, Selector: NoSelector},
})
_, err = m.Save(NewCacheKey(dgst("res"), 0, []CacheKeyWithSelector{{CacheKey: ExportableCacheKey{CacheKey: k}}}), testResult("result0"))
_, err = m.Save(testCacheKeyWithDeps(dgst("res"), 0, [][]CacheKeyWithSelector{
{{CacheKey: *cacheFoo, Selector: dgst("sel0")}, {CacheKey: expKey(NewCacheKey(dgst("content0"), 0))}},
}), testResult("result0"))
require.NoError(t, err)
cacheBar, err := m.Save(NewCacheKey(dgst("bar"), 0, nil), testResult("resultBar"))
cacheBar, err := m.Save(NewCacheKey(dgst("bar"), 0), testResult("resultBar"))
require.NoError(t, err)
k3 := NewCacheKey("", 0, []CacheKeyWithSelector{
{CacheKey: cacheBar, Selector: dgst("sel0")},
{CacheKey: ExportableCacheKey{CacheKey: NewCacheKey(dgst("content0"), 0, nil)}, Selector: NoSelector},
})
matches, err := m.Query(depKeys(k3), 0, dgst("res"), 0)
keys, err := m.Query([]CacheKeyWithSelector{
{CacheKey: *cacheBar, Selector: dgst("sel0")},
{CacheKey: expKey(NewCacheKey(dgst("content0"), 0))},
}, 0, dgst("res"), 0)
require.NoError(t, err)
require.Equal(t, len(matches), 1)
require.Equal(t, len(keys), 1)
matches, err = m.Query([]CacheKeyWithSelector{{Selector: dgst("sel0"), CacheKey: ExportableCacheKey{CacheKey: cacheBar}}}, 0, dgst("res"), 0)
keys, err = m.Query([]CacheKeyWithSelector{
{Selector: dgst("sel0"), CacheKey: *cacheBar},
}, 0, dgst("res"), 0)
require.NoError(t, err)
require.Equal(t, len(matches), 1)
require.Equal(t, len(keys), 1)
}
func dgst(s string) digest.Digest {

66
solver-next/cachekey.go Normal file
View File

@ -0,0 +1,66 @@
package solver
import (
"sync"
digest "github.com/opencontainers/go-digest"
)
// NewCacheKey creates a new cache key for a specific output index. Its ID is
// the deterministic root identifier derived from dgst and output via rootKey.
func NewCacheKey(dgst digest.Digest, output Index) *CacheKey {
	return &CacheKey{
		ID:     rootKey(dgst, output).String(),
		digest: dgst,
		output: output,
		ids:    map[*cacheManager]string{},
	}
}
// CacheKeyWithSelector combines a cache key with an optional selector digest.
// Used to limit the matches for dependency cache key.
type CacheKeyWithSelector struct {
	Selector digest.Digest
	CacheKey ExportableCacheKey
}

// CacheKey identifies an operation result in the cache graph. mu guards the
// mutable fields; digest and output are fixed at construction.
type CacheKey struct {
	mu sync.RWMutex

	// ID is the deterministic root identifier for this digest/output pair.
	ID string

	deps     [][]CacheKeyWithSelector // only [][]*inMemoryCacheKey
	digest   digest.Digest
	output   Index
	ids      map[*cacheManager]string // per-manager backend record ids
	indexIDs []string
}
// Deps returns a defensive copy of the per-input dependency lists so callers
// cannot mutate the key's internal state.
func (ck *CacheKey) Deps() [][]CacheKeyWithSelector {
	ck.mu.RLock()
	defer ck.mu.RUnlock()
	out := make([][]CacheKeyWithSelector, len(ck.deps))
	for i, d := range ck.deps {
		if len(d) == 0 {
			continue // keep nil, matching a copy of an empty list
		}
		cp := make([]CacheKeyWithSelector, len(d))
		copy(cp, d)
		out[i] = cp
	}
	return out
}
// Digest returns the operation digest; set at construction, so no lock is
// taken.
func (ck *CacheKey) Digest() digest.Digest {
	return ck.digest
}

// Output returns the output index; set at construction.
func (ck *CacheKey) Output() Index {
	return ck.output
}
// clone returns a shallow copy of ck sharing ID/digest/output but with its
// own per-manager ID map. Deps and indexIDs are not copied — presumably the
// clone is meant to start without them; confirm against callers.
func (ck *CacheKey) clone() *CacheKey {
	// Hold the read lock while iterating ids: the struct carries a RWMutex
	// guarding its mutable state (see Deps), and reading the map without it
	// races with any concurrent writer.
	ck.mu.RLock()
	defer ck.mu.RUnlock()
	nk := &CacheKey{
		ID:     ck.ID,
		digest: ck.digest,
		output: ck.output,
		ids:    make(map[*cacheManager]string, len(ck.ids)),
	}
	for cm, id := range ck.ids {
		nk.ids[cm] = id
	}
	return nk
}

270
solver-next/cachemanager.go Normal file
View File

@ -0,0 +1,270 @@
package solver
import (
"context"
"fmt"
"sync"
"github.com/moby/buildkit/identity"
digest "github.com/opencontainers/go-digest"
)
// CacheID identifies a cache backend instance.
type CacheID string

// NewInMemoryCacheManager returns a CacheManager backed by fully in-memory
// key and result storage, under a freshly generated instance ID.
func NewInMemoryCacheManager() CacheManager {
	return NewCacheManager(identity.NewID(), NewInMemoryCacheStorage(), NewInMemoryResultStorage())
}
// NewCacheManager creates a cache manager with the given instance ID,
// backed by the provided key and result storages. On construction it
// garbage-collects key results whose underlying data no longer exists in
// the result store.
func NewCacheManager(id string, storage CacheKeyStorage, results CacheResultStorage) CacheManager {
	cm := &cacheManager{
		id:      id,
		backend: storage,
		results: results,
	}

	// Best-effort startup cleanup of dangling result records.
	// NOTE(review): errors from Walk/WalkResults/Release are silently
	// dropped here — confirm best-effort is intended rather than a bug.
	storage.Walk(func(id string) error {
		return storage.WalkResults(id, func(cr CacheResult) error {
			if !results.Exists(cr.ID) {
				storage.Release(cr.ID)
			}
			return nil
		})
	})

	return cm
}
// cacheManager implements CacheManager on top of a CacheKeyStorage for
// keys/links and a CacheResultStorage for result payloads.
type cacheManager struct {
	mu sync.RWMutex // guards backend link/result mutations done via Query/Save
	id string

	backend CacheKeyStorage
	results CacheResultStorage
}
// ID returns the unique identity of this cache manager instance.
func (c *cacheManager) ID() string {
	return c.id
}
// Query finds cache keys for an operation (dgst/output) reachable from the
// given dependency keys at input index. Keys reachable from some deps but
// not others get the missing links added so future queries converge.
// With no deps it checks for a plain root key.
func (c *cacheManager) Query(deps []CacheKeyWithSelector, input Index, dgst digest.Digest, output Index) ([]*CacheKey, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	type dep struct {
		results map[string]struct{} // target IDs reachable from this dep
		key     CacheKeyWithSelector
	}

	allDeps := make([]dep, 0, len(deps))
	for _, k := range deps {
		allDeps = append(allDeps, dep{key: k, results: map[string]struct{}{}})
	}

	// Collect every link target reachable from any dependency.
	allRes := map[string]*CacheKey{}
	for _, d := range allDeps {
		if err := c.backend.WalkLinks(c.getID(d.key.CacheKey.CacheKey), CacheInfoLink{input, output, dgst, d.key.Selector}, func(id string) error {
			d.results[id] = struct{}{}
			if _, ok := allRes[id]; !ok {
				allRes[id] = c.newKeyWithID(id, dgst, output)
			}
			return nil
		}); err != nil {
			return nil, err
		}
	}

	// link the results against the keys that didn't exist
	// NOTE(review): writes to backend happen while holding only RLock —
	// relies on the backend's own synchronization; confirm.
	for id, key := range allRes {
		for _, d := range allDeps {
			if _, ok := d.results[id]; !ok {
				if err := c.backend.AddLink(c.getID(d.key.CacheKey.CacheKey), CacheInfoLink{
					Input:    input,
					Output:   output,
					Digest:   dgst,
					Selector: d.key.Selector,
				}, c.getID(key)); err != nil {
					return nil, err
				}
			}
		}
	}

	// Dependency-less operations match purely on the root key.
	if len(deps) == 0 {
		if !c.backend.Exists(rootKey(dgst, output).String()) {
			return nil, nil
		}
		return []*CacheKey{c.newRootKey(dgst, output)}, nil
	}

	keys := make([]*CacheKey, 0, len(deps))
	for _, k := range allRes {
		keys = append(keys, k)
	}
	return keys, nil
}
// Records returns the cache records stored for ck whose result payload
// still exists; dangling records are released from the backend as a side
// effect.
func (c *cacheManager) Records(ck *CacheKey) ([]*CacheRecord, error) {
	outs := make([]*CacheRecord, 0)
	if err := c.backend.WalkResults(c.getID(ck), func(r CacheResult) error {
		if c.results.Exists(r.ID) {
			outs = append(outs, &CacheRecord{
				ID:           r.ID,
				cacheManager: c,
				key:          ck,
				CreatedAt:    r.CreatedAt,
			})
		} else {
			// Drop the stale record; error intentionally best-effort.
			c.backend.Release(r.ID)
		}
		return nil
	}); err != nil {
		return nil, err
	}
	return outs, nil
}
// Load resolves the stored cache result for rec and materializes it via
// the result storage.
func (c *cacheManager) Load(ctx context.Context, rec *CacheRecord) (Result, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	stored, err := c.backend.Load(c.getID(rec.key), rec.ID)
	if err != nil {
		return nil, err
	}
	return c.results.Load(ctx, stored)
}
// Save persists result r under cache key k: the payload goes to result
// storage, a result record is attached to the key, and the key's link
// chain is made persistent. Returns an exportable handle for the saved
// record.
func (c *cacheManager) Save(k *CacheKey, r Result) (*ExportableCacheKey, error) {
	c.mu.Lock()
	defer c.mu.Unlock()

	res, err := c.results.Save(r)
	if err != nil {
		return nil, err
	}
	if err := c.backend.AddResult(c.getID(k), res); err != nil {
		return nil, err
	}

	// Make sure the dep->key links survive a restart.
	if err := c.ensurePersistentKey(k); err != nil {
		return nil, err
	}

	rec := &CacheRecord{
		ID:           res.ID,
		cacheManager: c,
		key:          k,
		CreatedAt:    res.CreatedAt,
	}

	return &ExportableCacheKey{
		CacheKey: k,
		Exporter: &exporter{k: k, record: rec},
	}, nil
}
// newKey returns an empty CacheKey with an initialized ids map.
func newKey() *CacheKey {
	return &CacheKey{ids: map[*cacheManager]string{}}
}
// newKeyWithID builds a CacheKey bound to an existing storage ID and
// pre-registers that ID for this manager.
func (c *cacheManager) newKeyWithID(id string, dgst digest.Digest, output Index) *CacheKey {
	k := newKey()
	k.ID = id
	k.ids[c] = id
	k.digest = dgst
	k.output = output
	return k
}
// newRootKey creates a key for a dependency-less vertex, identified purely
// by its digest and output index.
func (c *cacheManager) newRootKey(dgst digest.Digest, output Index) *CacheKey {
	return c.newKeyWithID(rootKey(dgst, output).String(), dgst, output)
}
// getID resolves (and memoizes) this manager's storage ID for k.
// Dependency-less keys use their stable root ID; otherwise the ID is
// derived from the dependency link graph.
func (c *cacheManager) getID(k *CacheKey) string {
	k.mu.Lock()
	defer k.mu.Unlock()

	if id, ok := k.ids[c]; ok {
		return id
	}
	if len(k.deps) == 0 {
		k.ids[c] = k.ID
		return k.ID
	}
	id := c.getIDFromDeps(k)
	k.ids[c] = id
	return id
}
// ensurePersistentKey recursively persists links from each of k's
// dependency keys to k, so the key chain can be rediscovered after a
// restart. Already-present links are left untouched.
func (c *cacheManager) ensurePersistentKey(k *CacheKey) error {
	id := c.getID(k)
	for i, deps := range k.Deps() {
		for _, ck := range deps {
			l := CacheInfoLink{
				Input:    Index(i),
				Output:   Index(k.Output()),
				Digest:   k.Digest(),
				Selector: ck.Selector,
			}
			ckID := c.getID(ck.CacheKey.CacheKey)
			if !c.backend.HasLink(ckID, l, id) {
				// Persist the dependency's own subtree first so the chain
				// below the new link is already complete.
				if err := c.ensurePersistentKey(ck.CacheKey.CacheKey); err != nil {
					return err
				}
				if err := c.backend.AddLink(ckID, l, id); err != nil {
					return err
				}
			}
		}
	}
	return nil
}
// getIDFromDeps resolves the storage ID for a key with dependencies by
// intersecting the link targets reachable from each dependency index.
// If no common target survives, a fresh random ID is generated.
func (c *cacheManager) getIDFromDeps(k *CacheKey) string {
	matches := map[string]struct{}{}
	for i, deps := range k.deps {
		if i == 0 || len(matches) > 0 {
			for _, ck := range deps {
				m2 := make(map[string]struct{})
				if err := c.backend.WalkLinks(c.getID(ck.CacheKey.CacheKey), CacheInfoLink{
					Input:    Index(i),
					Output:   Index(k.Output()),
					Digest:   k.Digest(),
					Selector: ck.Selector,
				}, func(id string) error {
					if i == 0 {
						// First index seeds the candidate set (union over keys).
						matches[id] = struct{}{}
					} else {
						m2[id] = struct{}{}
					}
					return nil
				}); err != nil {
					// On a storage error fall back to generating a new ID
					// instead of failing the caller.
					matches = map[string]struct{}{}
					break
				}
				if i != 0 {
					// Keep only candidates also reachable via this dep key.
					// NOTE(review): this intersects per key rather than per
					// index — confirm that is the intended semantics.
					for id := range matches {
						if _, ok := m2[id]; !ok {
							delete(matches, id)
						}
					}
				}
			}
		}
	}
	// Return an arbitrary surviving candidate, if any.
	for k := range matches {
		return k
	}
	return identity.NewID()
}
// rootKey derives the stable identifier for a (digest, output) pair.
func rootKey(dgst digest.Digest, output Index) digest.Digest {
	payload := fmt.Sprintf("%s@%d", dgst, output)
	return digest.FromBytes([]byte(payload))
}

View File

@ -22,11 +22,12 @@ type CacheKeyStorage interface {
AddLink(id string, link CacheInfoLink, target string) error
WalkLinks(id string, link CacheInfoLink, fn func(id string) error) error
HasLink(id string, link CacheInfoLink, target string) bool
WalkBacklinks(id string, fn func(id string, link CacheInfoLink) error) error
}
// CacheResult is a record for a single solve result
type CacheResult struct {
// Payload []byte
CreatedAt time.Time
ID string
}

View File

@ -18,6 +18,7 @@ func RunCacheStorageTests(t *testing.T, st func() (CacheKeyStorage, func())) {
testLinks,
testResultReleaseSingleLevel,
testResultReleaseMultiLevel,
testBacklinks,
} {
runStorageTest(t, tc, st)
}
@ -201,6 +202,38 @@ func testResultReleaseSingleLevel(t *testing.T, st CacheKeyStorage) {
require.Equal(t, len(m), 0)
}
// testBacklinks verifies that adding a forward link foo->sub0 makes the
// reverse link discoverable through WalkBacklinks.
func testBacklinks(t *testing.T, st CacheKeyStorage) {
	t.Parallel()
	err := st.AddResult("foo", CacheResult{
		ID:        "foo-result",
		CreatedAt: time.Now(),
	})
	require.NoError(t, err)

	err = st.AddResult("sub0", CacheResult{
		ID:        "sub0-result",
		CreatedAt: time.Now(),
	})
	require.NoError(t, err)

	l0 := CacheInfoLink{
		Input: 0, Output: 1, Digest: digest.FromBytes([]byte("to-sub0")),
	}
	err = st.AddLink("foo", l0, "sub0")
	require.NoError(t, err)

	backlinks := 0
	// Previously the error from WalkBacklinks was silently discarded; a
	// failing walk would make the test pass vacuously with 0 backlinks
	// only by luck — assert it explicitly.
	err = st.WalkBacklinks("sub0", func(id string, link CacheInfoLink) error {
		require.Equal(t, id, "foo")
		require.Equal(t, link.Input, Index(0))
		require.Equal(t, link.Digest, rootKey(digest.FromBytes([]byte("to-sub0")), 1))
		backlinks++
		return nil
	})
	require.NoError(t, err)

	require.Equal(t, backlinks, 1)
}
func testResultReleaseMultiLevel(t *testing.T, st CacheKeyStorage) {
t.Parallel()

View File

@ -0,0 +1,124 @@
package solver
import (
"context"
"strings"
"sync"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)
// newCombinedCacheManager fans queries out over multiple cache managers
// behind one CacheManager interface; main is the local manager that
// receives all writes.
func newCombinedCacheManager(cms []CacheManager, main CacheManager) CacheManager {
	return &combinedCacheManager{cms: cms, main: main}
}
// combinedCacheManager aggregates several cache managers. Reads consult
// all of them; writes go through main only.
type combinedCacheManager struct {
	cms  []CacheManager
	main CacheManager
	id   string    // computed lazily from member IDs
	idOnce sync.Once
}
// ID returns a stable identity derived from the member manager IDs,
// computed once on first use.
func (cm *combinedCacheManager) ID() string {
	cm.idOnce.Do(func() {
		parts := make([]string, 0, len(cm.cms))
		for _, c := range cm.cms {
			parts = append(parts, c.ID())
		}
		cm.id = digest.FromBytes([]byte(strings.Join(parts, ","))).String()
	})
	return cm.id
}
// Query runs the query against every member manager in parallel and
// merges the results by key ID; on a conflict the main manager's key wins.
func (cm *combinedCacheManager) Query(inp []CacheKeyWithSelector, inputIndex Index, dgst digest.Digest, outputIndex Index) ([]*CacheKey, error) {
	eg, _ := errgroup.WithContext(context.TODO())
	keys := make(map[string]*CacheKey, len(cm.cms))
	var mu sync.Mutex
	for _, c := range cm.cms {
		// Closure wrapper pins the loop variable for the goroutine.
		func(c CacheManager) {
			eg.Go(func() error {
				recs, err := c.Query(inp, inputIndex, dgst, outputIndex)
				if err != nil {
					return err
				}
				mu.Lock()
				for _, r := range recs {
					if _, ok := keys[r.ID]; !ok || c == cm.main {
						keys[r.ID] = r
					}
				}
				mu.Unlock()
				return nil
			})
		}(c)
	}

	if err := eg.Wait(); err != nil {
		return nil, err
	}

	out := make([]*CacheKey, 0, len(keys))
	for _, k := range keys {
		out = append(out, k)
	}
	return out, nil
}
// Load loads the result through the manager owning the record, then
// mirrors it into the main cache so future lookups resolve locally.
func (cm *combinedCacheManager) Load(ctx context.Context, rec *CacheRecord) (Result, error) {
	res, err := rec.cacheManager.Load(ctx, rec)
	if err != nil {
		return nil, err
	}
	if _, err := cm.main.Save(rec.key, res); err != nil {
		return nil, err
	}
	return res, nil
}
// Save always writes through to the main (local) cache manager.
func (cm *combinedCacheManager) Save(key *CacheKey, s Result) (*ExportableCacheKey, error) {
	return cm.main.Save(key, s)
}
// Records gathers cache records for ck from every manager that has
// resolved an ID for it, in parallel. Records from the main manager take
// precedence and are marked with a higher priority for later selection.
func (cm *combinedCacheManager) Records(ck *CacheKey) ([]*CacheRecord, error) {
	if len(ck.ids) == 0 {
		return nil, errors.Errorf("no results")
	}

	records := map[string]*CacheRecord{}
	var mu sync.Mutex

	eg, _ := errgroup.WithContext(context.TODO())
	for c := range ck.ids {
		// Closure wrapper pins the loop variable for the goroutine.
		func(c *cacheManager) {
			eg.Go(func() error {
				recs, err := c.Records(ck)
				if err != nil {
					return err
				}
				mu.Lock()
				for _, rec := range recs {
					if _, ok := records[rec.ID]; !ok || c == cm.main {
						if c == cm.main {
							// Prefer locally-held records at load time.
							rec.Priority = 1
						}
						records[rec.ID] = rec
					}
				}
				mu.Unlock()
				return nil
			})
		}(c)
	}

	if err := eg.Wait(); err != nil {
		return nil, err
	}

	out := make([]*CacheRecord, 0, len(records))
	for _, rec := range records {
		out = append(out, rec)
	}
	return out, nil
}

View File

@ -28,6 +28,7 @@ func newEdge(ed Edge, op activeOp, index *EdgeIndex) *edge {
edge: ed,
op: op,
depRequests: map[pipe.Receiver]*dep{},
keyMap: map[string]*CacheKey{},
cacheRecords: map[string]*CacheRecord{},
index: index,
}
@ -46,6 +47,7 @@ type edge struct {
execReq pipe.Receiver
err error
cacheRecords map[string]*CacheRecord
keyMap map[string]*CacheKey
noCacheMatchPossible bool
allDepsCompletedCacheFast bool
@ -57,6 +59,8 @@ type edge struct {
releaserCount int
keysDidChange bool
index *EdgeIndex
secondaryExporters []expDep
}
// dep holds state for a dependant edge
@ -64,7 +68,7 @@ type dep struct {
req pipe.Receiver
edgeState
index Index
cacheRecords map[string]*CacheRecord
keyMap map[string]*CacheKey
desiredState edgeStatusType
e *edge
slowCacheReq pipe.Receiver // TODO: reuse req
@ -74,8 +78,13 @@ type dep struct {
err error
}
type expDep struct {
index int
cacheKey CacheKeyWithSelector
}
func newDep(i Index) *dep {
return &dep{index: i, cacheRecords: map[string]*CacheRecord{}}
return &dep{index: i, keyMap: map[string]*CacheKey{}}
}
type edgePipe struct {
@ -123,21 +132,24 @@ func (e *edge) release() {
}
// commitOptions returns parameters for the op execution
func (e *edge) commitOptions() (CacheKey, []CachedResult) {
func (e *edge) commitOptions() (*CacheKey, []CachedResult) {
k := NewCacheKey(e.cacheMap.Digest, e.edge.Index)
if e.deps == nil {
return NewCacheKey(e.cacheMap.Digest, e.edge.Index, nil), nil
return k, nil
}
inputs := make([]CacheKeyWithSelector, len(e.deps))
inputs := make([][]CacheKeyWithSelector, len(e.deps))
results := make([]CachedResult, len(e.deps))
for i, dep := range e.deps {
inputs[i] = CacheKeyWithSelector{CacheKey: dep.result.CacheKey(), Selector: e.cacheMap.Deps[i].Selector}
inputs[i] = append(inputs[i], CacheKeyWithSelector{CacheKey: dep.result.CacheKey(), Selector: e.cacheMap.Deps[i].Selector})
if dep.slowCacheKey != nil {
inputs[i] = CacheKeyWithSelector{CacheKey: *dep.slowCacheKey}
inputs[i] = append(inputs[i], CacheKeyWithSelector{CacheKey: *dep.slowCacheKey})
}
results[i] = dep.result
}
return NewCacheKey(e.cacheMap.Digest, e.edge.Index, inputs), results
k.deps = inputs
return k, results
}
// isComplete returns true if edge state is final and will never change
@ -164,23 +176,25 @@ func (e *edge) updateIncoming(req pipe.Sender) {
// probeCache is called with unprocessed cache keys for dependency
// if the key could match the edge, the cacheRecords for dependency are filled
func (e *edge) probeCache(d *dep, keys []CacheKeyWithSelector) bool {
if len(keys) == 0 {
func (e *edge) probeCache(d *dep, depKeys []CacheKeyWithSelector) bool {
if len(depKeys) == 0 {
return false
}
if e.op.IgnoreCache() {
return false
}
records, err := e.op.Cache().Query(keys, d.index, e.cacheMap.Digest, e.edge.Index)
keys, err := e.op.Cache().Query(depKeys, d.index, e.cacheMap.Digest, e.edge.Index)
if err != nil {
e.err = errors.Wrap(err, "error on cache query")
}
for _, r := range records {
if _, ok := d.cacheRecords[r.ID]; !ok {
d.cacheRecords[r.ID] = r
found := false
for _, k := range keys {
if _, ok := d.keyMap[k.ID]; !ok {
d.keyMap[k.ID] = k
found = true
}
}
return len(records) > 0
return found
}
// checkDepMatchPossible checks if any cache matches are possible past this point
@ -206,7 +220,7 @@ func (e *edge) allDepsHaveKeys() bool {
return false
}
for _, d := range e.deps {
if len(d.keys) == 0 && d.slowCacheKey == nil {
if len(d.keys) == 0 && d.slowCacheKey == nil && d.result == nil {
return false
}
}
@ -214,20 +228,31 @@ func (e *edge) allDepsHaveKeys() bool {
}
// depKeys returns all current dependency cache keys
func (e *edge) depKeys() [][]CacheKey {
keys := make([][]CacheKey, len(e.deps))
func (e *edge) currentIndexKey() *CacheKey {
if e.cacheMap == nil {
return nil
}
keys := make([][]CacheKeyWithSelector, len(e.deps))
for i, d := range e.deps {
if len(d.keys) == 0 && d.result == nil {
return nil
}
for _, k := range d.keys {
keys[i] = append(keys[i], k.CacheKey)
keys[i] = append(keys[i], CacheKeyWithSelector{Selector: e.cacheMap.Deps[i].Selector, CacheKey: k})
}
if d.result != nil {
keys[i] = append(keys[i], d.result.CacheKey())
}
if d.slowCacheKey != nil {
keys[i] = append(keys[i], d.slowCacheKey)
keys[i] = append(keys[i], CacheKeyWithSelector{Selector: e.cacheMap.Deps[i].Selector, CacheKey: d.result.CacheKey()})
if d.slowCacheKey != nil {
keys[i] = append(keys[i], CacheKeyWithSelector{CacheKey: ExportableCacheKey{CacheKey: d.slowCacheKey.CacheKey, Exporter: &exporter{k: d.slowCacheKey.CacheKey}}})
}
}
}
return keys
k := NewCacheKey(e.cacheMap.Digest, e.edge.Index)
k.deps = keys
return k
}
// slow cache keys can be computed in 2 phases if there are multiple deps.
@ -235,13 +260,13 @@ func (e *edge) depKeys() [][]CacheKey {
func (e *edge) skipPhase2SlowCache(dep *dep) bool {
isPhase1 := false
for _, dep := range e.deps {
if !dep.slowCacheComplete && e.slowCacheFunc(dep) != nil && len(dep.cacheRecords) == 0 {
if !dep.slowCacheComplete && e.slowCacheFunc(dep) != nil && len(dep.keyMap) == 0 {
isPhase1 = true
break
}
}
if isPhase1 && !dep.slowCacheComplete && e.slowCacheFunc(dep) != nil && len(dep.cacheRecords) > 0 {
if isPhase1 && !dep.slowCacheComplete && e.slowCacheFunc(dep) != nil && len(dep.keyMap) > 0 {
return true
}
return false
@ -250,13 +275,13 @@ func (e *edge) skipPhase2SlowCache(dep *dep) bool {
func (e *edge) skipPhase2FastCache(dep *dep) bool {
isPhase1 := false
for _, dep := range e.deps {
if e.cacheMap == nil || len(dep.cacheRecords) == 0 && ((!dep.slowCacheComplete && e.slowCacheFunc(dep) != nil) || (dep.state < edgeStatusComplete && e.slowCacheFunc(dep) == nil)) {
if e.cacheMap == nil || len(dep.keyMap) == 0 && ((!dep.slowCacheComplete && e.slowCacheFunc(dep) != nil) || (dep.state < edgeStatusComplete && e.slowCacheFunc(dep) == nil)) {
isPhase1 = true
break
}
}
if isPhase1 && len(dep.cacheRecords) > 0 {
if isPhase1 && len(dep.keyMap) > 0 {
return true
}
return false
@ -317,6 +342,13 @@ func withSelector(keys []ExportableCacheKey, selector digest.Digest) []CacheKeyW
return out
}
func (e *edge) makeExportable(k *CacheKey, records []*CacheRecord) ExportableCacheKey {
return ExportableCacheKey{
CacheKey: k,
Exporter: &exporter{k: k, records: records},
}
}
// processUpdate is called by unpark for every updated pipe request
func (e *edge) processUpdate(upt pipe.Receiver) (depChanged bool) {
// response for cachemap request
@ -330,25 +362,30 @@ func (e *edge) processUpdate(upt pipe.Receiver) (depChanged bool) {
e.cacheMap = upt.Status().Value.(*CacheMap)
if len(e.deps) == 0 {
if !e.op.IgnoreCache() {
records, err := e.op.Cache().Query(nil, 0, e.cacheMap.Digest, e.edge.Index)
keys, err := e.op.Cache().Query(nil, 0, e.cacheMap.Digest, e.edge.Index)
if err != nil {
logrus.Error(errors.Wrap(err, "invalid query response")) // make the build fail for this error
} else {
for _, r := range records {
if r.Loadable {
for _, k := range keys {
records, err := e.op.Cache().Records(k)
if err != nil {
logrus.Errorf("error receiving cache records: %v", err)
continue
}
for _, r := range records {
e.cacheRecords[r.ID] = r
}
}
if len(records) > 0 {
e.keys = append(e.keys, records[0].CacheKey)
e.keys = append(e.keys, e.makeExportable(k, records))
}
}
}
if e.allDepsHaveKeys() {
e.keysDidChange = true
}
e.state = edgeStatusCacheSlow
}
if e.allDepsHaveKeys() {
e.keysDidChange = true
}
// probe keys that were loaded before cache map
for i, dep := range e.deps {
e.probeCache(dep, withSelector(dep.keys, e.cacheMap.Deps[i].Selector))
@ -418,13 +455,11 @@ func (e *edge) processUpdate(upt pipe.Receiver) (depChanged bool) {
e.err = upt.Status().Err
}
} else if !dep.slowCacheComplete {
k := NewCacheKey(upt.Status().Value.(digest.Digest), -1, nil)
ck := NewCacheKey("", 0, []CacheKeyWithSelector{
{CacheKey: dep.result.CacheKey(), Selector: e.cacheMap.Deps[i].Selector},
{CacheKey: ExportableCacheKey{CacheKey: k, Exporter: &emptyExporter{k}}, Selector: NoSelector},
})
dep.slowCacheKey = &ExportableCacheKey{ck, &emptyExporter{ck}}
dep.slowCacheFoundKey = e.probeCache(dep, []CacheKeyWithSelector{{CacheKey: ExportableCacheKey{CacheKey: *dep.slowCacheKey}}})
k := NewCacheKey(upt.Status().Value.(digest.Digest), -1)
dep.slowCacheKey = &ExportableCacheKey{CacheKey: k, Exporter: &exporter{k: k}}
slowKeyExp := CacheKeyWithSelector{CacheKey: *dep.slowCacheKey}
defKeyExp := CacheKeyWithSelector{CacheKey: dep.result.CacheKey(), Selector: e.cacheMap.Deps[i].Selector}
dep.slowCacheFoundKey = e.probeCache(dep, []CacheKeyWithSelector{defKeyExp, slowKeyExp})
dep.slowCacheComplete = true
e.keysDidChange = true
e.checkDepMatchPossible(dep) // not matching key here doesn't set nocachematch possible to true
@ -440,39 +475,56 @@ func (e *edge) processUpdate(upt pipe.Receiver) (depChanged bool) {
// the state of dependencies has changed
func (e *edge) recalcCurrentState() {
// TODO: fast pass to detect incomplete results
newRecords := map[string]*CacheRecord{}
newKeys := map[string]*CacheKey{}
for i, dep := range e.deps {
if i == 0 {
for key, r := range dep.cacheRecords {
if _, ok := e.cacheRecords[key]; ok {
for id, k := range dep.keyMap {
if _, ok := e.keyMap[id]; ok {
continue
}
newRecords[key] = r
newKeys[id] = k
}
} else {
for key := range newRecords {
if _, ok := dep.cacheRecords[key]; !ok {
delete(newRecords, key)
for id := range newKeys {
if _, ok := dep.keyMap[id]; !ok {
delete(newKeys, id)
}
}
}
if len(newRecords) == 0 {
if len(newKeys) == 0 {
break
}
}
for k, r := range newRecords {
mergedKey := r.CacheKey
if len(e.deps) > 0 {
mergedKey = toMergedCacheKey(e.deps, k)
for _, r := range newKeys {
// TODO: add all deps automatically
mergedKey := r.clone()
mergedKey.deps = make([][]CacheKeyWithSelector, len(e.deps))
for i, dep := range e.deps {
if dep.result != nil {
mergedKey.deps[i] = append(mergedKey.deps[i], CacheKeyWithSelector{Selector: e.cacheMap.Deps[i].Selector, CacheKey: dep.result.CacheKey()})
if dep.slowCacheKey != nil {
mergedKey.deps[i] = append(mergedKey.deps[i], CacheKeyWithSelector{CacheKey: *dep.slowCacheKey})
}
} else {
for _, k := range dep.keys {
mergedKey.deps[i] = append(mergedKey.deps[i], CacheKeyWithSelector{Selector: e.cacheMap.Deps[i].Selector, CacheKey: k})
}
}
}
e.keys = append(e.keys, mergedKey)
if r.Loadable {
r2 := r
r2.CacheKey = mergedKey
e.cacheRecords[k] = r2
records, err := e.op.Cache().Records(mergedKey)
if err != nil {
logrus.Errorf("error receiving cache records: %v", err)
continue
}
for _, r := range records {
e.cacheRecords[r.ID] = r
}
e.keys = append(e.keys, e.makeExportable(mergedKey, records))
}
// detect lower/upper bound for current state
@ -486,7 +538,7 @@ func (e *edge) recalcCurrentState() {
for _, dep := range e.deps {
isSlowIncomplete := e.slowCacheFunc(dep) != nil && (dep.state == edgeStatusCacheSlow || (dep.state == edgeStatusComplete && !dep.slowCacheComplete))
if dep.state > stLow && len(dep.cacheRecords) == 0 && !isSlowIncomplete {
if dep.state > stLow && len(dep.keyMap) == 0 && !isSlowIncomplete {
stLow = dep.state
if stLow > edgeStatusCacheSlow {
stLow = edgeStatusCacheSlow
@ -714,28 +766,26 @@ func (e *edge) postpone(f *pipeFactory) {
// loadCache creates a request to load edge result from cache
func (e *edge) loadCache(ctx context.Context) (interface{}, error) {
var rec *CacheRecord
recs := make([]*CacheRecord, 0, len(e.cacheRecords))
for _, r := range e.cacheRecords {
if !r.Loadable {
continue
}
if rec == nil || rec.CreatedAt.Before(r.CreatedAt) || (rec.CreatedAt.Equal(r.CreatedAt) && rec.Priority < r.Priority) {
rec = r
}
recs = append(recs, r)
}
rec := getBestResult(recs)
logrus.Debugf("load cache for %s with %s", e.edge.Vertex.Name(), rec.ID)
res, err := e.op.Cache().Load(ctx, rec)
res, err := e.op.LoadCache(ctx, rec)
if err != nil {
return nil, err
}
return NewCachedResult(res, rec.CacheKey, rec.CacheKey), nil
return NewCachedResult(res, ExportableCacheKey{CacheKey: rec.key, Exporter: &exporter{k: rec.key, record: rec, edge: e}}), nil
}
// execOp creates a request to execute the vertex operation
func (e *edge) execOp(ctx context.Context) (interface{}, error) {
cacheKey, inputs := e.commitOptions()
results, err := e.op.Exec(ctx, toResultSlice(inputs))
results, subExporters, err := e.op.Exec(ctx, toResultSlice(inputs))
if err != nil {
return nil, err
}
@ -756,7 +806,20 @@ func (e *edge) execOp(ctx context.Context) (interface{}, error) {
if err != nil {
return nil, err
}
return NewCachedResult(res, ck, ck), nil
exps := make([]Exporter, 0, len(subExporters))
for _, exp := range subExporters {
exps = append(exps, exp.Exporter)
}
if len(subExporters) > 0 {
ck = &ExportableCacheKey{
CacheKey: ck.CacheKey,
Exporter: &mergedExporter{exporters: append([]Exporter{ck.Exporter}, exps...)},
}
}
return NewCachedResult(res, *ck), nil
}
func toResultSlice(cres []CachedResult) (out []Result) {
@ -766,79 +829,3 @@ func toResultSlice(cres []CachedResult) (out []Result) {
}
return out
}
type emptyExporter struct {
CacheKey
}
func (e *emptyExporter) Export(ctx context.Context, m map[digest.Digest]*ExportRecord, fn func(context.Context, Result) (*Remote, error)) (*ExportRecord, error) {
id := getUniqueID(e.CacheKey)
rec, ok := m[id]
if !ok {
rec = &ExportRecord{Digest: id, Links: map[CacheLink]struct{}{}}
m[id] = rec
}
deps := e.CacheKey.Deps()
for i, dep := range deps {
r, err := dep.CacheKey.Export(ctx, m, fn)
if err != nil {
return nil, err
}
link := CacheLink{
Source: r.Digest,
Input: Index(i),
Output: e.Output(),
Base: e.Digest(),
Selector: dep.Selector,
}
rec.Links[link] = struct{}{}
}
if len(deps) == 0 {
m[id].Links[CacheLink{
Output: e.Output(),
Base: e.Digest(),
}] = struct{}{}
}
return rec, nil
}
type mergedExporter struct {
exporters []Exporter
}
func (e *mergedExporter) Export(ctx context.Context, m map[digest.Digest]*ExportRecord, fn func(context.Context, Result) (*Remote, error)) (er *ExportRecord, err error) {
for _, e := range e.exporters {
er, err = e.Export(ctx, m, fn)
if err != nil {
return nil, err
}
}
return
}
func toMergedCacheKey(deps []*dep, id string) ExportableCacheKey {
depKeys := make([]CacheKeyWithSelector, len(deps))
exporters := make([]Exporter, len(deps))
for i, d := range deps {
depKeys[i] = d.cacheRecords[id].CacheKey.Deps()[i]
exporters[i] = d.cacheRecords[id].CacheKey
}
return ExportableCacheKey{
CacheKey: &mergedCacheKey{CacheKey: deps[0].cacheRecords[id].CacheKey, deps: depKeys},
Exporter: &mergedExporter{exporters: exporters},
}
}
type mergedCacheKey struct {
CacheKey
deps []CacheKeyWithSelector
}
func (ck *mergedCacheKey) Deps() []CacheKeyWithSelector {
return ck.deps
}

179
solver-next/exporter.go Normal file
View File

@ -0,0 +1,179 @@
package solver
import (
"context"
digest "github.com/opencontainers/go-digest"
)
// exporter exports the cache-key chain behind a single CacheKey.
type exporter struct {
	k       *CacheKey
	records []*CacheRecord // candidate records; best one is chosen lazily
	record  *CacheRecord   // chosen record whose result is exported

	res  []ExporterRecord // memoized output of ExportTo
	edge *edge            // for secondaryExporters
}
// addBacklinks walks the persisted backlinks of id in cm and records them
// on rec (creating a record when rec is nil), recursing through parents.
// bkm memoizes per-ID records across the walk to break cycles and avoid
// duplicate work.
func addBacklinks(t ExporterTarget, rec ExporterRecord, cm *cacheManager, id string, bkm map[string]ExporterRecord) (ExporterRecord, error) {
	if rec == nil {
		// Memo hit: this ID was already exported during the current walk.
		// (The previous version also had a dead `_ = ok` statement here.)
		if cached, ok := bkm[id]; ok {
			return cached, nil
		}
	}
	if err := cm.backend.WalkBacklinks(id, func(id string, link CacheInfoLink) error {
		if rec == nil {
			rec = t.Add(link.Digest)
		}
		r, ok := bkm[id]
		if !ok {
			var err error
			r, err = addBacklinks(t, nil, cm, id, bkm)
			if err != nil {
				return err
			}
		}
		rec.LinkFrom(r, int(link.Input), link.Selector.String())
		return nil
	}); err != nil {
		return nil, err
	}
	if rec == nil {
		// No backlinks at all: fall back to the raw ID as the digest.
		rec = t.Add(digest.Digest(id))
	}
	bkm[id] = rec
	return rec, nil
}
// backlinkT is the private context-key type used to share the backlink
// memo map (map[string]ExporterRecord) across a recursive export walk.
type backlinkT struct{}

var backlinkKey = backlinkT{}
// ExportTo writes the key chain (deps, secondary exporters, persisted
// backlinks) into t and attaches the best available result as a Remote,
// using converter when no pre-built remote exists. Results are memoized
// per exporter via t.Visited/Visit and a context-carried backlink map.
func (e *exporter) ExportTo(ctx context.Context, t ExporterTarget, converter func(context.Context, Result) (*Remote, error)) ([]ExporterRecord, error) {
	var bkm map[string]ExporterRecord

	// Share one backlink memo map across the whole recursive walk.
	if bk := ctx.Value(backlinkKey); bk == nil {
		bkm = map[string]ExporterRecord{}
		ctx = context.WithValue(ctx, backlinkKey, bkm)
	} else {
		bkm = bk.(map[string]ExporterRecord)
	}

	if t.Visited(e) {
		return e.res, nil
	}

	deps := e.k.Deps()

	type expr struct {
		r        ExporterRecord
		selector digest.Digest
	}

	srcs := make([][]expr, len(deps))

	for i, deps := range deps {
		for _, dep := range deps {
			recs, err := dep.CacheKey.ExportTo(ctx, t, converter)
			if err != nil {
				// BUGFIX: was `return nil, nil`, silently swallowing the
				// error and truncating the export.
				return nil, err
			}
			for _, r := range recs {
				srcs[i] = append(srcs[i], expr{r: r, selector: dep.Selector})
			}
		}
	}

	if e.edge != nil {
		for _, de := range e.edge.secondaryExporters {
			recs, err := de.cacheKey.CacheKey.ExportTo(ctx, t, converter)
			if err != nil {
				// BUGFIX: was `return nil, nil` (error swallowed).
				return nil, err
			}
			for _, r := range recs {
				srcs[de.index] = append(srcs[de.index], expr{r: r, selector: de.cacheKey.Selector})
			}
		}
	}

	rec := t.Add(rootKey(e.k.Digest(), e.k.Output()))

	for i, srcs := range srcs {
		for _, src := range srcs {
			rec.LinkFrom(src.r, i, src.selector.String())
		}
	}

	// Include links persisted by each cache manager that knows this key.
	for cm, id := range e.k.ids {
		if _, err := addBacklinks(t, rec, cm, id, bkm); err != nil {
			return nil, err
		}
	}

	if e.record == nil && len(e.k.Deps()) > 0 {
		e.record = getBestResult(e.records)
	}

	var remote *Remote
	if v := e.record; v != nil && len(e.k.Deps()) > 0 {
		cm := v.cacheManager
		res, err := cm.backend.Load(cm.getID(v.key), v.ID)
		if err != nil {
			return nil, err
		}
		remote, err = cm.results.LoadRemote(ctx, res)
		if err != nil {
			return nil, err
		}
		if remote == nil {
			// No pre-built remote: materialize the result and convert it.
			res, err := cm.results.Load(ctx, res)
			if err != nil {
				return nil, err
			}
			remote, err = converter(ctx, res)
			if err != nil {
				return nil, err
			}
			res.Release(context.TODO())
		}
		if remote != nil {
			rec.AddResult(v.CreatedAt, remote)
		}
	}

	e.res = []ExporterRecord{rec}
	t.Visit(e)

	return e.res, nil
}
// getBestResult picks the preferred record: the latest CreatedAt wins,
// with Priority as the tie-breaker. Returns nil for an empty slice.
func getBestResult(records []*CacheRecord) *CacheRecord {
	var best *CacheRecord
	for _, cand := range records {
		if best == nil {
			best = cand
			continue
		}
		switch {
		case best.CreatedAt.Before(cand.CreatedAt):
			best = cand
		case best.CreatedAt.Equal(cand.CreatedAt) && best.Priority < cand.Priority:
			best = cand
		}
	}
	return best
}
// mergedExporter exports through several exporters sequentially and
// concatenates their records.
type mergedExporter struct {
	exporters []Exporter
}
// ExportTo runs each wrapped exporter in order, aborting on the first
// error, and returns the concatenation of all produced records.
func (e *mergedExporter) ExportTo(ctx context.Context, t ExporterTarget, converter func(context.Context, Result) (*Remote, error)) (er []ExporterRecord, err error) {
	for _, exp := range e.exporters {
		var recs []ExporterRecord
		recs, err = exp.ExportTo(ctx, t, converter)
		if err != nil {
			return nil, err
		}
		er = append(er, recs...)
	}
	return er, nil
}

View File

@ -1,200 +1,243 @@
package solver
import (
"fmt"
"sync"
digest "github.com/opencontainers/go-digest"
"github.com/moby/buildkit/identity"
)
// EdgeIndex is a synchronous map for detecting edge collisions.
type EdgeIndex struct {
mu sync.Mutex
items map[indexedDigest]map[indexedDigest]map[*edge]struct{}
backRefs map[*edge]map[indexedDigest]map[indexedDigest]struct{}
items map[string]*indexItem
backRefs map[*edge]map[string]struct{}
}
type indexItem struct {
edge *edge
links map[CacheInfoLink]map[string]struct{}
deps map[string]struct{}
}
func NewEdgeIndex() *EdgeIndex {
return &EdgeIndex{
items: map[indexedDigest]map[indexedDigest]map[*edge]struct{}{},
backRefs: map[*edge]map[indexedDigest]map[indexedDigest]struct{}{},
items: map[string]*indexItem{},
backRefs: map[*edge]map[string]struct{}{},
}
}
func (ei *EdgeIndex) LoadOrStore(e *edge, dgst digest.Digest, index Index, deps [][]CacheKey) *edge {
ei.mu.Lock()
defer ei.mu.Unlock()
if old := ei.load(e, dgst, index, deps); old != nil && !(!old.edge.Vertex.Options().IgnoreCache && e.edge.Vertex.Options().IgnoreCache) {
return old
}
ei.store(e, dgst, index, deps)
return nil
}
func (ei *EdgeIndex) Release(e *edge) {
ei.mu.Lock()
defer ei.mu.Unlock()
for id, backRefs := range ei.backRefs[e] {
for id2 := range backRefs {
delete(ei.items[id][id2], e)
if len(ei.items[id][id2]) == 0 {
delete(ei.items[id], id2)
}
}
if len(ei.items[id]) == 0 {
delete(ei.items, id)
}
for id := range ei.backRefs[e] {
ei.releaseEdge(id, e)
}
delete(ei.backRefs, e)
}
func (ei *EdgeIndex) load(ignore *edge, dgst digest.Digest, index Index, deps [][]CacheKey) *edge {
id := indexedDigest{dgst: dgst, index: index, depsCount: len(deps)}
m, ok := ei.items[id]
func (ei *EdgeIndex) releaseEdge(id string, e *edge) {
item, ok := ei.items[id]
if !ok {
return nil
}
if len(deps) == 0 {
m2, ok := m[indexedDigest{}]
if !ok {
return nil
}
// prioritize edges with ignoreCache
for e := range m2 {
if e.edge.Vertex.Options().IgnoreCache && e != ignore {
return e
}
}
for e := range m2 {
if e != ignore {
return e
}
}
return nil
}
matches := map[*edge]struct{}{}
for i, keys := range deps {
if i == 0 {
for _, key := range keys {
id := indexedDigest{dgst: getUniqueID(key), index: Index(i)}
for e := range m[id] {
if e != ignore {
matches[e] = struct{}{}
}
}
}
} else {
loop0:
for match := range matches {
for _, key := range keys {
id := indexedDigest{dgst: getUniqueID(key), index: Index(i)}
if m[id] != nil {
if _, ok := m[id][match]; ok {
continue loop0
}
}
}
delete(matches, match)
}
}
if len(matches) == 0 {
break
}
}
// prioritize edges with ignoreCache
for m := range matches {
if m.edge.Vertex.Options().IgnoreCache {
return m
}
}
for m := range matches {
return m
}
return nil
}
func (ei *EdgeIndex) store(e *edge, dgst digest.Digest, index Index, deps [][]CacheKey) {
id := indexedDigest{dgst: dgst, index: index, depsCount: len(deps)}
m, ok := ei.items[id]
if !ok {
m = map[indexedDigest]map[*edge]struct{}{}
ei.items[id] = m
}
backRefsMain, ok := ei.backRefs[e]
if !ok {
backRefsMain = map[indexedDigest]map[indexedDigest]struct{}{}
ei.backRefs[e] = backRefsMain
}
backRefs, ok := backRefsMain[id]
if !ok {
backRefs = map[indexedDigest]struct{}{}
backRefsMain[id] = backRefs
}
if len(deps) == 0 {
m2, ok := m[indexedDigest{}]
if !ok {
m2 = map[*edge]struct{}{}
m[indexedDigest{}] = m2
}
m2[e] = struct{}{}
backRefs[indexedDigest{}] = struct{}{}
return
}
for i, keys := range deps {
for _, key := range keys {
id := indexedDigest{dgst: getUniqueID(key), index: Index(i)}
m2, ok := m[id]
if !ok {
m2 = map[*edge]struct{}{}
m[id] = m2
item.edge = nil
if len(item.links) == 0 {
for d := range item.deps {
ei.releaseLink(d, id)
}
delete(ei.items, id)
}
}
// releaseLink removes target from all link sets of item id, then — if the
// item has no owning edge and no remaining links — removes the item itself
// and recursively releases its references from its own dependencies.
func (ei *EdgeIndex) releaseLink(id, target string) {
	item, ok := ei.items[id]
	if !ok {
		return
	}

	for lid, links := range item.links {
		for check := range links {
			if check == target {
				delete(links, check)
			}
		}
		if len(links) == 0 {
			delete(item.links, lid)
		}
	}

	if item.edge == nil && len(item.links) == 0 {
		// Item became unreachable: cascade the release upward.
		for d := range item.deps {
			ei.releaseLink(d, id)
		}
		delete(ei.items, id)
	}
}
// LoadOrStore returns an existing edge that matches cache key k, or
// registers e under a (new or matching) index ID and returns nil.
// An existing edge is only bypassed when it lacks IgnoreCache while the
// incoming edge has it.
func (ei *EdgeIndex) LoadOrStore(k *CacheKey, e *edge) *edge {
	ei.mu.Lock()
	defer ei.mu.Unlock()

	// get all current edges that match the cachekey
	ids := ei.getAllMatches(k)

	var oldID string
	var old *edge

	for _, id := range ids {
		if item, ok := ei.items[id]; ok {
			if item.edge != e {
				oldID = id
				old = item.edge
			}
		}
	}

	if old != nil && !(!isIgnoreCache(old) && isIgnoreCache(e)) {
		// Reuse the existing edge; still record k's links against it.
		ei.enforceLinked(oldID, k)
		return old
	}

	id := identity.NewID()
	if len(ids) > 0 {
		id = ids[0]
	}

	ei.enforceLinked(id, k)

	ei.items[id].edge = e
	backRefs, ok := ei.backRefs[e]
	if !ok {
		backRefs = map[string]struct{}{}
		ei.backRefs[e] = backRefs
	}
	backRefs[id] = struct{}{}

	return nil
}
// enforceLinked adds links from current ID to all dep keys
func (er *EdgeIndex) enforceLinked(id string, k *CacheKey) {
main, ok := er.items[id]
if !ok {
main = &indexItem{
links: map[CacheInfoLink]map[string]struct{}{},
deps: map[string]struct{}{},
}
er.items[id] = main
}
deps := k.Deps()
for i, dd := range deps {
for _, d := range dd {
ck := d.CacheKey.CacheKey
er.enforceIndexID(ck)
ll := CacheInfoLink{Input: Index(i), Digest: k.Digest(), Output: k.Output(), Selector: d.Selector}
for _, ckID := range ck.indexIDs {
if item, ok := er.items[ckID]; ok {
links, ok := item.links[ll]
if !ok {
links = map[string]struct{}{}
item.links[ll] = links
}
links[id] = struct{}{}
main.deps[ckID] = struct{}{}
}
}
m2[e] = struct{}{}
backRefs[id] = struct{}{}
}
}
}
// indexedDigest identifies a cache key position in the index: the key
// digest, the output index it was produced at, and how many dependency
// inputs the producing operation had.
type indexedDigest struct {
	dgst      digest.Digest
	index     Index
	depsCount int
}

// internalKeyT is a private key type so values stored on a CacheKey via
// SetValue cannot collide with keys set elsewhere.
type internalKeyT string

// internalKey is the CacheKey value slot used to cache a computed unique
// digest on the key (see getUniqueID's GetValue/SetValue usage).
var internalKey = internalKeyT("buildkit/unique-cache-id")
func getUniqueID(k CacheKey) digest.Digest {
internalV := k.GetValue(internalKey)
if internalV != nil {
return internalV.(digest.Digest)
func (ei *EdgeIndex) enforceIndexID(k *CacheKey) {
if len(k.indexIDs) > 0 {
return
}
dgstr := digest.SHA256.Digester()
for _, inp := range k.Deps() {
dgstr.Hash().Write([]byte(getUniqueID(inp.CacheKey)))
dgstr.Hash().Write([]byte(inp.Selector))
matches := ei.getAllMatches(k)
if len(matches) > 0 {
k.indexIDs = matches
} else {
k.indexIDs = []string{identity.NewID()}
}
dgstr.Hash().Write([]byte(k.Digest()))
dgstr.Hash().Write([]byte(fmt.Sprintf("%d", k.Output())))
dgst := dgstr.Digest()
k.SetValue(internalKey, dgst)
return dgst
for _, id := range k.indexIDs {
ei.enforceLinked(id, k)
}
}
// getAllMatches returns the index IDs of all existing entries that could
// correspond to cache key k. A dependency-free key maps to one deterministic
// root ID; otherwise the result is the intersection of IDs linked from every
// dependency input of k.
func (ei *EdgeIndex) getAllMatches(k *CacheKey) []string {
	deps := k.Deps()

	if len(deps) == 0 {
		// No inputs: the key is addressed directly by digest+output.
		return []string{rootKey(k.Digest(), k.Output()).String()}
	}

	// Make sure every dependency key has index IDs assigned before lookup.
	for _, dd := range deps {
		for _, k := range dd {
			ei.enforceIndexID(k.CacheKey.CacheKey)
		}
	}

	matches := map[string]struct{}{}

	for i, dd := range deps {
		if i == 0 {
			// Seed the candidate set from the links of the first input.
			for _, d := range dd {
				ll := CacheInfoLink{Input: Index(i), Digest: k.Digest(), Output: k.Output(), Selector: d.Selector}
				for _, ckID := range d.CacheKey.CacheKey.indexIDs {
					item, ok := ei.items[ckID]
					if ok {
						for l := range item.links[ll] {
							matches[l] = struct{}{}
						}
					}
				}
			}
			continue
		}
		if len(matches) == 0 {
			// Intersection is already empty; no entry can match.
			break
		}
		// Keep only candidates that are also linked from input i.
		for m := range matches {
			found := false
			for _, d := range dd {
				ll := CacheInfoLink{Input: Index(i), Digest: k.Digest(), Output: k.Output(), Selector: d.Selector}
				for _, ckID := range d.CacheKey.CacheKey.indexIDs {
					if l, ok := ei.items[ckID].links[ll]; ok {
						if _, ok := l[m]; ok {
							found = true
							break
						}
					}
				}
			}
			if !found {
				delete(matches, m)
			}
		}
	}

	out := make([]string, 0, len(matches))
	for m := range matches {
		out = append(out, m)
	}
	return out
}
// isIgnoreCache reports whether the vertex behind e was configured with the
// IgnoreCache option. An edge without a vertex never ignores the cache.
func isIgnoreCache(e *edge) bool {
	return e.edge.Vertex != nil && e.edge.Vertex.Options().IgnoreCache
}

View File

@ -1 +1,211 @@
package solver
import (
"testing"
"github.com/stretchr/testify/require"
)
// checkEmpty asserts that the index holds no items and no back-references,
// i.e. every edge registered during the test has been fully released.
// require.Len replaces require.Equal(t, len(x), 0), which passed the actual
// value in the expected position and produced misleading failure messages.
func checkEmpty(t *testing.T, ei *EdgeIndex) {
	require.Len(t, ei.items, 0)
	require.Len(t, ei.backRefs, 0)
}
// TestIndexSimple exercises LoadOrStore with flat (dependency-free) cache
// keys: the first edge stored under a digest wins, repeated stores return
// the existing edge, and releasing all edges empties the index.
func TestIndexSimple(t *testing.T) {
	idx := NewEdgeIndex()

	edgeFoo := &edge{}
	edgeBar := &edge{}
	latecomer := &edge{}

	keyFoo := NewCacheKey(dgst("foo"), 0)
	require.Nil(t, idx.LoadOrStore(keyFoo, edgeFoo))

	keyBar := NewCacheKey(dgst("bar"), 0)
	require.Nil(t, idx.LoadOrStore(keyBar, edgeBar))

	// Equivalent keys resolve to the edge that was registered first.
	require.Equal(t, idx.LoadOrStore(NewCacheKey(dgst("bar"), 0), latecomer), edgeBar)
	require.Equal(t, idx.LoadOrStore(NewCacheKey(dgst("bar"), 0), latecomer), edgeBar)
	require.Equal(t, idx.LoadOrStore(NewCacheKey(dgst("foo"), 0), latecomer), edgeFoo)

	idx.Release(edgeFoo)
	idx.Release(edgeBar)
	checkEmpty(t, idx)
}
// TestIndexMultiLevelSimple exercises LoadOrStore with keys that carry
// dependency inputs: equivalent dependency sets resolve to the same edge,
// changing a selector creates a distinct entry, overlapping dependency sets
// merge entries, and releasing all edges empties the index.
func TestIndexMultiLevelSimple(t *testing.T) {
	idx := NewEdgeIndex()

	e1 := &edge{}
	e2 := &edge{}
	e3 := &edge{}

	k1 := testCacheKeyWithDeps(dgst("foo"), 1, [][]CacheKeyWithSelector{
		{{CacheKey: expKey(NewCacheKey("f0", 0))}},
		{{CacheKey: expKey(NewCacheKey("s0", 0)), Selector: dgst("s0")}},
	})
	v := idx.LoadOrStore(k1, e1)
	require.Nil(t, v)

	// A structurally identical key resolves to the already-stored e1.
	k2 := testCacheKeyWithDeps(dgst("foo"), 1, [][]CacheKeyWithSelector{
		{{CacheKey: expKey(NewCacheKey("f0", 0))}},
		{{CacheKey: expKey(NewCacheKey("s0", 0)), Selector: dgst("s0")}},
	})
	v = idx.LoadOrStore(k2, e2)
	require.Equal(t, v, e1)

	// Reusing k1's dependency slice directly also matches.
	k2 = testCacheKeyWithDeps(dgst("foo"), 1, k1.Deps())
	v = idx.LoadOrStore(k2, e2)
	require.Equal(t, v, e1)

	v = idx.LoadOrStore(k1, e2)
	require.Equal(t, v, e1)

	// update selector
	k2 = testCacheKeyWithDeps(dgst("foo"), 1, [][]CacheKeyWithSelector{
		{{CacheKey: expKey(NewCacheKey("f0", 0))}},
		{{CacheKey: expKey(NewCacheKey("s0", 0))}},
	})
	v = idx.LoadOrStore(k2, e2)
	require.Nil(t, v)

	// add one dep to e1
	k2 = testCacheKeyWithDeps(dgst("foo"), 1, [][]CacheKeyWithSelector{
		{{CacheKey: expKey(NewCacheKey("f0", 0))}},
		{
			{CacheKey: expKey(NewCacheKey("s0", 0)), Selector: dgst("s0")},
			{CacheKey: expKey(NewCacheKey("s1", 1))},
		},
	})
	v = idx.LoadOrStore(k2, e2)
	require.Equal(t, v, e1)

	// recheck with only the new dep key
	k2 = testCacheKeyWithDeps(dgst("foo"), 1, [][]CacheKeyWithSelector{
		{{CacheKey: expKey(NewCacheKey("f0", 0))}},
		{
			{CacheKey: expKey(NewCacheKey("s1", 1))},
		},
	})
	v = idx.LoadOrStore(k2, e2)
	require.Equal(t, v, e1)

	// combine e1 and e2
	k2 = testCacheKeyWithDeps(dgst("foo"), 1, [][]CacheKeyWithSelector{
		{{CacheKey: expKey(NewCacheKey("f0", 0))}},
		{
			{CacheKey: expKey(NewCacheKey("s0", 0))},
			{CacheKey: expKey(NewCacheKey("s1", 1))},
		},
	})
	v = idx.LoadOrStore(k2, e2)
	require.Equal(t, v, e1)

	// initial e2 now points to e1
	k2 = testCacheKeyWithDeps(dgst("foo"), 1, [][]CacheKeyWithSelector{
		{{CacheKey: expKey(NewCacheKey("f0", 0))}},
		{{CacheKey: expKey(NewCacheKey("s0", 0))}},
	})
	v = idx.LoadOrStore(k2, e2)
	require.Equal(t, v, e1)

	idx.Release(e1)

	// e2 still remains after e1 is gone
	k2 = testCacheKeyWithDeps(dgst("foo"), 1, [][]CacheKeyWithSelector{
		{{CacheKey: expKey(NewCacheKey("f0", 0))}},
		{{CacheKey: expKey(NewCacheKey("s0", 0))}},
	})
	v = idx.LoadOrStore(k2, e3)
	require.Equal(t, v, e2)

	idx.Release(e2)
	checkEmpty(t, idx)
}
// TestIndexThreeLevels exercises LoadOrStore across a three-level dependency
// chain: a key (k1) that is itself a dependency of another key (k2),
// alternate dependency keys, and dependency changes at the lowest level that
// should still resolve to the merged edge.
func TestIndexThreeLevels(t *testing.T) {
	idx := NewEdgeIndex()

	e1 := &edge{}
	e2 := &edge{}
	e3 := &edge{}

	k1 := testCacheKeyWithDeps(dgst("foo"), 1, [][]CacheKeyWithSelector{
		{{CacheKey: expKey(NewCacheKey("f0", 0))}},
		{{CacheKey: expKey(NewCacheKey("s0", 0)), Selector: dgst("s0")}},
	})
	v := idx.LoadOrStore(k1, e1)
	require.Nil(t, v)

	v = idx.LoadOrStore(k1, e2)
	require.Equal(t, v, e1)

	// k2 depends on k1 as its second input.
	k2 := testCacheKeyWithDeps(dgst("bar"), 0, [][]CacheKeyWithSelector{
		{{CacheKey: expKey(NewCacheKey("f0", 0))}},
		{{CacheKey: expKey(k1)}},
	})
	v = idx.LoadOrStore(k2, e2)
	require.Nil(t, v)

	// Adding an alternate key for the same input keeps matching e2.
	k2 = testCacheKeyWithDeps(dgst("bar"), 0, [][]CacheKeyWithSelector{
		{{CacheKey: expKey(NewCacheKey("f0", 0))}},
		{
			{CacheKey: expKey(k1)},
			{CacheKey: expKey(NewCacheKey("alt", 0))},
		},
	})
	v = idx.LoadOrStore(k2, e2)
	require.Nil(t, v)

	// The alternate key alone is now enough to resolve to e2.
	k2 = testCacheKeyWithDeps(dgst("bar"), 0, [][]CacheKeyWithSelector{
		{{CacheKey: expKey(NewCacheKey("f0", 0))}},
		{
			{CacheKey: expKey(NewCacheKey("alt", 0))},
		},
	})
	v = idx.LoadOrStore(k2, e3)
	require.Equal(t, v, e2)

	// change dep in a low key
	k1 = testCacheKeyWithDeps(dgst("foo"), 1, [][]CacheKeyWithSelector{
		{
			{CacheKey: expKey(NewCacheKey("f0", 0))},
			{CacheKey: expKey(NewCacheKey("f0_", 0))},
		},
		{{CacheKey: expKey(NewCacheKey("s0", 0)), Selector: dgst("s0")}},
	})
	k2 = testCacheKeyWithDeps(dgst("bar"), 0, [][]CacheKeyWithSelector{
		{{CacheKey: expKey(NewCacheKey("f0", 0))}},
		{{CacheKey: expKey(k1)}},
	})
	v = idx.LoadOrStore(k2, e3)
	require.Equal(t, v, e2)

	// reload with only f0_ still matches
	k1 = testCacheKeyWithDeps(dgst("foo"), 1, [][]CacheKeyWithSelector{
		{
			{CacheKey: expKey(NewCacheKey("f0_", 0))},
		},
		{{CacheKey: expKey(NewCacheKey("s0", 0)), Selector: dgst("s0")}},
	})
	k2 = testCacheKeyWithDeps(dgst("bar"), 0, [][]CacheKeyWithSelector{
		{{CacheKey: expKey(NewCacheKey("f0", 0))}},
		{{CacheKey: expKey(k1)}},
	})
	v = idx.LoadOrStore(k2, e3)
	require.Equal(t, v, e2)

	idx.Release(e1)
	idx.Release(e2)
	checkEmpty(t, idx)
}

View File

@ -7,8 +7,11 @@ import (
"time"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/util/flightcontrol"
"github.com/moby/buildkit/util/progress"
"github.com/moby/buildkit/util/tracing"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
)
@ -18,6 +21,7 @@ type ResolveOpFunc func(Vertex, Builder) (Op, error)
type Builder interface {
Build(ctx context.Context, e Edge) (CachedResult, error)
Call(ctx context.Context, name string, fn func(ctx context.Context) error) error
}
// JobList provides a shared graph of all the vertexes currently being
@ -58,7 +62,35 @@ type state struct {
jobList *JobList
}
func (s *state) builder() Builder {
// getSessionID returns a session ID usable for this state: first from a job
// attached directly to it, otherwise recursively from a parent vertex's
// state. An empty string means no session was found.
// TODO: connect with sessionmanager to avoid getting dropped sessions
func (s *state) getSessionID() string {
	s.mu.Lock()
	for j := range s.jobs {
		if j.SessionID != "" {
			s.mu.Unlock()
			return j.SessionID
		}
	}
	// Copy the parent digests so s.mu is not held while taking jobList.mu in
	// the recursive lookups below.
	parents := map[digest.Digest]struct{}{}
	for p := range s.parents {
		parents[p] = struct{}{}
	}
	s.mu.Unlock()

	for p := range parents {
		s.jobList.mu.Lock()
		pst, ok := s.jobList.actives[p]
		s.jobList.mu.Unlock()
		if ok {
			if sessionID := pst.getSessionID(); sessionID != "" {
				return sessionID
			}
		}
	}
	return ""
}
// builder returns a fresh subBuilder scoped to this state, used to run and
// track nested builds issued by the state's operation.
func (s *state) builder() *subBuilder {
	return &subBuilder{state: s}
}
@ -113,14 +145,31 @@ func (s *state) Release() {
for _, e := range s.edges {
e.release()
}
if s.op != nil {
s.op.release()
}
}
// subBuilder is the Builder implementation handed to an operation. It embeds
// the owning state and collects the cache keys of every nested build so they
// can be exported together with the operation's own result.
type subBuilder struct {
	*state
	mu        sync.Mutex // guards exporters
	exporters []ExportableCacheKey
}
// Build runs a nested build for edge e and records the resulting cache key
// so the parent operation can export cache for its sub-builds as well.
// The stale unconditional `return sb.jobList.subBuild(...)` line (left over
// from the previous implementation) has been removed: it made the exporter
// bookkeeping below unreachable.
func (sb *subBuilder) Build(ctx context.Context, e Edge) (CachedResult, error) {
	res, err := sb.jobList.subBuild(ctx, e, sb.vtx)
	if err != nil {
		return nil, err
	}
	sb.mu.Lock()
	sb.exporters = append(sb.exporters, res.CacheKey())
	sb.mu.Unlock()
	return res, nil
}
// Call runs fn inside a named progress vertex, routing its progress through
// the state's multi-writer so it appears in all attached jobs.
func (sb *subBuilder) Call(ctx context.Context, name string, fn func(ctx context.Context) error) error {
	ctx = progress.WithProgress(ctx, sb.mpw)
	return inVertexContext(ctx, name, fn)
}
type Job struct {
@ -129,6 +178,7 @@ type Job struct {
pw progress.Writer
progressCloser func()
SessionID string
}
type SolverOpt struct {
@ -258,13 +308,13 @@ func (jl *JobList) loadUnlocked(v, parent Vertex, j *Job, cache map[Vertex]Verte
if cache := v.Options().CacheSource; cache != nil && cache.ID() != st.mainCache.ID() {
st.cache[cache.ID()] = cache
}
st.mu.Unlock()
if j != nil {
if _, ok := st.jobs[j]; !ok {
st.jobs[j] = struct{}{}
}
}
st.mu.Unlock()
if parent != nil {
if _, ok := st.parents[parent.Digest()]; !ok {
@ -274,6 +324,10 @@ func (jl *JobList) loadUnlocked(v, parent Vertex, j *Job, cache map[Vertex]Verte
return nil, errors.Errorf("inactive parent %s", parent.Digest())
}
parentState.childVtx[dgst] = struct{}{}
for id, c := range parentState.cache {
st.cache[id] = c
}
}
}
@ -387,8 +441,15 @@ func (j *Job) Discard() error {
return nil
}
// Call runs fn inside a named progress vertex attached to this job's
// progress writer.
func (j *Job) Call(ctx context.Context, name string, fn func(ctx context.Context) error) error {
	ctx = progress.WithProgress(ctx, j.pw)
	return inVertexContext(ctx, name, fn)
}
type activeOp interface {
Op
CacheMap(context.Context) (*CacheMap, error)
LoadCache(ctx context.Context, rec *CacheRecord) (Result, error)
Exec(ctx context.Context, inputs []Result) (outputs []Result, exporters []ExportableCacheKey, err error)
IgnoreCache() bool
Cache() CacheManager
CalcSlowCache(context.Context, Index, ResultBasedCacheFunc, Result) (digest.Digest, error)
@ -404,16 +465,22 @@ func newSharedOp(resolver ResolveOpFunc, cacheManager CacheManager, st *state) *
return so
}
// execRes bundles the shared results of an Exec call together with the cache
// keys collected from nested builds, so repeated Exec calls can return both.
type execRes struct {
	execRes       []*SharedResult
	execExporters []ExportableCacheKey
}
type sharedOp struct {
resolver ResolveOpFunc
st *state
g flightcontrol.Group
opOnce sync.Once
op Op
err error
opOnce sync.Once
op Op
subBuilder *subBuilder
err error
execRes []*SharedResult
execRes *execRes
execErr error
cacheRes *CacheMap
@ -432,6 +499,17 @@ func (s *sharedOp) Cache() CacheManager {
return s.st.combinedCacheManager()
}
// LoadCache loads the result referenced by a cache record, reporting the
// load to the progress stream as a cached vertex start/completion.
func (s *sharedOp) LoadCache(ctx context.Context, rec *CacheRecord) (Result, error) {
	ctx = progress.WithProgress(ctx, s.st.mpw)
	// cache hit: surface the load in tracing/progress as a cached vertex
	span, ctx := tracing.StartSpan(ctx, "load cache: "+s.st.vtx.Name())
	notifyStarted(ctx, &s.st.clientVertex, true)
	res, err := s.Cache().Load(ctx, rec)
	tracing.FinishWithError(span, err)
	notifyCompleted(ctx, &s.st.clientVertex, err, true)
	return res, err
}
func (s *sharedOp) CalcSlowCache(ctx context.Context, index Index, f ResultBasedCacheFunc, res Result) (digest.Digest, error) {
key, err := s.g.Do(ctx, fmt.Sprintf("slow-compute-%d", index), func(ctx context.Context) (interface{}, error) {
s.slowMu.Lock()
@ -480,7 +558,7 @@ func (s *sharedOp) CacheMap(ctx context.Context) (*CacheMap, error) {
if err != nil {
return nil, err
}
res, err := s.g.Do(ctx, "cachemap", func(ctx context.Context) (interface{}, error) {
res, err := s.g.Do(ctx, "cachemap", func(ctx context.Context) (ret interface{}, retErr error) {
if s.cacheRes != nil {
return s.cacheRes, nil
}
@ -488,6 +566,16 @@ func (s *sharedOp) CacheMap(ctx context.Context) (*CacheMap, error) {
return nil, s.cacheErr
}
ctx = progress.WithProgress(ctx, s.st.mpw)
ctx = session.NewContext(ctx, s.st.getSessionID())
if len(s.st.vtx.Inputs()) == 0 {
// no cache hit. start evaluating the node
span, ctx := tracing.StartSpan(ctx, "cache request: "+s.st.vtx.Name())
notifyStarted(ctx, &s.st.clientVertex, false)
defer func() {
tracing.FinishWithError(span, retErr)
notifyCompleted(ctx, &s.st.clientVertex, retErr, false)
}()
}
res, err := op.CacheMap(ctx)
complete := true
if err != nil {
@ -515,16 +603,27 @@ func (s *sharedOp) CacheMap(ctx context.Context) (*CacheMap, error) {
return res.(*CacheMap), nil
}
func (s *sharedOp) Exec(ctx context.Context, inputs []Result) (outputs []Result, err error) {
func (s *sharedOp) Exec(ctx context.Context, inputs []Result) (outputs []Result, exporters []ExportableCacheKey, err error) {
op, err := s.getOp()
if err != nil {
return nil, err
return nil, nil, err
}
res, err := s.g.Do(ctx, "exec", func(ctx context.Context) (interface{}, error) {
res, err := s.g.Do(ctx, "exec", func(ctx context.Context) (ret interface{}, retErr error) {
if s.execRes != nil || s.execErr != nil {
return s.execRes, s.execErr
}
ctx = progress.WithProgress(ctx, s.st.mpw)
ctx = session.NewContext(ctx, s.st.getSessionID())
// no cache hit. start evaluating the node
span, ctx := tracing.StartSpan(ctx, s.st.vtx.Name())
notifyStarted(ctx, &s.st.clientVertex, false)
defer func() {
tracing.FinishWithError(span, retErr)
notifyCompleted(ctx, &s.st.clientVertex, retErr, false)
}()
res, err := op.Exec(ctx, inputs)
complete := true
if err != nil {
@ -540,21 +639,30 @@ func (s *sharedOp) Exec(ctx context.Context, inputs []Result) (outputs []Result,
}
if complete {
if res != nil {
s.execRes = wrapShared(res)
var subExporters []ExportableCacheKey
s.subBuilder.mu.Lock()
if len(s.subBuilder.exporters) > 0 {
subExporters = append(subExporters, s.subBuilder.exporters...)
}
s.subBuilder.mu.Unlock()
s.execRes = &execRes{execRes: wrapShared(res), execExporters: subExporters}
}
s.execErr = err
}
return s.execRes, err
})
if err != nil {
return nil, err
return nil, nil, err
}
return unwrapShared(res.([]*SharedResult)), nil
r := res.(*execRes)
return unwrapShared(r.execRes), r.execExporters, nil
}
func (s *sharedOp) getOp() (Op, error) {
s.opOnce.Do(func() {
s.op, s.err = s.resolver(s.st.vtx, s.st.builder())
s.subBuilder = s.st.builder()
s.op, s.err = s.resolver(s.st.vtx, s.subBuilder)
})
if s.err != nil {
return nil, s.err
@ -564,7 +672,7 @@ func (s *sharedOp) getOp() (Op, error) {
func (s *sharedOp) release() {
if s.execRes != nil {
for _, r := range s.execRes {
for _, r := range s.execRes.execRes {
r.Release(context.TODO())
}
}
@ -611,3 +719,41 @@ func (v *vertexWithCacheOptions) Digest() digest.Digest {
func (v *vertexWithCacheOptions) Inputs() []Edge {
return v.inputs
}
// notifyStarted marks vertex v as started (optionally as a cache hit) and
// publishes the updated vertex on the progress writer from ctx.
func notifyStarted(ctx context.Context, v *client.Vertex, cached bool) {
	writer, _, _ := progress.FromContext(ctx)
	defer writer.Close()
	startedAt := time.Now()
	v.Started = &startedAt
	v.Completed = nil
	v.Cached = cached
	writer.Write(v.Digest.String(), *v)
}
// notifyCompleted marks vertex v as completed (recording err, if any, as the
// vertex error) and publishes the updated vertex on the progress writer from
// ctx. A vertex that was never started gets its start time backfilled.
func notifyCompleted(ctx context.Context, v *client.Vertex, err error, cached bool) {
	writer, _, _ := progress.FromContext(ctx)
	defer writer.Close()
	finishedAt := time.Now()
	if v.Started == nil {
		v.Started = &finishedAt
	}
	v.Completed = &finishedAt
	v.Cached = cached
	if err != nil {
		v.Error = err.Error()
	}
	writer.Write(v.Digest.String(), *v)
}
// inVertexContext runs f inside a synthetic progress vertex with the given
// name, so the work shows up as its own started/completed entry in the
// status stream. f's error is recorded on the vertex and returned unchanged.
func inVertexContext(ctx context.Context, name string, f func(ctx context.Context) error) error {
	vert := client.Vertex{
		Digest: digest.FromBytes([]byte(identity.NewID())),
		Name:   name,
	}
	pw, _, ctx := progress.FromContext(ctx, progress.WithMetadata("vertex", vert.Digest))
	defer pw.Close()
	notifyStarted(ctx, &vert, false)
	runErr := f(ctx)
	notifyCompleted(ctx, &vert, runErr, false)
	return runErr
}

View File

@ -32,12 +32,12 @@ func (s *inMemoryStore) Exists(id string) bool {
s.mu.RLock()
defer s.mu.RUnlock()
if k, ok := s.byID[id]; ok {
return len(k.links) > 0
return len(k.links) > 0 || len(k.results) > 0
}
return false
}
func newKey(id string) *inMemoryKey {
func newInMemoryKey(id string) *inMemoryKey {
return &inMemoryKey{
results: map[string]CacheResult{},
links: map[CacheInfoLink]map[string]struct{}{},
@ -103,7 +103,7 @@ func (s *inMemoryStore) AddResult(id string, res CacheResult) error {
defer s.mu.Unlock()
k, ok := s.byID[id]
if !ok {
k = newKey(id)
k = newInMemoryKey(id)
s.byID[id] = k
}
k.results[res.ID] = res
@ -169,12 +169,12 @@ func (s *inMemoryStore) AddLink(id string, link CacheInfoLink, target string) er
defer s.mu.Unlock()
k, ok := s.byID[id]
if !ok {
k = newKey(id)
k = newInMemoryKey(id)
s.byID[id] = k
}
k2, ok := s.byID[target]
if !ok {
k2 = newKey(target)
k2 = newInMemoryKey(target)
s.byID[target] = k2
}
m, ok := k.links[link]
@ -209,6 +209,55 @@ func (s *inMemoryStore) WalkLinks(id string, link CacheInfoLink, fn func(id stri
return nil
}
// HasLink reports whether a link of the given shape exists from id to
// target.
func (s *inMemoryStore) HasLink(id string, link CacheInfoLink, target string) bool {
	s.mu.RLock()
	defer s.mu.RUnlock()
	k, ok := s.byID[id]
	if !ok {
		return false
	}
	targets, ok := k.links[link]
	if !ok {
		return false
	}
	_, ok = targets[target]
	return ok
}
// WalkBacklinks calls fn once for every link that points *to* id, passing
// the source record's ID and the link description. Matches are collected
// under the read lock and fn is invoked only after the lock is released.
func (s *inMemoryStore) WalkBacklinks(id string, fn func(id string, link CacheInfoLink) error) error {
	s.mu.RLock()
	k, ok := s.byID[id]
	if !ok {
		s.mu.RUnlock()
		return nil
	}
	var outIDs []string
	var outLinks []CacheInfoLink
	for bid := range k.backlinks {
		b, ok := s.byID[bid]
		if !ok {
			continue
		}
		// Only report links whose target set actually contains id.
		for l, m := range b.links {
			if _, ok := m[id]; !ok {
				continue
			}
			outIDs = append(outIDs, bid)
			// The reported digest is the source link's root-key form.
			outLinks = append(outLinks, CacheInfoLink{
				Digest:   rootKey(l.Digest, l.Output),
				Input:    l.Input,
				Selector: l.Selector,
			})
		}
	}
	s.mu.RUnlock()
	for i := range outIDs {
		if err := fn(outIDs[i], outLinks[i]); err != nil {
			return err
		}
	}
	return nil
}
// NewInMemoryResultStorage returns a CacheResultStorage backed by an
// in-memory sync.Map, for tests and non-persistent use.
func NewInMemoryResultStorage() CacheResultStorage {
	return &inMemoryResultStore{m: &sync.Map{}}
}

View File

@ -18,7 +18,9 @@ func (j *Job) Status(ctx context.Context, ch chan *client.SolveStatus) error {
if enc := vs.encore(); len(enc) > 0 {
ch <- &client.SolveStatus{Vertexes: enc}
}
close(ch)
}()
for {
p, err := pr.Read(ctx)
if err != nil {
@ -76,16 +78,16 @@ type vertexStream struct {
func (vs *vertexStream) append(v client.Vertex) []*client.Vertex {
var out []*client.Vertex
vs.cache[v.Digest] = &v
if v.Cached {
if v.Started != nil {
for _, inp := range v.Inputs {
if inpv, ok := vs.cache[inp]; ok {
if !inpv.Cached && inpv.Completed == nil {
inpv.Cached = true
inpv.Started = v.Completed
inpv.Completed = v.Completed
inpv.Started = v.Started
inpv.Completed = v.Started
out = append(out, vs.append(*inpv)...)
delete(vs.cache, inp)
}
delete(vs.cache, inp)
out = append(out, vs.append(*inpv)...)
}
}
}

View File

@ -5,7 +5,6 @@ import (
"sync"
"sync/atomic"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
@ -59,30 +58,17 @@ func (r *splitResult) Release(ctx context.Context) error {
}
// NewCachedResult combines a result and cache key into cached result
func NewCachedResult(res Result, k CacheKey, exp Exporter) CachedResult {
return &cachedResult{res, k, exp}
func NewCachedResult(res Result, k ExportableCacheKey) CachedResult {
return &cachedResult{res, k}
}
type cachedResult struct {
Result
k CacheKey
exp Exporter
k ExportableCacheKey
}
func (cr *cachedResult) CacheKey() ExportableCacheKey {
return ExportableCacheKey{CacheKey: cr.k, Exporter: cr.exp}
}
func (cr *cachedResult) Export(ctx context.Context, converter func(context.Context, Result) (*Remote, error)) ([]ExportRecord, error) {
m := make(map[digest.Digest]*ExportRecord)
if _, err := cr.exp.Export(ctx, m, converter); err != nil {
return nil, err
}
out := make([]ExportRecord, 0, len(m))
for _, r := range m {
out = append(out, *r)
}
return out, nil
return cr.k
}
func NewSharedCachedResult(res CachedResult) *SharedCachedResult {
@ -113,10 +99,6 @@ func (cr *clonedCachedResult) CacheKey() ExportableCacheKey {
return cr.cr.CacheKey()
}
func (cr *clonedCachedResult) Export(ctx context.Context, converter func(context.Context, Result) (*Remote, error)) ([]ExportRecord, error) {
return cr.cr.Export(ctx, converter)
}
type SharedCachedResult struct {
*SharedResult
CachedResult

View File

@ -141,12 +141,15 @@ func (s *Scheduler) dispatch(e *edge) {
}
// if keys changed there might be possiblity for merge with other edge
if e.keysDidChange && e.cacheMap != nil {
origEdge := e.index.LoadOrStore(e, e.cacheMap.Digest, e.edge.Index, e.depKeys())
if origEdge != nil {
logrus.Debugf("merging edge %s to %s\n", e.edge.Vertex.Name(), origEdge.edge.Vertex.Name())
if s.mergeTo(origEdge, e) {
s.ef.SetEdge(e.edge, origEdge)
if e.keysDidChange {
if k := e.currentIndexKey(); k != nil {
// skip this if not at least 1 key per dep
origEdge := e.index.LoadOrStore(k, e)
if origEdge != nil {
logrus.Debugf("merging edge %s to %s\n", e.edge.Vertex.Name(), origEdge.edge.Vertex.Name())
if s.mergeTo(origEdge, e) {
s.ef.SetEdge(e.edge, origEdge)
}
}
}
e.keysDidChange = false
@ -278,6 +281,15 @@ func (s *Scheduler) mergeTo(target, src *edge) bool {
delete(s.outgoing, src)
s.signal(target)
for i, d := range src.deps {
for _, k := range d.keys {
target.secondaryExporters = append(target.secondaryExporters, expDep{i, CacheKeyWithSelector{CacheKey: k, Selector: src.cacheMap.Deps[i].Selector}})
}
if d.result != nil {
target.secondaryExporters = append(target.secondaryExporters, expDep{i, CacheKeyWithSelector{CacheKey: d.result.CacheKey(), Selector: src.cacheMap.Deps[i].Selector}})
}
}
// TODO(tonistiigi): merge cache providers
return true

View File

@ -2129,10 +2129,20 @@ func TestCacheExporting(t *testing.T) {
require.NoError(t, j0.Discard())
j0 = nil
rec, err := res.Export(ctx, testConvertToRemote)
expTarget := newTestExporterTarget()
_, err = res.CacheKey().Exporter.ExportTo(ctx, expTarget, testConvertToRemote)
require.NoError(t, err)
require.Equal(t, 3, len(rec))
expTarget.normalize()
require.Equal(t, len(expTarget.records), 3)
require.Equal(t, expTarget.records[0].results, 0)
require.Equal(t, expTarget.records[1].results, 0)
require.Equal(t, expTarget.records[2].results, 1)
require.Equal(t, expTarget.records[0].links, 0)
require.Equal(t, expTarget.records[1].links, 0)
require.Equal(t, expTarget.records[2].links, 2)
j1, err := l.NewJob("j1")
require.NoError(t, err)
@ -2150,10 +2160,20 @@ func TestCacheExporting(t *testing.T) {
require.NoError(t, j1.Discard())
j1 = nil
rec2, err := res.Export(ctx, testConvertToRemote)
expTarget = newTestExporterTarget()
_, err = res.CacheKey().Exporter.ExportTo(ctx, expTarget, testConvertToRemote)
require.NoError(t, err)
require.Equal(t, 3, len(rec2))
expTarget.normalize()
// the order of the records isn't really significant
require.Equal(t, len(expTarget.records), 3)
require.Equal(t, expTarget.records[0].results, 0)
require.Equal(t, expTarget.records[1].results, 0)
require.Equal(t, expTarget.records[2].results, 1)
require.Equal(t, expTarget.records[0].links, 0)
require.Equal(t, expTarget.records[1].links, 0)
require.Equal(t, expTarget.records[2].links, 2)
}
func TestSlowCacheAvoidAccess(t *testing.T) {
@ -2293,13 +2313,21 @@ func TestCacheExportingPartialSelector(t *testing.T) {
require.NoError(t, j0.Discard())
j0 = nil
rec, err := res.Export(ctx, testConvertToRemote)
expTarget := newTestExporterTarget()
_, err = res.CacheKey().Exporter.ExportTo(ctx, expTarget, testConvertToRemote)
require.NoError(t, err)
require.Equal(t, len(rec), 4) // v1 creates 3 records
expTarget.normalize()
require.Equal(t, len(expTarget.records), 3)
require.Equal(t, expTarget.records[0].results, 0)
require.Equal(t, expTarget.records[1].results, 0)
require.Equal(t, expTarget.records[2].results, 1)
require.Equal(t, expTarget.records[0].links, 0)
require.Equal(t, expTarget.records[1].links, 0)
require.Equal(t, expTarget.records[2].links, 2)
// repeat so that all coming from cache are retained
j1, err := l.NewJob("j1")
require.NoError(t, err)
@ -2309,25 +2337,221 @@ func TestCacheExportingPartialSelector(t *testing.T) {
}
}()
g1 := Edge{
Vertex: vtx(vtxOpt{
name: "v2",
cacheKeySeed: "seed2",
value: "result2",
inputs: []Edge{g0},
},
),
}
g1 := g0
res, err = j1.Build(ctx, g1)
require.NoError(t, err)
require.Equal(t, unwrap(res), "result2")
require.Equal(t, unwrap(res), "result0")
require.NoError(t, j1.Discard())
j1 = nil
_, err = res.Export(ctx, testConvertToRemote)
expTarget = newTestExporterTarget()
_, err = res.CacheKey().Exporter.ExportTo(ctx, expTarget, testConvertToRemote)
require.NoError(t, err)
expTarget.normalize()
// the order of the records isn't really significant
require.Equal(t, len(expTarget.records), 3)
require.Equal(t, expTarget.records[0].results, 0)
require.Equal(t, expTarget.records[1].results, 1)
require.Equal(t, expTarget.records[2].results, 0)
require.Equal(t, expTarget.records[0].links, 0)
require.Equal(t, expTarget.records[1].links, 2)
require.Equal(t, expTarget.records[2].links, 0)
// repeat with forcing a slow key recomputation
j2, err := l.NewJob("j2")
require.NoError(t, err)
defer func() {
if j2 != nil {
j2.Discard()
}
}()
g2 := Edge{
Vertex: vtx(vtxOpt{
name: "v0",
cacheKeySeed: "seed0",
value: "result0",
inputs: []Edge{
{Vertex: vtx(vtxOpt{
name: "v1",
cacheKeySeed: "seed1-net",
value: "result1",
})},
},
selectors: map[int]digest.Digest{
0: dgst("sel0"),
},
slowCacheCompute: map[int]ResultBasedCacheFunc{
0: digestFromResult,
},
}),
}
res, err = j2.Build(ctx, g2)
require.NoError(t, err)
require.Equal(t, unwrap(res), "result0")
require.NoError(t, j2.Discard())
j2 = nil
expTarget = newTestExporterTarget()
_, err = res.CacheKey().Exporter.ExportTo(ctx, expTarget, testConvertToRemote)
require.NoError(t, err)
expTarget.normalize()
// the order of the records isn't really significant
// adds one
require.Equal(t, len(expTarget.records), 4)
require.Equal(t, expTarget.records[0].results, 0)
require.Equal(t, expTarget.records[1].results, 0)
require.Equal(t, expTarget.records[2].results, 1)
require.Equal(t, expTarget.records[3].results, 0)
require.Equal(t, expTarget.records[0].links, 0)
require.Equal(t, expTarget.records[1].links, 0)
require.Equal(t, expTarget.records[2].links, 3)
require.Equal(t, expTarget.records[3].links, 0)
// repeat with a wrapper
j3, err := l.NewJob("j3")
require.NoError(t, err)
defer func() {
if j3 != nil {
j3.Discard()
}
}()
g3 := Edge{
Vertex: vtx(vtxOpt{
name: "v2",
cacheKeySeed: "seed2",
value: "result2",
inputs: []Edge{g2},
},
),
}
res, err = j3.Build(ctx, g3)
require.NoError(t, err)
require.Equal(t, unwrap(res), "result2")
require.NoError(t, j3.Discard())
j3 = nil
expTarget = newTestExporterTarget()
_, err = res.CacheKey().Exporter.ExportTo(ctx, expTarget, testConvertToRemote)
require.NoError(t, err)
expTarget.normalize()
// adds one extra result
// the order of the records isn't really significant
require.Equal(t, len(expTarget.records), 5)
require.Equal(t, expTarget.records[0].results, 0)
require.Equal(t, expTarget.records[1].results, 0)
require.Equal(t, expTarget.records[2].results, 1)
require.Equal(t, expTarget.records[3].results, 0)
require.Equal(t, expTarget.records[4].results, 1)
require.Equal(t, expTarget.records[0].links, 0)
require.Equal(t, expTarget.records[1].links, 0)
require.Equal(t, expTarget.records[2].links, 3)
require.Equal(t, expTarget.records[3].links, 0)
require.Equal(t, expTarget.records[4].links, 1)
}
// TestCacheExportingMergedKey builds a graph whose two middle vertexes (v1
// and "v1-diff") share the same cache key seed and the same content-based
// slow cache function over inputs that produce identical values, then checks
// the number of records in the exported cache chain after normalization
// (presumably verifying the equivalent keys were merged — confirm against
// the exporter implementation).
func TestCacheExportingMergedKey(t *testing.T) {
	t.Parallel()
	ctx := context.TODO()

	cacheManager := newTrackingCacheManager(NewInMemoryCacheManager())

	l := NewJobList(SolverOpt{
		ResolveOpFunc: testOpResolver,
		DefaultCache:  cacheManager,
	})
	defer l.Close()

	j0, err := l.NewJob("j0")
	require.NoError(t, err)

	defer func() {
		if j0 != nil {
			j0.Discard()
		}
	}()

	g0 := Edge{
		Vertex: vtx(vtxOpt{
			name:         "v0",
			cacheKeySeed: "seed0",
			value:        "result0",
			inputs: []Edge{
				{
					Vertex: vtx(vtxOpt{
						name:         "v1",
						cacheKeySeed: "seed1",
						value:        "result1",
						inputs: []Edge{
							{
								Vertex: vtx(vtxOpt{
									name:         "v2",
									cacheKeySeed: "seed2",
									value:        "result2",
								}),
							},
						},
						slowCacheCompute: map[int]ResultBasedCacheFunc{
							0: digestFromResult,
						},
					}),
				},
				{
					// Same seed and value as v1 but a different input vertex
					// (v3) that yields the same result content.
					Vertex: vtx(vtxOpt{
						name:         "v1-diff",
						cacheKeySeed: "seed1",
						value:        "result1",
						inputs: []Edge{
							{
								Vertex: vtx(vtxOpt{
									name:         "v3",
									cacheKeySeed: "seed3",
									value:        "result2",
								}),
							},
						},
						slowCacheCompute: map[int]ResultBasedCacheFunc{
							0: digestFromResult,
						},
					}),
				},
			},
		}),
	}

	res, err := j0.Build(ctx, g0)
	require.NoError(t, err)
	require.Equal(t, unwrap(res), "result0")

	require.NoError(t, j0.Discard())
	j0 = nil

	expTarget := newTestExporterTarget()

	_, err = res.CacheKey().Exporter.ExportTo(ctx, expTarget, testConvertToRemote)
	require.NoError(t, err)

	expTarget.normalize()
	require.Equal(t, len(expTarget.records), 5)
}
func generateSubGraph(nodes int) (Edge, int) {
@ -2548,7 +2772,7 @@ func (v *vertexConst) Exec(ctx context.Context, inputs []Result) (outputs []Resu
// vtxSum returns a vertex that ourputs sum of its inputs plus a constant
func vtxSum(v int, opt vtxOpt) *vertexSum {
if opt.cacheKeySeed == "" {
opt.cacheKeySeed = fmt.Sprintf("sum-%d", v)
opt.cacheKeySeed = fmt.Sprintf("sum-%d-%d", v, len(opt.inputs))
}
if opt.name == "" {
opt.name = opt.cacheKeySeed + "-" + identity.NewID()
@ -2687,7 +2911,75 @@ func digestFromResult(ctx context.Context, res Result) (digest.Digest, error) {
}
// testConvertToRemote converts a test result into a fake Remote whose single
// descriptor carries the result's integer value as an annotation. Results
// that are not dummyResults convert to (nil, nil) so callers can skip them.
// The stale unconditional return that preceded the type switch (dead code
// left from the previous unwrapInt-based implementation, making the rest of
// the body unreachable) has been removed.
func testConvertToRemote(ctx context.Context, res Result) (*Remote, error) {
	if dr, ok := res.Sys().(*dummyResult); ok {
		return &Remote{Descriptors: []ocispec.Descriptor{{
			Annotations: map[string]string{"value": fmt.Sprintf("%d", dr.intValue)},
		}}}, nil
	}
	return nil, nil
}
// newTestExporterTarget returns an empty exporter target for tests.
func newTestExporterTarget() *testExporterTarget {
	return &testExporterTarget{
		visited: map[interface{}]struct{}{},
	}
}

// testExporterTarget is an ExporterTarget implementation that records what
// the exporter produced so tests can assert on record and link counts.
type testExporterTarget struct {
	visited map[interface{}]struct{}
	records []*testExporterRecord
}

// Add appends a new record for dgst and returns it for further population.
func (t *testExporterTarget) Add(dgst digest.Digest) ExporterRecord {
	r := &testExporterRecord{dgst: dgst}
	t.records = append(t.records, r)
	return r
}

// Visit marks v as seen.
func (t *testExporterTarget) Visit(v interface{}) {
	t.visited[v] = struct{}{}
}

// Visited reports whether v was previously passed to Visit.
func (t *testExporterTarget) Visited(v interface{}) bool {
	_, ok := t.visited[v]
	return ok
}
// normalize deduplicates records that share a digest: only the first record
// for each digest is kept, and whenever a duplicate is dropped its digest is
// also removed from every record's linkMap, with link counts recomputed.
func (t *testExporterTarget) normalize() {
	m := map[digest.Digest]struct{}{}
	rec := make([]*testExporterRecord, 0, len(t.records))
	for _, r := range t.records {
		if _, ok := m[r.dgst]; ok {
			// Duplicate digest: discard r and scrub links to this digest.
			for _, r2 := range t.records {
				delete(r2.linkMap, r.dgst)
				r2.links = len(r2.linkMap)
			}
			continue
		}
		m[r.dgst] = struct{}{}
		rec = append(rec, r)
	}
	t.records = rec
}
// testExporterRecord is a single exported cache record plus the counters the
// tests assert against.
type testExporterRecord struct {
	dgst    digest.Digest
	results int // number of AddResult calls
	links   int // number of distinct source digests linked in
	linkMap map[digest.Digest]struct{}
}

// AddResult counts an attached result; the remote payload itself is ignored.
func (r *testExporterRecord) AddResult(createdAt time.Time, result *Remote) {
	r.results++
}

// LinkFrom records a link from src, counting each source digest only once.
// Sources that are not testExporterRecords are ignored.
func (r *testExporterRecord) LinkFrom(src ExporterRecord, index int, selector string) {
	if s, ok := src.(*testExporterRecord); ok {
		if r.linkMap == nil {
			r.linkMap = map[digest.Digest]struct{}{}
		}
		if _, ok := r.linkMap[s.dgst]; !ok {
			r.linkMap[s.dgst] = struct{}{}
			r.links++
		}
	}
}

View File

@ -2,7 +2,6 @@ package solver
import (
"context"
"sync"
"time"
"github.com/containerd/containerd/content"
@ -51,23 +50,29 @@ type Result interface {
type CachedResult interface {
Result
CacheKey() ExportableCacheKey
Export(ctx context.Context, converter func(context.Context, Result) (*Remote, error)) ([]ExportRecord, error)
}
// Exporter can export the artifacts of the build chain
type Exporter interface {
Export(ctx context.Context, m map[digest.Digest]*ExportRecord, converter func(context.Context, Result) (*Remote, error)) (*ExportRecord, error)
ExportTo(ctx context.Context, t ExporterTarget, converter func(context.Context, Result) (*Remote, error)) ([]ExporterRecord, error)
}
// ExportRecord defines a single record in the exported cache chain
type ExportRecord struct {
Digest digest.Digest
Links map[CacheLink]struct{}
Remote *Remote
// ExporterTarget defines object capable of receiving exports
type ExporterTarget interface {
Add(dgst digest.Digest) ExporterRecord
Visit(interface{})
Visited(interface{}) bool
}
// ExporterRecord is a single object being exported
type ExporterRecord interface {
AddResult(createdAt time.Time, result *Remote)
LinkFrom(src ExporterRecord, index int, selector string)
}
// Remote is a descriptor or a list of stacked descriptors that can be pulled
// from a content provider
// TODO: add closer to keep referenced data from getting deleted
type Remote struct {
Descriptors []ocispec.Descriptor
Provider content.Provider
@ -75,11 +80,11 @@ type Remote struct {
// CacheLink is a link between two cache records. All fields carry
// `json:",omitempty"` so empty components are dropped when a link is
// marshaled, keeping the serialized form compact and canonical.
//
// NOTE: the diff flattening had left both the old untagged field set and the
// new tagged one in place, producing duplicate field declarations; only the
// tagged (post-refactor) set is kept.
type CacheLink struct {
	Source   digest.Digest `json:",omitempty"`
	Input    Index         `json:",omitempty"`
	Output   Index         `json:",omitempty"`
	Base     digest.Digest `json:",omitempty"`
	Selector digest.Digest `json:",omitempty"`
}
// Op is an implementation for running a vertex
@ -108,32 +113,19 @@ type CacheMap struct {
// ExportableCacheKey is a cache key connected with an exporter that can export
// a chain of cacherecords pointing to that key
type ExportableCacheKey struct {
CacheKey
*CacheKey
Exporter
}
// CacheKey is an identifier for storing/loading build cache
type CacheKey interface {
// Deps are dependant cache keys
Deps() []CacheKeyWithSelector
// Base digest for operation. Usually CacheMap.Digest
Digest() digest.Digest
// Index for the output that is cached
Output() Index
// Helpers for implementations for adding internal metadata
SetValue(key, value interface{})
GetValue(key interface{}) interface{}
}
// CacheRecord is an identifier for loading in cache
type CacheRecord struct {
ID string
CacheKey ExportableCacheKey
CacheManager CacheManager
Loadable bool
// Size int
ID string
Size int
CreatedAt time.Time
Priority int
cacheManager *cacheManager
key *CacheKey
}
// CacheManager implements build cache backend
@ -143,58 +135,10 @@ type CacheManager interface {
ID() string
// Query searches for cache paths from one cache key to the output of a
// possible match.
Query(inp []CacheKeyWithSelector, inputIndex Index, dgst digest.Digest, outputIndex Index) ([]*CacheRecord, error)
Query(inp []CacheKeyWithSelector, inputIndex Index, dgst digest.Digest, outputIndex Index) ([]*CacheKey, error)
Records(ck *CacheKey) ([]*CacheRecord, error)
// Load pulls and returns the cached result
Load(ctx context.Context, rec *CacheRecord) (Result, error)
// Save saves a result based on a cache key
Save(key CacheKey, s Result) (ExportableCacheKey, error)
}
// NewCacheKey creates a new cache key for a specific output index
func NewCacheKey(dgst digest.Digest, index Index, deps []CacheKeyWithSelector) CacheKey {
return &cacheKey{
dgst: dgst,
deps: deps,
index: index,
values: &sync.Map{},
}
}
// CacheKeyWithSelector combines a cache key with an optional selector digest.
// Used to limit the matches for dependency cache key.
type CacheKeyWithSelector struct {
Selector digest.Digest
CacheKey ExportableCacheKey
}
type cacheKey struct {
dgst digest.Digest
index Index
deps []CacheKeyWithSelector
values *sync.Map
}
func (ck *cacheKey) SetValue(key, value interface{}) {
ck.values.Store(key, value)
}
func (ck *cacheKey) GetValue(key interface{}) interface{} {
v, _ := ck.values.Load(key)
return v
}
func (ck *cacheKey) Deps() []CacheKeyWithSelector {
return ck.deps
}
func (ck *cacheKey) Digest() digest.Digest {
return ck.dgst
}
func (ck *cacheKey) Output() Index {
return ck.index
}
func (ck *cacheKey) Export(ctx context.Context, converter func(context.Context, Result) ([]ocispec.Descriptor, content.Provider, error)) ([]ExportRecord, content.Provider, error) {
return nil, nil, nil
Save(key *CacheKey, s Result) (*ExportableCacheKey, error)
}