solver: remove CacheKey interface

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
docker-18.09
Tonis Tiigi 2018-04-13 13:52:27 -07:00
parent 243f742ac2
commit 32f7a01fe7
12 changed files with 1051 additions and 938 deletions


@ -1,565 +0,0 @@
package solver
import (
"context"
"fmt"
"sort"
"strings"
"sync"
"github.com/moby/buildkit/identity"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)
type internalMemoryKeyT string
var internalMemoryKey = internalMemoryKeyT("buildkit/memory-cache-id")
var NoSelector = digest.FromBytes(nil)
func NewInMemoryCacheManager() CacheManager {
return NewCacheManager(identity.NewID(), NewInMemoryCacheStorage(), NewInMemoryResultStorage())
}
func NewCacheManager(id string, storage CacheKeyStorage, results CacheResultStorage) CacheManager {
cm := &inMemoryCacheManager{
id: id,
backend: storage,
results: results,
}
storage.Walk(func(id string) error {
return storage.WalkResults(id, func(cr CacheResult) error {
if !results.Exists(cr.ID) {
storage.Release(cr.ID)
}
return nil
})
})
return cm
}
type inMemoryCacheKey struct {
manager *inMemoryCacheManager
cacheResult CacheResult
deps []CacheKeyWithSelector // only []*inMemoryCacheKey
digest digest.Digest
output Index
id string
CacheKey
}
func (ck *inMemoryCacheKey) Deps() []CacheKeyWithSelector {
return ck.deps
}
func (ck *inMemoryCacheKey) Digest() digest.Digest {
return ck.digest
}
func (ck *inMemoryCacheKey) Output() Index {
return ck.output
}
func withExporter(ck *inMemoryCacheKey, cacheResult *CacheResult, deps []CacheKeyWithSelector) ExportableCacheKey {
return ExportableCacheKey{ck, &cacheExporter{
inMemoryCacheKey: ck,
cacheResult: cacheResult,
deps: deps,
}}
}
type cacheExporter struct {
*inMemoryCacheKey
cacheResult *CacheResult
deps []CacheKeyWithSelector
}
func (ce *cacheExporter) Export(ctx context.Context, m map[digest.Digest]*ExportRecord, converter func(context.Context, Result) (*Remote, error)) (*ExportRecord, error) {
var res Result
if ce.cacheResult == nil {
cr, err := ce.inMemoryCacheKey.manager.getBestResult(ce.inMemoryCacheKey.id)
if err != nil {
return nil, err
}
ce.cacheResult = cr
}
var remote *Remote
var err error
if ce.cacheResult != nil {
remote, err = ce.inMemoryCacheKey.manager.results.LoadRemote(ctx, *ce.cacheResult)
if err != nil {
return nil, err
}
if remote == nil {
res, err = ce.inMemoryCacheKey.manager.results.Load(ctx, *ce.cacheResult)
if err != nil {
return nil, err
}
}
}
if res != nil && remote == nil {
remote, err = converter(ctx, res)
if err != nil {
return nil, err
}
}
cacheID := digest.FromBytes([]byte(ce.inMemoryCacheKey.id))
if remote != nil && len(remote.Descriptors) > 0 && remote.Descriptors[0].Digest != "" {
cacheID = remote.Descriptors[0].Digest
}
rec, ok := m[cacheID]
if !ok {
rec = &ExportRecord{
Digest: cacheID,
Remote: remote,
Links: make(map[CacheLink]struct{}),
}
m[cacheID] = rec
}
if len(ce.Deps()) == 0 {
rec.Links[CacheLink{
Output: ce.Output(),
Base: ce.Digest(),
}] = struct{}{}
}
for i, dep := range ce.deps {
if dep.CacheKey.Exporter == nil {
continue
}
r, err := dep.CacheKey.Export(ctx, m, converter)
if err != nil {
return nil, err
}
link := CacheLink{
Source: r.Digest,
Input: Index(i),
Output: ce.Output(),
Base: ce.Digest(),
Selector: dep.Selector,
}
rec.Links[link] = struct{}{}
}
return rec, nil
}
type inMemoryCacheManager struct {
mu sync.RWMutex
id string
backend CacheKeyStorage
results CacheResultStorage
}
func (c *inMemoryCacheManager) ID() string {
return c.id
}
func (c *inMemoryCacheManager) toInMemoryCacheKey(id string, dgst digest.Digest, output Index, deps []CacheKeyWithSelector) *inMemoryCacheKey {
ck := &inMemoryCacheKey{
id: id,
output: output,
digest: dgst,
manager: c,
CacheKey: NewCacheKey("", 0, nil),
deps: deps,
}
ck.SetValue(internalMemoryKey, id)
return ck
}
func (c *inMemoryCacheManager) getBestResult(id string) (*CacheResult, error) {
var results []*CacheResult
if err := c.backend.WalkResults(id, func(res CacheResult) error {
results = append(results, &res)
return nil
}); err != nil {
return nil, err
}
sort.Slice(results, func(i, j int) bool {
return results[i].CreatedAt.After(results[j].CreatedAt)
})
if len(results) > 0 {
return results[0], nil
}
return nil, nil
}
func (c *inMemoryCacheManager) Query(deps []CacheKeyWithSelector, input Index, dgst digest.Digest, output Index) ([]*CacheRecord, error) {
c.mu.RLock()
defer c.mu.RUnlock()
type dep struct {
results map[string]struct{}
key CacheKeyWithSelector
internalKey *inMemoryCacheKey
}
formatDeps := func(deps []dep, index int) []CacheKeyWithSelector {
keys := make([]CacheKeyWithSelector, index+1)
if len(deps) == 1 {
keys[index] = deps[0].key
} else {
k2 := make([]CacheKeyWithSelector, 0, len(deps))
for _, d := range deps {
k2 = append(k2, d.key)
}
keys[index] = CacheKeyWithSelector{CacheKey: ExportableCacheKey{CacheKey: NewCacheKey("", 0, k2)}}
}
return keys
}
allDeps := make([]dep, 0, len(deps))
for _, d := range deps {
for _, k := range c.getAllKeys(d) {
d := dep{key: k, results: map[string]struct{}{}}
internalKey, err := c.getInternalKey(k.CacheKey, false)
if err != nil {
if errors.Cause(err) == ErrNotFound {
allDeps = append(allDeps, d)
} else {
return nil, err
}
} else {
d.internalKey = internalKey
}
allDeps = append(allDeps, d)
}
}
allRes := map[string]struct{}{}
for _, d := range allDeps {
if d.internalKey != nil {
if err := c.backend.WalkLinks(d.internalKey.id, CacheInfoLink{input, output, dgst, d.key.Selector}, func(id string) error {
d.results[id] = struct{}{}
allRes[id] = struct{}{}
return nil
}); err != nil {
return nil, err
}
}
}
if len(deps) == 0 {
allRes[digest.FromBytes([]byte(fmt.Sprintf("%s@%d", dgst, output))).String()] = struct{}{}
}
outs := make([]*CacheRecord, 0, len(allRes))
for res := range allRes {
for _, d := range allDeps {
if d.internalKey == nil {
internalKey, err := c.getInternalKey(d.key.CacheKey, true)
if err != nil {
return nil, err
}
d.internalKey = internalKey
}
if _, ok := d.results[res]; !ok {
if err := c.backend.AddLink(d.internalKey.id, CacheInfoLink{
Input: input,
Output: output,
Digest: dgst,
Selector: d.key.Selector,
}, res); err != nil {
return nil, err
}
}
}
hadResults := false
fdeps := formatDeps(allDeps, int(input))
k := c.toInMemoryCacheKey(res, dgst, output, fdeps)
// TODO: invoke this only once per input
if err := c.backend.WalkResults(res, func(r CacheResult) error {
if c.results.Exists(r.ID) {
outs = append(outs, &CacheRecord{
ID: res + "@" + r.ID,
CacheKey: withExporter(k, &r, fdeps),
CacheManager: c,
Loadable: true,
CreatedAt: r.CreatedAt,
})
hadResults = true
} else {
c.backend.Release(r.ID)
}
return nil
}); err != nil {
return nil, err
}
if !hadResults {
if len(deps) == 0 {
if !c.backend.Exists(res) {
continue
}
}
outs = append(outs, &CacheRecord{
ID: res,
CacheKey: withExporter(k, nil, fdeps),
CacheManager: c,
Loadable: false,
})
}
}
return outs, nil
}
func (c *inMemoryCacheManager) Load(ctx context.Context, rec *CacheRecord) (Result, error) {
c.mu.RLock()
defer c.mu.RUnlock()
keyParts := strings.Split(rec.ID, "@")
if len(keyParts) != 2 {
return nil, errors.Errorf("invalid cache record ID")
}
ck, err := c.getInternalKey(rec.CacheKey, false)
if err != nil {
return nil, err
}
res, err := c.backend.Load(ck.id, keyParts[1])
if err != nil {
return nil, err
}
return c.results.Load(ctx, res)
}
func (c *inMemoryCacheManager) Save(k CacheKey, r Result) (ExportableCacheKey, error) {
c.mu.Lock()
defer c.mu.Unlock()
empty := ExportableCacheKey{}
ck, err := c.getInternalKey(k, true)
if err != nil {
return empty, err
}
res, err := c.results.Save(r)
if err != nil {
return empty, err
}
if err := c.backend.AddResult(ck.id, res); err != nil {
return empty, err
}
return withExporter(ck, &res, ck.Deps()), nil
}
func (c *inMemoryCacheManager) getInternalKeys(d CacheKeyWithSelector, createIfNotExist bool) ([]CacheKeyWithSelector, error) {
keys := make([]CacheKeyWithSelector, 0, 1)
if d.CacheKey.Digest() == "" {
for _, d := range d.CacheKey.Deps() {
k, err := c.getInternalKey(d.CacheKey, createIfNotExist)
if err != nil {
if !createIfNotExist && errors.Cause(err) == ErrNotFound {
continue
}
return nil, err
}
keys = append(keys, CacheKeyWithSelector{Selector: d.Selector, CacheKey: ExportableCacheKey{CacheKey: k, Exporter: d.CacheKey.Exporter}})
}
} else {
k, err := c.getInternalKey(d.CacheKey, createIfNotExist)
if err != nil {
return nil, err
}
keys = append(keys, CacheKeyWithSelector{Selector: d.Selector, CacheKey: ExportableCacheKey{CacheKey: k, Exporter: d.CacheKey.Exporter}})
}
return keys, nil
}
func (c *inMemoryCacheManager) getAllKeys(d CacheKeyWithSelector) []CacheKeyWithSelector {
keys := make([]CacheKeyWithSelector, 0, 1)
if d.CacheKey.Digest() == "" {
for _, d := range d.CacheKey.Deps() {
keys = append(keys, d)
}
} else {
keys = append(keys, d)
}
return keys
}
func (c *inMemoryCacheManager) getInternalKey(k CacheKey, createIfNotExist bool) (*inMemoryCacheKey, error) {
if ck, ok := k.(ExportableCacheKey); ok {
k = ck.CacheKey
}
if ck, ok := k.(*inMemoryCacheKey); ok {
return ck, nil
}
internalV := k.GetValue(internalMemoryKey)
if internalV != nil {
return c.toInMemoryCacheKey(internalV.(string), k.Digest(), k.Output(), k.Deps()), nil
}
matches := make(map[string]struct{})
deps := make([][]CacheKeyWithSelector, 0, len(k.Deps()))
for i, inp := range k.Deps() {
allKeys := c.getAllKeys(inp)
cks := make([]CacheKeyWithSelector, 0, len(allKeys))
for _, k := range allKeys {
internalKey, err := c.getInternalKey(k.CacheKey, createIfNotExist)
if err == nil {
cks = append(cks, CacheKeyWithSelector{Selector: k.Selector, CacheKey: ExportableCacheKey{CacheKey: internalKey, Exporter: k.CacheKey.Exporter}})
}
}
if len(cks) == 0 {
return nil, errors.WithStack(ErrNotFound)
}
if i == 0 || len(matches) > 0 {
for _, ck := range cks {
internalCk := ck.CacheKey.CacheKey.(*inMemoryCacheKey)
m2 := make(map[string]struct{})
if err := c.backend.WalkLinks(internalCk.id, CacheInfoLink{
Input: Index(i),
Output: Index(k.Output()),
Digest: k.Digest(),
Selector: ck.Selector,
}, func(id string) error {
if i == 0 {
matches[id] = struct{}{}
} else {
m2[id] = struct{}{}
}
return nil
}); err != nil {
return nil, err
}
if i != 0 {
for id := range matches {
if _, ok := m2[id]; !ok {
delete(matches, id)
}
}
}
}
}
deps = append(deps, cks)
}
var internalKey string
if len(matches) == 0 && len(k.Deps()) > 0 {
if createIfNotExist {
internalKey = identity.NewID()
} else {
return nil, errors.WithStack(ErrNotFound)
}
} else {
for k := range matches {
internalKey = k
break
}
if len(k.Deps()) == 0 {
internalKey = digest.FromBytes([]byte(fmt.Sprintf("%s@%d", k.Digest(), k.Output()))).String()
}
return c.toInMemoryCacheKey(internalKey, k.Digest(), k.Output(), k.Deps()), nil
}
for i, dep := range deps {
for _, ck := range dep {
internalCk := ck.CacheKey.CacheKey.(*inMemoryCacheKey)
err := c.backend.AddLink(internalCk.id, CacheInfoLink{
Input: Index(i),
Output: k.Output(),
Digest: k.Digest(),
Selector: ck.Selector,
}, internalKey)
if err != nil {
return nil, err
}
}
}
return c.toInMemoryCacheKey(internalKey, k.Digest(), k.Output(), k.Deps()), nil
}
func newCombinedCacheManager(cms []CacheManager, main CacheManager) CacheManager {
return &combinedCacheManager{cms: cms, main: main}
}
type combinedCacheManager struct {
cms []CacheManager
main CacheManager
id string
idOnce sync.Once
}
func (cm *combinedCacheManager) ID() string {
cm.idOnce.Do(func() {
ids := make([]string, len(cm.cms))
for i, c := range cm.cms {
ids[i] = c.ID()
}
cm.id = digest.FromBytes([]byte(strings.Join(ids, ","))).String()
})
return cm.id
}
func (cm *combinedCacheManager) Query(inp []CacheKeyWithSelector, inputIndex Index, dgst digest.Digest, outputIndex Index) ([]*CacheRecord, error) {
eg, _ := errgroup.WithContext(context.TODO())
res := make(map[string]*CacheRecord, len(cm.cms))
var mu sync.Mutex
for i, c := range cm.cms {
func(i int, c CacheManager) {
eg.Go(func() error {
recs, err := c.Query(inp, inputIndex, dgst, outputIndex)
if err != nil {
return err
}
mu.Lock()
for _, r := range recs {
if _, ok := res[r.ID]; !ok || c == cm.main {
r.CacheManager = c
if c == cm.main {
r.Priority = 1
}
res[r.ID] = r
}
}
mu.Unlock()
return nil
})
}(i, c)
}
if err := eg.Wait(); err != nil {
return nil, err
}
out := make([]*CacheRecord, 0, len(res))
for _, r := range res {
out = append(out, r)
}
return out, nil
}
func (cm *combinedCacheManager) Load(ctx context.Context, rec *CacheRecord) (Result, error) {
return rec.CacheManager.Load(ctx, rec)
}
func (cm *combinedCacheManager) Save(key CacheKey, s Result) (ExportableCacheKey, error) {
return cm.main.Save(key, s)
}


@ -9,23 +9,47 @@ import (
"github.com/stretchr/testify/require"
)
func depKeys(cks ...CacheKey) []CacheKeyWithSelector {
func depKeys(cks ...ExportableCacheKey) []CacheKeyWithSelector {
var keys []CacheKeyWithSelector
for _, ck := range cks {
keys = append(keys, CacheKeyWithSelector{CacheKey: ExportableCacheKey{CacheKey: ck}})
keys = append(keys, CacheKeyWithSelector{CacheKey: ck})
}
return keys
}
func testCacheKey(dgst digest.Digest, output Index, deps ...ExportableCacheKey) *CacheKey {
k := NewCacheKey(dgst, output)
k.deps = make([][]CacheKeyWithSelector, len(deps))
for i, dep := range deps {
k.deps[i] = depKeys(dep)
// k.deps[i] = append(k.deps[i], CacheKeyWithSelector{CacheKey: dep})
}
return k
}
func testCacheKeyWithDeps(dgst digest.Digest, output Index, deps [][]CacheKeyWithSelector) *CacheKey {
k := NewCacheKey(dgst, output)
k.deps = deps
return k
}
func expKey(k *CacheKey) ExportableCacheKey {
return ExportableCacheKey{CacheKey: k, Exporter: &exporter{k: k}}
}
func TestInMemoryCache(t *testing.T) {
ctx := context.TODO()
m := NewInMemoryCacheManager()
cacheFoo, err := m.Save(NewCacheKey(dgst("foo"), 0, nil), testResult("result0"))
cacheFoo, err := m.Save(NewCacheKey(dgst("foo"), 0), testResult("result0"))
require.NoError(t, err)
matches, err := m.Query(nil, 0, dgst("foo"), 0)
keys, err := m.Query(nil, 0, dgst("foo"), 0)
require.NoError(t, err)
require.Equal(t, len(keys), 1)
matches, err := m.Records(keys[0])
require.NoError(t, err)
require.Equal(t, len(matches), 1)
@ -34,10 +58,14 @@ func TestInMemoryCache(t *testing.T) {
require.Equal(t, "result0", unwrap(res))
// another record
cacheBar, err := m.Save(NewCacheKey(dgst("bar"), 0, nil), testResult("result1"))
cacheBar, err := m.Save(NewCacheKey(dgst("bar"), 0), testResult("result1"))
require.NoError(t, err)
matches, err = m.Query(nil, 0, dgst("bar"), 0)
keys, err = m.Query(nil, 0, dgst("bar"), 0)
require.NoError(t, err)
require.Equal(t, len(keys), 1)
matches, err = m.Records(keys[0])
require.NoError(t, err)
require.Equal(t, len(matches), 1)
@ -46,30 +74,32 @@ func TestInMemoryCache(t *testing.T) {
require.Equal(t, "result1", unwrap(res))
// invalid request
matches, err = m.Query(nil, 0, dgst("baz"), 0)
keys, err = m.Query(nil, 0, dgst("baz"), 0)
require.NoError(t, err)
require.Equal(t, len(matches), 0)
require.Equal(t, len(keys), 0)
// second level
k := NewCacheKey(dgst("baz"), Index(1), []CacheKeyWithSelector{
{CacheKey: cacheFoo}, {CacheKey: cacheBar},
})
k := testCacheKey(dgst("baz"), Index(1), *cacheFoo, *cacheBar)
cacheBaz, err := m.Save(k, testResult("result2"))
require.NoError(t, err)
matches, err = m.Query(nil, 0, dgst("baz"), 0)
keys, err = m.Query(nil, 0, dgst("baz"), 0)
require.NoError(t, err)
require.Equal(t, len(matches), 0)
require.Equal(t, len(keys), 0)
matches, err = m.Query(depKeys(cacheFoo), 0, dgst("baz"), 0)
keys, err = m.Query(depKeys(*cacheFoo), 0, dgst("baz"), 0)
require.NoError(t, err)
require.Equal(t, len(matches), 0)
require.Equal(t, len(keys), 0)
matches, err = m.Query(depKeys(cacheFoo), 1, dgst("baz"), Index(1))
keys, err = m.Query(depKeys(*cacheFoo), 1, dgst("baz"), Index(1))
require.NoError(t, err)
require.Equal(t, len(matches), 0)
require.Equal(t, len(keys), 0)
matches, err = m.Query(depKeys(cacheFoo), 0, dgst("baz"), Index(1))
keys, err = m.Query(depKeys(*cacheFoo), 0, dgst("baz"), Index(1))
require.NoError(t, err)
require.Equal(t, len(keys), 1)
matches, err = m.Records(keys[0])
require.NoError(t, err)
require.Equal(t, len(matches), 1)
@ -77,49 +107,47 @@ func TestInMemoryCache(t *testing.T) {
require.NoError(t, err)
require.Equal(t, "result2", unwrap(res))
matches2, err := m.Query(depKeys(cacheBar), 1, dgst("baz"), Index(1))
keys2, err := m.Query(depKeys(*cacheBar), 1, dgst("baz"), Index(1))
require.NoError(t, err)
require.Equal(t, len(matches2), 1)
require.Equal(t, len(keys2), 1)
require.Equal(t, matches[0].ID, matches2[0].ID)
require.Equal(t, keys[0].ID, keys2[0].ID)
k = NewCacheKey(dgst("baz"), Index(1), []CacheKeyWithSelector{
{CacheKey: cacheFoo},
})
k = testCacheKey(dgst("baz"), Index(1), *cacheFoo)
_, err = m.Save(k, testResult("result3"))
require.NoError(t, err)
matches, err = m.Query(depKeys(cacheFoo), 0, dgst("baz"), Index(1))
keys, err = m.Query(depKeys(*cacheFoo), 0, dgst("baz"), Index(1))
require.NoError(t, err)
require.Equal(t, len(keys), 1)
matches, err = m.Records(keys[0])
require.NoError(t, err)
require.Equal(t, len(matches), 2)
// combination save
k2 := NewCacheKey("", 0, []CacheKeyWithSelector{
{CacheKey: cacheFoo}, {CacheKey: cacheBaz},
})
k = NewCacheKey(dgst("bax"), 0, []CacheKeyWithSelector{
{CacheKey: ExportableCacheKey{CacheKey: k2}}, {CacheKey: cacheBar},
k = testCacheKeyWithDeps(dgst("bax"), 0, [][]CacheKeyWithSelector{
[]CacheKeyWithSelector{{CacheKey: *cacheFoo}, {CacheKey: *cacheBaz}},
[]CacheKeyWithSelector{{CacheKey: *cacheBar}},
})
_, err = m.Save(k, testResult("result4"))
require.NoError(t, err)
// foo, bar, baz should all point to result4
matches, err = m.Query(depKeys(cacheFoo), 0, dgst("bax"), 0)
keys, err = m.Query(depKeys(*cacheFoo), 0, dgst("bax"), 0)
require.NoError(t, err)
require.Equal(t, len(matches), 1)
require.Equal(t, len(keys), 1)
id := matches[0].ID
id := keys[0].ID
matches, err = m.Query(depKeys(cacheBar), 1, dgst("bax"), 0)
keys, err = m.Query(depKeys(*cacheBar), 1, dgst("bax"), 0)
require.NoError(t, err)
require.Equal(t, len(matches), 1)
require.Equal(t, matches[0].ID, id)
require.Equal(t, len(keys), 1)
require.Equal(t, keys[0].ID, id)
matches, err = m.Query(depKeys(cacheBaz), 0, dgst("bax"), 0)
keys, err = m.Query(depKeys(*cacheBaz), 0, dgst("bax"), 0)
require.NoError(t, err)
require.Equal(t, len(matches), 1)
require.Equal(t, matches[0].ID, id)
require.Equal(t, len(keys), 1)
require.Equal(t, keys[0].ID, id)
}
func TestInMemoryCacheSelector(t *testing.T) {
@ -127,23 +155,27 @@ func TestInMemoryCacheSelector(t *testing.T) {
m := NewInMemoryCacheManager()
cacheFoo, err := m.Save(NewCacheKey(dgst("foo"), 0, nil), testResult("result0"))
cacheFoo, err := m.Save(NewCacheKey(dgst("foo"), 0), testResult("result0"))
require.NoError(t, err)
_, err = m.Save(NewCacheKey(dgst("bar"), 0, []CacheKeyWithSelector{
{CacheKey: cacheFoo, Selector: dgst("sel0")},
_, err = m.Save(testCacheKeyWithDeps(dgst("bar"), 0, [][]CacheKeyWithSelector{
[]CacheKeyWithSelector{{CacheKey: *cacheFoo, Selector: dgst("sel0")}},
}), testResult("result1"))
require.NoError(t, err)
matches, err := m.Query(depKeys(cacheFoo), 0, dgst("bar"), 0)
keys, err := m.Query(depKeys(*cacheFoo), 0, dgst("bar"), 0)
require.NoError(t, err)
require.Equal(t, len(matches), 0)
require.Equal(t, len(keys), 0)
matches, err = m.Query([]CacheKeyWithSelector{{Selector: "sel-invalid", CacheKey: ExportableCacheKey{CacheKey: cacheFoo}}}, 0, dgst("bar"), 0)
keys, err = m.Query([]CacheKeyWithSelector{{Selector: "sel-invalid", CacheKey: *cacheFoo}}, 0, dgst("bar"), 0)
require.NoError(t, err)
require.Equal(t, len(matches), 0)
require.Equal(t, len(keys), 0)
matches, err = m.Query([]CacheKeyWithSelector{{Selector: dgst("sel0"), CacheKey: ExportableCacheKey{CacheKey: cacheFoo}}}, 0, dgst("bar"), 0)
keys, err = m.Query([]CacheKeyWithSelector{{Selector: dgst("sel0"), CacheKey: *cacheFoo}}, 0, dgst("bar"), 0)
require.NoError(t, err)
require.Equal(t, len(keys), 1)
matches, err := m.Records(keys[0])
require.NoError(t, err)
require.Equal(t, len(matches), 1)
@ -157,44 +189,51 @@ func TestInMemoryCacheSelectorNested(t *testing.T) {
m := NewInMemoryCacheManager()
cacheFoo, err := m.Save(NewCacheKey(dgst("foo"), 0, nil), testResult("result0"))
cacheFoo, err := m.Save(NewCacheKey(dgst("foo"), 0), testResult("result0"))
require.NoError(t, err)
k2 := NewCacheKey("", 0, []CacheKeyWithSelector{
{CacheKey: cacheFoo, Selector: dgst("sel0")},
{CacheKey: ExportableCacheKey{CacheKey: NewCacheKey(dgst("second"), 0, nil)}},
})
_, err = m.Save(NewCacheKey(dgst("bar"), 0, []CacheKeyWithSelector{
{CacheKey: ExportableCacheKey{CacheKey: k2}},
_, err = m.Save(testCacheKeyWithDeps(dgst("bar"), 0, [][]CacheKeyWithSelector{
[]CacheKeyWithSelector{{CacheKey: *cacheFoo, Selector: dgst("sel0")}, {CacheKey: expKey(NewCacheKey(dgst("second"), 0))}},
}), testResult("result1"))
require.NoError(t, err)
matches, err := m.Query([]CacheKeyWithSelector{{Selector: dgst("sel0"), CacheKey: ExportableCacheKey{CacheKey: cacheFoo}}}, 0, dgst("bar"), 0)
keys, err := m.Query(
[]CacheKeyWithSelector{{Selector: dgst("sel0"), CacheKey: *cacheFoo}},
0, dgst("bar"), 0)
require.NoError(t, err)
require.Equal(t, len(keys), 1)
matches, err := m.Records(keys[0])
require.NoError(t, err)
require.Equal(t, len(matches), 1)
res, err := m.Load(ctx, matches[0])
require.NoError(t, err)
require.Equal(t, "result1", unwrap(res))
matches, err = m.Query(depKeys(cacheFoo), 0, dgst("bar"), 0)
keys, err = m.Query(depKeys(*cacheFoo), 0, dgst("bar"), 0)
require.NoError(t, err)
require.Equal(t, len(matches), 0)
require.Equal(t, len(keys), 0)
matches, err = m.Query([]CacheKeyWithSelector{{Selector: dgst("bar"), CacheKey: ExportableCacheKey{CacheKey: cacheFoo}}}, 0, dgst("bar"), 0)
keys, err = m.Query([]CacheKeyWithSelector{{Selector: dgst("bar"), CacheKey: *cacheFoo}}, 0, dgst("bar"), 0)
require.NoError(t, err)
require.Equal(t, len(matches), 0)
require.Equal(t, len(keys), 0)
matches, err = m.Query(depKeys(NewCacheKey(dgst("second"), 0, nil)), 0, dgst("bar"), 0)
keys, err = m.Query(depKeys(expKey(NewCacheKey(dgst("second"), 0))), 0, dgst("bar"), 0)
require.NoError(t, err)
require.Equal(t, len(keys), 1)
matches, err = m.Records(keys[0])
require.NoError(t, err)
require.Equal(t, len(matches), 1)
res, err = m.Load(ctx, matches[0])
require.NoError(t, err)
require.Equal(t, "result1", unwrap(res))
matches, err = m.Query(depKeys(NewCacheKey(dgst("second"), 0, nil)), 0, dgst("bar"), 0)
keys, err = m.Query(depKeys(expKey(NewCacheKey(dgst("second"), 0))), 0, dgst("bar"), 0)
require.NoError(t, err)
require.Equal(t, len(matches), 1)
require.Equal(t, len(keys), 1)
}
func TestInMemoryCacheReleaseParent(t *testing.T) {
@ -203,44 +242,48 @@ func TestInMemoryCacheReleaseParent(t *testing.T) {
m := NewCacheManager(identity.NewID(), storage, results)
res0 := testResult("result0")
cacheFoo, err := m.Save(NewCacheKey(dgst("foo"), 0, nil), res0)
cacheFoo, err := m.Save(NewCacheKey(dgst("foo"), 0), res0)
require.NoError(t, err)
res1 := testResult("result1")
_, err = m.Save(NewCacheKey(dgst("bar"), 0, []CacheKeyWithSelector{
{CacheKey: cacheFoo},
}), res1)
_, err = m.Save(testCacheKey(dgst("bar"), 0, *cacheFoo), res1)
require.NoError(t, err)
matches, err := m.Query(nil, 0, dgst("foo"), 0)
keys, err := m.Query(nil, 0, dgst("foo"), 0)
require.NoError(t, err)
require.Equal(t, len(keys), 1)
matches, err := m.Records(keys[0])
require.NoError(t, err)
require.Equal(t, len(matches), 1)
require.True(t, matches[0].Loadable)
err = storage.Release(res0.ID())
require.NoError(t, err)
// foo becomes unloadable
matches, err = m.Query(nil, 0, dgst("foo"), 0)
keys, err = m.Query(nil, 0, dgst("foo"), 0)
require.NoError(t, err)
require.Equal(t, len(keys), 1)
matches, err = m.Records(keys[0])
require.NoError(t, err)
require.Equal(t, len(matches), 0)
keys, err = m.Query(depKeys(expKey(keys[0])), 0, dgst("bar"), 0)
require.NoError(t, err)
require.Equal(t, len(keys), 1)
matches, err = m.Records(keys[0])
require.NoError(t, err)
require.Equal(t, len(matches), 1)
require.False(t, matches[0].Loadable)
matches, err = m.Query(depKeys(matches[0].CacheKey), 0, dgst("bar"), 0)
require.NoError(t, err)
require.Equal(t, len(matches), 1)
require.True(t, matches[0].Loadable)
// releasing bar releases both foo and bar
err = storage.Release(res1.ID())
require.NoError(t, err)
matches, err = m.Query(nil, 0, dgst("foo"), 0)
keys, err = m.Query(nil, 0, dgst("foo"), 0)
require.NoError(t, err)
require.Equal(t, len(matches), 0)
require.Equal(t, len(keys), 0)
}
// TestInMemoryCacheRestoreOfflineDeletion deletes a result while the
@ -251,13 +294,11 @@ func TestInMemoryCacheRestoreOfflineDeletion(t *testing.T) {
m := NewCacheManager(identity.NewID(), storage, results)
res0 := testResult("result0")
cacheFoo, err := m.Save(NewCacheKey(dgst("foo"), 0, nil), res0)
cacheFoo, err := m.Save(NewCacheKey(dgst("foo"), 0), res0)
require.NoError(t, err)
res1 := testResult("result1")
_, err = m.Save(NewCacheKey(dgst("bar"), 0, []CacheKeyWithSelector{
{CacheKey: cacheFoo},
}), res1)
_, err = m.Save(testCacheKey(dgst("bar"), 0, *cacheFoo), res1)
require.NoError(t, err)
results2 := NewInMemoryResultStorage()
@ -266,16 +307,21 @@ func TestInMemoryCacheRestoreOfflineDeletion(t *testing.T) {
m = NewCacheManager(identity.NewID(), storage, results2)
matches, err := m.Query(nil, 0, dgst("foo"), 0)
keys, err := m.Query(nil, 0, dgst("foo"), 0)
require.NoError(t, err)
require.Equal(t, len(keys), 1)
matches, err := m.Records(keys[0])
require.NoError(t, err)
require.Equal(t, len(matches), 0)
keys, err = m.Query(depKeys(expKey(keys[0])), 0, dgst("bar"), 0)
require.NoError(t, err)
require.Equal(t, len(keys), 1)
matches, err = m.Records(keys[0])
require.NoError(t, err)
require.Equal(t, len(matches), 1)
require.False(t, matches[0].Loadable)
matches, err = m.Query(depKeys(matches[0].CacheKey), 0, dgst("bar"), 0)
require.NoError(t, err)
require.Equal(t, len(matches), 1)
require.True(t, matches[0].Loadable)
}
func TestCarryOverFromSublink(t *testing.T) {
@ -283,32 +329,29 @@ func TestCarryOverFromSublink(t *testing.T) {
results := NewInMemoryResultStorage()
m := NewCacheManager(identity.NewID(), storage, results)
cacheFoo, err := m.Save(NewCacheKey(dgst("foo"), 0, nil), testResult("resultFoo"))
cacheFoo, err := m.Save(NewCacheKey(dgst("foo"), 0), testResult("resultFoo"))
require.NoError(t, err)
k := NewCacheKey("", 0, []CacheKeyWithSelector{
{CacheKey: cacheFoo, Selector: dgst("sel0")},
{CacheKey: ExportableCacheKey{CacheKey: NewCacheKey(dgst("content0"), 0, nil)}, Selector: NoSelector},
})
_, err = m.Save(NewCacheKey(dgst("res"), 0, []CacheKeyWithSelector{{CacheKey: ExportableCacheKey{CacheKey: k}}}), testResult("result0"))
_, err = m.Save(testCacheKeyWithDeps(dgst("res"), 0, [][]CacheKeyWithSelector{
[]CacheKeyWithSelector{{CacheKey: *cacheFoo, Selector: dgst("sel0")}, {CacheKey: expKey(NewCacheKey(dgst("content0"), 0))}},
}), testResult("result0"))
require.NoError(t, err)
cacheBar, err := m.Save(NewCacheKey(dgst("bar"), 0, nil), testResult("resultBar"))
cacheBar, err := m.Save(NewCacheKey(dgst("bar"), 0), testResult("resultBar"))
require.NoError(t, err)
k3 := NewCacheKey("", 0, []CacheKeyWithSelector{
{CacheKey: cacheBar, Selector: dgst("sel0")},
{CacheKey: ExportableCacheKey{CacheKey: NewCacheKey(dgst("content0"), 0, nil)}, Selector: NoSelector},
})
matches, err := m.Query(depKeys(k3), 0, dgst("res"), 0)
keys, err := m.Query([]CacheKeyWithSelector{
{CacheKey: *cacheBar, Selector: dgst("sel0")},
{CacheKey: expKey(NewCacheKey(dgst("content0"), 0))},
}, 0, dgst("res"), 0)
require.NoError(t, err)
require.Equal(t, len(matches), 1)
require.Equal(t, len(keys), 1)
matches, err = m.Query([]CacheKeyWithSelector{{Selector: dgst("sel0"), CacheKey: ExportableCacheKey{CacheKey: cacheBar}}}, 0, dgst("res"), 0)
keys, err = m.Query([]CacheKeyWithSelector{
{Selector: dgst("sel0"), CacheKey: *cacheBar},
}, 0, dgst("res"), 0)
require.NoError(t, err)
require.Equal(t, len(matches), 1)
require.Equal(t, len(keys), 1)
}
func dgst(s string) digest.Digest {

solver-next/cachekey.go (new file, 66 lines)

@ -0,0 +1,66 @@
package solver
import (
"sync"
digest "github.com/opencontainers/go-digest"
)
// NewCacheKey creates a new cache key for a specific output index
func NewCacheKey(dgst digest.Digest, output Index) *CacheKey {
return &CacheKey{
ID: rootKey(dgst, output).String(),
digest: dgst,
output: output,
ids: map[*cacheManager]string{},
}
}
// CacheKeyWithSelector combines a cache key with an optional selector digest.
// Used to limit the matches for a dependency cache key.
type CacheKeyWithSelector struct {
Selector digest.Digest
CacheKey ExportableCacheKey
}
type CacheKey struct {
mu sync.RWMutex
ID string
deps [][]CacheKeyWithSelector // only [][]*inMemoryCacheKey
digest digest.Digest
output Index
ids map[*cacheManager]string
indexIDs []string
}
func (ck *CacheKey) Deps() [][]CacheKeyWithSelector {
ck.mu.RLock()
defer ck.mu.RUnlock()
deps := make([][]CacheKeyWithSelector, len(ck.deps))
for i := range ck.deps {
deps[i] = append([]CacheKeyWithSelector(nil), ck.deps[i]...)
}
return deps
}
func (ck *CacheKey) Digest() digest.Digest {
return ck.digest
}
func (ck *CacheKey) Output() Index {
return ck.output
}
func (ck *CacheKey) clone() *CacheKey {
nk := &CacheKey{
ID: ck.ID,
digest: ck.digest,
output: ck.output,
ids: map[*cacheManager]string{},
}
for cm, id := range ck.ids {
nk.ids[cm] = id
}
return nk
}
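
Below is a minimal, in-package sketch of how the new concrete key is assembled. The deps field is unexported, so this mirrors the testCacheKey/testCacheKeyWithDeps helpers from the test diff above rather than a public API; the digests are illustrative.

    base := NewCacheKey(digest.FromBytes([]byte("foo")), Index(0))
    k := NewCacheKey(digest.FromBytes([]byte("baz")), Index(1))
    k.deps = [][]CacheKeyWithSelector{
        {{CacheKey: ExportableCacheKey{CacheKey: base}}},
    }
    deps := k.Deps() // returns a defensive copy of each dependency slice
    _ = deps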

solver-next/cachemanager.go (new file, 270 lines)

@ -0,0 +1,270 @@
package solver
import (
"context"
"fmt"
"sync"
"github.com/moby/buildkit/identity"
digest "github.com/opencontainers/go-digest"
)
type CacheID string
func NewInMemoryCacheManager() CacheManager {
return NewCacheManager(identity.NewID(), NewInMemoryCacheStorage(), NewInMemoryResultStorage())
}
func NewCacheManager(id string, storage CacheKeyStorage, results CacheResultStorage) CacheManager {
cm := &cacheManager{
id: id,
backend: storage,
results: results,
}
storage.Walk(func(id string) error {
return storage.WalkResults(id, func(cr CacheResult) error {
if !results.Exists(cr.ID) {
storage.Release(cr.ID)
}
return nil
})
})
return cm
}
type cacheManager struct {
mu sync.RWMutex
id string
backend CacheKeyStorage
results CacheResultStorage
}
func (c *cacheManager) ID() string {
return c.id
}
func (c *cacheManager) Query(deps []CacheKeyWithSelector, input Index, dgst digest.Digest, output Index) ([]*CacheKey, error) {
c.mu.RLock()
defer c.mu.RUnlock()
type dep struct {
results map[string]struct{}
key CacheKeyWithSelector
}
allDeps := make([]dep, 0, len(deps))
for _, k := range deps {
allDeps = append(allDeps, dep{key: k, results: map[string]struct{}{}})
}
allRes := map[string]*CacheKey{}
for _, d := range allDeps {
if err := c.backend.WalkLinks(c.getID(d.key.CacheKey.CacheKey), CacheInfoLink{input, output, dgst, d.key.Selector}, func(id string) error {
d.results[id] = struct{}{}
if _, ok := allRes[id]; !ok {
allRes[id] = c.newKeyWithID(id, dgst, output)
}
return nil
}); err != nil {
return nil, err
}
}
// link the results against the keys that didn't exist
for id, key := range allRes {
for _, d := range allDeps {
if _, ok := d.results[id]; !ok {
if err := c.backend.AddLink(c.getID(d.key.CacheKey.CacheKey), CacheInfoLink{
Input: input,
Output: output,
Digest: dgst,
Selector: d.key.Selector,
}, c.getID(key)); err != nil {
return nil, err
}
}
}
}
if len(deps) == 0 {
if !c.backend.Exists(rootKey(dgst, output).String()) {
return nil, nil
}
return []*CacheKey{c.newRootKey(dgst, output)}, nil
}
keys := make([]*CacheKey, 0, len(deps))
for _, k := range allRes {
keys = append(keys, k)
}
return keys, nil
}
func (c *cacheManager) Records(ck *CacheKey) ([]*CacheRecord, error) {
outs := make([]*CacheRecord, 0)
if err := c.backend.WalkResults(c.getID(ck), func(r CacheResult) error {
if c.results.Exists(r.ID) {
outs = append(outs, &CacheRecord{
ID: r.ID,
cacheManager: c,
key: ck,
CreatedAt: r.CreatedAt,
})
} else {
c.backend.Release(r.ID)
}
return nil
}); err != nil {
return nil, err
}
return outs, nil
}
func (c *cacheManager) Load(ctx context.Context, rec *CacheRecord) (Result, error) {
c.mu.RLock()
defer c.mu.RUnlock()
res, err := c.backend.Load(c.getID(rec.key), rec.ID)
if err != nil {
return nil, err
}
return c.results.Load(ctx, res)
}
func (c *cacheManager) Save(k *CacheKey, r Result) (*ExportableCacheKey, error) {
c.mu.Lock()
defer c.mu.Unlock()
res, err := c.results.Save(r)
if err != nil {
return nil, err
}
if err := c.backend.AddResult(c.getID(k), res); err != nil {
return nil, err
}
if err := c.ensurePersistentKey(k); err != nil {
return nil, err
}
rec := &CacheRecord{
ID: res.ID,
cacheManager: c,
key: k,
CreatedAt: res.CreatedAt,
}
return &ExportableCacheKey{
CacheKey: k,
Exporter: &exporter{k: k, record: rec},
}, nil
}
func newKey() *CacheKey {
return &CacheKey{ids: map[*cacheManager]string{}}
}
func (c *cacheManager) newKeyWithID(id string, dgst digest.Digest, output Index) *CacheKey {
k := newKey()
k.digest = dgst
k.output = output
k.ID = id
k.ids[c] = id
return k
}
func (c *cacheManager) newRootKey(dgst digest.Digest, output Index) *CacheKey {
return c.newKeyWithID(rootKey(dgst, output).String(), dgst, output)
}
func (c *cacheManager) getID(k *CacheKey) string {
k.mu.Lock()
id, ok := k.ids[c]
if ok {
k.mu.Unlock()
return id
}
if len(k.deps) == 0 {
k.ids[c] = k.ID
k.mu.Unlock()
return k.ID
}
id = c.getIDFromDeps(k)
k.ids[c] = id
k.mu.Unlock()
return id
}
func (c *cacheManager) ensurePersistentKey(k *CacheKey) error {
id := c.getID(k)
for i, deps := range k.Deps() {
for _, ck := range deps {
l := CacheInfoLink{
Input: Index(i),
Output: Index(k.Output()),
Digest: k.Digest(),
Selector: ck.Selector,
}
ckID := c.getID(ck.CacheKey.CacheKey)
if !c.backend.HasLink(ckID, l, id) {
if err := c.ensurePersistentKey(ck.CacheKey.CacheKey); err != nil {
return err
}
if err := c.backend.AddLink(ckID, l, id); err != nil {
return err
}
}
}
}
return nil
}
func (c *cacheManager) getIDFromDeps(k *CacheKey) string {
matches := map[string]struct{}{}
for i, deps := range k.deps {
if i == 0 || len(matches) > 0 {
for _, ck := range deps {
m2 := make(map[string]struct{})
if err := c.backend.WalkLinks(c.getID(ck.CacheKey.CacheKey), CacheInfoLink{
Input: Index(i),
Output: Index(k.Output()),
Digest: k.Digest(),
Selector: ck.Selector,
}, func(id string) error {
if i == 0 {
matches[id] = struct{}{}
} else {
m2[id] = struct{}{}
}
return nil
}); err != nil {
matches = map[string]struct{}{}
break
}
if i != 0 {
for id := range matches {
if _, ok := m2[id]; !ok {
delete(matches, id)
}
}
}
}
}
}
for k := range matches {
return k
}
return identity.NewID()
}
func rootKey(dgst digest.Digest, output Index) digest.Digest {
return digest.FromBytes([]byte(fmt.Sprintf("%s@%d", dgst, output)))
}
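
For orientation, here is a minimal sketch of the link model that Query and getIDFromDeps walk: CacheKeyStorage maps a dependency key ID plus a CacheInfoLink to the IDs of keys reachable over that edge. The IDs are hypothetical, and the snippet assumes in-package access.

    st := NewInMemoryCacheStorage()
    l := CacheInfoLink{
        Input:  Index(0),
        Output: Index(0),
        Digest: digest.FromBytes([]byte("vertex-op")),
    }
    _ = st.AddLink("dep-key-id", l, "target-key-id")
    _ = st.WalkLinks("dep-key-id", l, func(id string) error {
        fmt.Println("match:", id) // prints "match: target-key-id"
        return nil
    })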


@ -22,11 +22,12 @@ type CacheKeyStorage interface {
AddLink(id string, link CacheInfoLink, target string) error
WalkLinks(id string, link CacheInfoLink, fn func(id string) error) error
HasLink(id string, link CacheInfoLink, target string) bool
WalkBacklinks(id string, fn func(id string, link CacheInfoLink) error) error
}
// CacheResult is a record for a single solve result
type CacheResult struct {
// Payload []byte
CreatedAt time.Time
ID string
}


@ -18,6 +18,7 @@ func RunCacheStorageTests(t *testing.T, st func() (CacheKeyStorage, func())) {
testLinks,
testResultReleaseSingleLevel,
testResultReleaseMultiLevel,
testBacklinks,
} {
runStorageTest(t, tc, st)
}
@ -201,6 +202,38 @@ func testResultReleaseSingleLevel(t *testing.T, st CacheKeyStorage) {
require.Equal(t, len(m), 0)
}
func testBacklinks(t *testing.T, st CacheKeyStorage) {
t.Parallel()
err := st.AddResult("foo", CacheResult{
ID: "foo-result",
CreatedAt: time.Now(),
})
require.NoError(t, err)
err = st.AddResult("sub0", CacheResult{
ID: "sub0-result",
CreatedAt: time.Now(),
})
require.NoError(t, err)
l0 := CacheInfoLink{
Input: 0, Output: 1, Digest: digest.FromBytes([]byte("to-sub0")),
}
err = st.AddLink("foo", l0, "sub0")
require.NoError(t, err)
backlinks := 0
st.WalkBacklinks("sub0", func(id string, link CacheInfoLink) error {
require.Equal(t, id, "foo")
require.Equal(t, link.Input, Index(0))
require.Equal(t, link.Digest, rootKey(digest.FromBytes([]byte("to-sub0")), 1))
backlinks++
return nil
})
require.Equal(t, backlinks, 1)
}
func testResultReleaseMultiLevel(t *testing.T, st CacheKeyStorage) {
t.Parallel()


@ -0,0 +1,124 @@
package solver
import (
"context"
"strings"
"sync"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)
func newCombinedCacheManager(cms []CacheManager, main CacheManager) CacheManager {
return &combinedCacheManager{cms: cms, main: main}
}
type combinedCacheManager struct {
cms []CacheManager
main CacheManager
id string
idOnce sync.Once
}
func (cm *combinedCacheManager) ID() string {
cm.idOnce.Do(func() {
ids := make([]string, len(cm.cms))
for i, c := range cm.cms {
ids[i] = c.ID()
}
cm.id = digest.FromBytes([]byte(strings.Join(ids, ","))).String()
})
return cm.id
}
func (cm *combinedCacheManager) Query(inp []CacheKeyWithSelector, inputIndex Index, dgst digest.Digest, outputIndex Index) ([]*CacheKey, error) {
eg, _ := errgroup.WithContext(context.TODO())
keys := make(map[string]*CacheKey, len(cm.cms))
var mu sync.Mutex
for _, c := range cm.cms {
func(c CacheManager) {
eg.Go(func() error {
recs, err := c.Query(inp, inputIndex, dgst, outputIndex)
if err != nil {
return err
}
mu.Lock()
for _, r := range recs {
if _, ok := keys[r.ID]; !ok || c == cm.main {
keys[r.ID] = r
}
}
mu.Unlock()
return nil
})
}(c)
}
if err := eg.Wait(); err != nil {
return nil, err
}
out := make([]*CacheKey, 0, len(keys))
for _, k := range keys {
out = append(out, k)
}
return out, nil
}
func (cm *combinedCacheManager) Load(ctx context.Context, rec *CacheRecord) (Result, error) {
res, err := rec.cacheManager.Load(ctx, rec)
if err != nil {
return nil, err
}
if _, err := cm.main.Save(rec.key, res); err != nil {
return nil, err
}
return res, nil
}
func (cm *combinedCacheManager) Save(key *CacheKey, s Result) (*ExportableCacheKey, error) {
return cm.main.Save(key, s)
}
func (cm *combinedCacheManager) Records(ck *CacheKey) ([]*CacheRecord, error) {
if len(ck.ids) == 0 {
return nil, errors.Errorf("no results")
}
records := map[string]*CacheRecord{}
var mu sync.Mutex
eg, _ := errgroup.WithContext(context.TODO())
for c := range ck.ids {
func(c *cacheManager) {
eg.Go(func() error {
recs, err := c.Records(ck)
if err != nil {
return err
}
mu.Lock()
for _, rec := range recs {
if _, ok := records[rec.ID]; !ok || c == cm.main {
if c == cm.main {
rec.Priority = 1
}
records[rec.ID] = rec
}
}
mu.Unlock()
return nil
})
}(c)
}
if err := eg.Wait(); err != nil {
return nil, err
}
out := make([]*CacheRecord, 0, len(records))
for _, rec := range records {
out = append(out, rec)
}
return out, nil
}
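
A short sketch of how the combined manager might be wired (newCombinedCacheManager is package-internal, and the second manager here is a hypothetical stand-in for an imported cache). Queries fan out to every manager; on ID collisions the entry from main wins, and its records get Priority 1.

    local := NewInMemoryCacheManager()
    imported := NewInMemoryCacheManager() // stand-in for a remote/imported cache
    cm := newCombinedCacheManager([]CacheManager{imported, local}, local)
    fmt.Println(cm.ID()) // digest over the comma-joined manager IDs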


@ -28,6 +28,7 @@ func newEdge(ed Edge, op activeOp, index *EdgeIndex) *edge {
edge: ed,
op: op,
depRequests: map[pipe.Receiver]*dep{},
keyMap: map[string]*CacheKey{},
cacheRecords: map[string]*CacheRecord{},
index: index,
}
@ -46,6 +47,7 @@ type edge struct {
execReq pipe.Receiver
err error
cacheRecords map[string]*CacheRecord
keyMap map[string]*CacheKey
noCacheMatchPossible bool
allDepsCompletedCacheFast bool
@ -57,6 +59,8 @@ type edge struct {
releaserCount int
keysDidChange bool
index *EdgeIndex
secondaryExporters []expDep
}
// dep holds state for a dependent edge
@ -64,7 +68,7 @@ type dep struct {
req pipe.Receiver
edgeState
index Index
cacheRecords map[string]*CacheRecord
keyMap map[string]*CacheKey
desiredState edgeStatusType
e *edge
slowCacheReq pipe.Receiver // TODO: reuse req
@ -74,8 +78,13 @@ type dep struct {
err error
}
type expDep struct {
index int
cacheKey CacheKeyWithSelector
}
func newDep(i Index) *dep {
return &dep{index: i, cacheRecords: map[string]*CacheRecord{}}
return &dep{index: i, keyMap: map[string]*CacheKey{}}
}
type edgePipe struct {
@ -123,21 +132,24 @@ func (e *edge) release() {
}
// commitOptions returns parameters for the op execution
func (e *edge) commitOptions() (CacheKey, []CachedResult) {
func (e *edge) commitOptions() (*CacheKey, []CachedResult) {
k := NewCacheKey(e.cacheMap.Digest, e.edge.Index)
if e.deps == nil {
return NewCacheKey(e.cacheMap.Digest, e.edge.Index, nil), nil
return k, nil
}
inputs := make([]CacheKeyWithSelector, len(e.deps))
inputs := make([][]CacheKeyWithSelector, len(e.deps))
results := make([]CachedResult, len(e.deps))
for i, dep := range e.deps {
inputs[i] = CacheKeyWithSelector{CacheKey: dep.result.CacheKey(), Selector: e.cacheMap.Deps[i].Selector}
inputs[i] = append(inputs[i], CacheKeyWithSelector{CacheKey: dep.result.CacheKey(), Selector: e.cacheMap.Deps[i].Selector})
if dep.slowCacheKey != nil {
inputs[i] = CacheKeyWithSelector{CacheKey: *dep.slowCacheKey}
inputs[i] = append(inputs[i], CacheKeyWithSelector{CacheKey: *dep.slowCacheKey})
}
results[i] = dep.result
}
return NewCacheKey(e.cacheMap.Digest, e.edge.Index, inputs), results
k.deps = inputs
return k, results
}
// isComplete returns true if edge state is final and will never change
@ -164,23 +176,25 @@ func (e *edge) updateIncoming(req pipe.Sender) {
// probeCache is called with unprocessed cache keys for a dependency;
// if a key could match the edge, the matches are recorded on the dependency
func (e *edge) probeCache(d *dep, keys []CacheKeyWithSelector) bool {
if len(keys) == 0 {
func (e *edge) probeCache(d *dep, depKeys []CacheKeyWithSelector) bool {
if len(depKeys) == 0 {
return false
}
if e.op.IgnoreCache() {
return false
}
records, err := e.op.Cache().Query(keys, d.index, e.cacheMap.Digest, e.edge.Index)
keys, err := e.op.Cache().Query(depKeys, d.index, e.cacheMap.Digest, e.edge.Index)
if err != nil {
e.err = errors.Wrap(err, "error on cache query")
}
for _, r := range records {
if _, ok := d.cacheRecords[r.ID]; !ok {
d.cacheRecords[r.ID] = r
found := false
for _, k := range keys {
if _, ok := d.keyMap[k.ID]; !ok {
d.keyMap[k.ID] = k
found = true
}
}
return len(records) > 0
return found
}
// checkDepMatchPossible checks if any cache matches are possible past this point
@ -206,7 +220,7 @@ func (e *edge) allDepsHaveKeys() bool {
return false
}
for _, d := range e.deps {
if len(d.keys) == 0 && d.slowCacheKey == nil {
if len(d.keys) == 0 && d.slowCacheKey == nil && d.result == nil {
return false
}
}
@ -214,20 +228,31 @@ func (e *edge) allDepsHaveKeys() bool {
}
// depKeys returns all current dependency cache keys
func (e *edge) depKeys() [][]CacheKey {
keys := make([][]CacheKey, len(e.deps))
func (e *edge) currentIndexKey() *CacheKey {
if e.cacheMap == nil {
return nil
}
keys := make([][]CacheKeyWithSelector, len(e.deps))
for i, d := range e.deps {
if len(d.keys) == 0 && d.result == nil {
return nil
}
for _, k := range d.keys {
keys[i] = append(keys[i], k.CacheKey)
keys[i] = append(keys[i], CacheKeyWithSelector{Selector: e.cacheMap.Deps[i].Selector, CacheKey: k})
}
if d.result != nil {
keys[i] = append(keys[i], d.result.CacheKey())
}
if d.slowCacheKey != nil {
keys[i] = append(keys[i], d.slowCacheKey)
keys[i] = append(keys[i], CacheKeyWithSelector{Selector: e.cacheMap.Deps[i].Selector, CacheKey: d.result.CacheKey()})
if d.slowCacheKey != nil {
keys[i] = append(keys[i], CacheKeyWithSelector{CacheKey: ExportableCacheKey{CacheKey: d.slowCacheKey.CacheKey, Exporter: &exporter{k: d.slowCacheKey.CacheKey}}})
}
}
}
return keys
k := NewCacheKey(e.cacheMap.Digest, e.edge.Index)
k.deps = keys
return k
}
// slow cache keys can be computed in 2 phases if there are multiple deps.
@ -235,13 +260,13 @@ func (e *edge) depKeys() [][]CacheKey {
func (e *edge) skipPhase2SlowCache(dep *dep) bool {
isPhase1 := false
for _, dep := range e.deps {
if !dep.slowCacheComplete && e.slowCacheFunc(dep) != nil && len(dep.cacheRecords) == 0 {
if !dep.slowCacheComplete && e.slowCacheFunc(dep) != nil && len(dep.keyMap) == 0 {
isPhase1 = true
break
}
}
if isPhase1 && !dep.slowCacheComplete && e.slowCacheFunc(dep) != nil && len(dep.cacheRecords) > 0 {
if isPhase1 && !dep.slowCacheComplete && e.slowCacheFunc(dep) != nil && len(dep.keyMap) > 0 {
return true
}
return false
@ -250,13 +275,13 @@ func (e *edge) skipPhase2SlowCache(dep *dep) bool {
func (e *edge) skipPhase2FastCache(dep *dep) bool {
isPhase1 := false
for _, dep := range e.deps {
if e.cacheMap == nil || len(dep.cacheRecords) == 0 && ((!dep.slowCacheComplete && e.slowCacheFunc(dep) != nil) || (dep.state < edgeStatusComplete && e.slowCacheFunc(dep) == nil)) {
if e.cacheMap == nil || len(dep.keyMap) == 0 && ((!dep.slowCacheComplete && e.slowCacheFunc(dep) != nil) || (dep.state < edgeStatusComplete && e.slowCacheFunc(dep) == nil)) {
isPhase1 = true
break
}
}
if isPhase1 && len(dep.cacheRecords) > 0 {
if isPhase1 && len(dep.keyMap) > 0 {
return true
}
return false
@ -317,6 +342,13 @@ func withSelector(keys []ExportableCacheKey, selector digest.Digest) []CacheKeyW
return out
}
func (e *edge) makeExportable(k *CacheKey, records []*CacheRecord) ExportableCacheKey {
return ExportableCacheKey{
CacheKey: k,
Exporter: &exporter{k: k, records: records},
}
}
// processUpdate is called by unpark for every updated pipe request
func (e *edge) processUpdate(upt pipe.Receiver) (depChanged bool) {
// response for cachemap request
@ -330,25 +362,30 @@ func (e *edge) processUpdate(upt pipe.Receiver) (depChanged bool) {
e.cacheMap = upt.Status().Value.(*CacheMap)
if len(e.deps) == 0 {
if !e.op.IgnoreCache() {
records, err := e.op.Cache().Query(nil, 0, e.cacheMap.Digest, e.edge.Index)
keys, err := e.op.Cache().Query(nil, 0, e.cacheMap.Digest, e.edge.Index)
if err != nil {
logrus.Error(errors.Wrap(err, "invalid query response")) // make the build fail for this error
} else {
for _, r := range records {
if r.Loadable {
for _, k := range keys {
records, err := e.op.Cache().Records(k)
if err != nil {
logrus.Errorf("error receiving cache records: %v", err)
continue
}
for _, r := range records {
e.cacheRecords[r.ID] = r
}
}
if len(records) > 0 {
e.keys = append(e.keys, records[0].CacheKey)
e.keys = append(e.keys, e.makeExportable(k, records))
}
}
}
if e.allDepsHaveKeys() {
e.keysDidChange = true
}
e.state = edgeStatusCacheSlow
}
if e.allDepsHaveKeys() {
e.keysDidChange = true
}
// probe keys that were loaded before cache map
for i, dep := range e.deps {
e.probeCache(dep, withSelector(dep.keys, e.cacheMap.Deps[i].Selector))
@ -418,13 +455,11 @@ func (e *edge) processUpdate(upt pipe.Receiver) (depChanged bool) {
e.err = upt.Status().Err
}
} else if !dep.slowCacheComplete {
k := NewCacheKey(upt.Status().Value.(digest.Digest), -1, nil)
ck := NewCacheKey("", 0, []CacheKeyWithSelector{
{CacheKey: dep.result.CacheKey(), Selector: e.cacheMap.Deps[i].Selector},
{CacheKey: ExportableCacheKey{CacheKey: k, Exporter: &emptyExporter{k}}, Selector: NoSelector},
})
dep.slowCacheKey = &ExportableCacheKey{ck, &emptyExporter{ck}}
dep.slowCacheFoundKey = e.probeCache(dep, []CacheKeyWithSelector{{CacheKey: ExportableCacheKey{CacheKey: *dep.slowCacheKey}}})
k := NewCacheKey(upt.Status().Value.(digest.Digest), -1)
dep.slowCacheKey = &ExportableCacheKey{CacheKey: k, Exporter: &exporter{k: k}}
slowKeyExp := CacheKeyWithSelector{CacheKey: *dep.slowCacheKey}
defKeyExp := CacheKeyWithSelector{CacheKey: dep.result.CacheKey(), Selector: e.cacheMap.Deps[i].Selector}
dep.slowCacheFoundKey = e.probeCache(dep, []CacheKeyWithSelector{defKeyExp, slowKeyExp})
dep.slowCacheComplete = true
e.keysDidChange = true
e.checkDepMatchPossible(dep) // not matching key here doesn't set nocachematch possible to true
@ -440,39 +475,56 @@ func (e *edge) processUpdate(upt pipe.Receiver) (depChanged bool) {
// the state of dependencies has changed
func (e *edge) recalcCurrentState() {
// TODO: fast pass to detect incomplete results
newRecords := map[string]*CacheRecord{}
newKeys := map[string]*CacheKey{}
for i, dep := range e.deps {
if i == 0 {
for key, r := range dep.cacheRecords {
if _, ok := e.cacheRecords[key]; ok {
for id, k := range dep.keyMap {
if _, ok := e.keyMap[id]; ok {
continue
}
newRecords[key] = r
newKeys[id] = k
}
} else {
for key := range newRecords {
if _, ok := dep.cacheRecords[key]; !ok {
delete(newRecords, key)
for id := range newKeys {
if _, ok := dep.keyMap[id]; !ok {
delete(newKeys, id)
}
}
}
if len(newRecords) == 0 {
if len(newKeys) == 0 {
break
}
}
for k, r := range newRecords {
mergedKey := r.CacheKey
if len(e.deps) > 0 {
mergedKey = toMergedCacheKey(e.deps, k)
for _, r := range newKeys {
// TODO: add all deps automatically
mergedKey := r.clone()
mergedKey.deps = make([][]CacheKeyWithSelector, len(e.deps))
for i, dep := range e.deps {
if dep.result != nil {
mergedKey.deps[i] = append(mergedKey.deps[i], CacheKeyWithSelector{Selector: e.cacheMap.Deps[i].Selector, CacheKey: dep.result.CacheKey()})
if dep.slowCacheKey != nil {
mergedKey.deps[i] = append(mergedKey.deps[i], CacheKeyWithSelector{CacheKey: *dep.slowCacheKey})
}
} else {
for _, k := range dep.keys {
mergedKey.deps[i] = append(mergedKey.deps[i], CacheKeyWithSelector{Selector: e.cacheMap.Deps[i].Selector, CacheKey: k})
}
}
}
e.keys = append(e.keys, mergedKey)
if r.Loadable {
r2 := r
r2.CacheKey = mergedKey
e.cacheRecords[k] = r2
records, err := e.op.Cache().Records(mergedKey)
if err != nil {
logrus.Errorf("error receiving cache records: %v", err)
continue
}
for _, r := range records {
e.cacheRecords[r.ID] = r
}
e.keys = append(e.keys, e.makeExportable(mergedKey, records))
}
// detect lower/upper bound for current state
@ -486,7 +538,7 @@ func (e *edge) recalcCurrentState() {
for _, dep := range e.deps {
isSlowIncomplete := e.slowCacheFunc(dep) != nil && (dep.state == edgeStatusCacheSlow || (dep.state == edgeStatusComplete && !dep.slowCacheComplete))
if dep.state > stLow && len(dep.cacheRecords) == 0 && !isSlowIncomplete {
if dep.state > stLow && len(dep.keyMap) == 0 && !isSlowIncomplete {
stLow = dep.state
if stLow > edgeStatusCacheSlow {
stLow = edgeStatusCacheSlow
@ -714,28 +766,26 @@ func (e *edge) postpone(f *pipeFactory) {
// loadCache creates a request to load edge result from cache
func (e *edge) loadCache(ctx context.Context) (interface{}, error) {
var rec *CacheRecord
recs := make([]*CacheRecord, 0, len(e.cacheRecords))
for _, r := range e.cacheRecords {
if !r.Loadable {
continue
}
if rec == nil || rec.CreatedAt.Before(r.CreatedAt) || (rec.CreatedAt.Equal(r.CreatedAt) && rec.Priority < r.Priority) {
rec = r
}
recs = append(recs, r)
}
rec := getBestResult(recs)
logrus.Debugf("load cache for %s with %s", e.edge.Vertex.Name(), rec.ID)
res, err := e.op.Cache().Load(ctx, rec)
res, err := e.op.LoadCache(ctx, rec)
if err != nil {
return nil, err
}
return NewCachedResult(res, rec.CacheKey, rec.CacheKey), nil
return NewCachedResult(res, ExportableCacheKey{CacheKey: rec.key, Exporter: &exporter{k: rec.key, record: rec, edge: e}}), nil
}
// execOp creates a request to execute the vertex operation
func (e *edge) execOp(ctx context.Context) (interface{}, error) {
cacheKey, inputs := e.commitOptions()
results, err := e.op.Exec(ctx, toResultSlice(inputs))
results, subExporters, err := e.op.Exec(ctx, toResultSlice(inputs))
if err != nil {
return nil, err
}
@ -756,7 +806,20 @@ func (e *edge) execOp(ctx context.Context) (interface{}, error) {
if err != nil {
return nil, err
}
return NewCachedResult(res, ck, ck), nil
exps := make([]Exporter, 0, len(subExporters))
for _, exp := range subExporters {
exps = append(exps, exp.Exporter)
}
if len(subExporters) > 0 {
ck = &ExportableCacheKey{
CacheKey: ck.CacheKey,
Exporter: &mergedExporter{exporters: append([]Exporter{ck.Exporter}, exps...)},
}
}
return NewCachedResult(res, *ck), nil
}
func toResultSlice(cres []CachedResult) (out []Result) {
@ -766,79 +829,3 @@ func toResultSlice(cres []CachedResult) (out []Result) {
}
return out
}
type emptyExporter struct {
CacheKey
}
func (e *emptyExporter) Export(ctx context.Context, m map[digest.Digest]*ExportRecord, fn func(context.Context, Result) (*Remote, error)) (*ExportRecord, error) {
id := getUniqueID(e.CacheKey)
rec, ok := m[id]
if !ok {
rec = &ExportRecord{Digest: id, Links: map[CacheLink]struct{}{}}
m[id] = rec
}
deps := e.CacheKey.Deps()
for i, dep := range deps {
r, err := dep.CacheKey.Export(ctx, m, fn)
if err != nil {
return nil, err
}
link := CacheLink{
Source: r.Digest,
Input: Index(i),
Output: e.Output(),
Base: e.Digest(),
Selector: dep.Selector,
}
rec.Links[link] = struct{}{}
}
if len(deps) == 0 {
m[id].Links[CacheLink{
Output: e.Output(),
Base: e.Digest(),
}] = struct{}{}
}
return rec, nil
}
type mergedExporter struct {
exporters []Exporter
}
func (e *mergedExporter) Export(ctx context.Context, m map[digest.Digest]*ExportRecord, fn func(context.Context, Result) (*Remote, error)) (er *ExportRecord, err error) {
for _, e := range e.exporters {
er, err = e.Export(ctx, m, fn)
if err != nil {
return nil, err
}
}
return
}
func toMergedCacheKey(deps []*dep, id string) ExportableCacheKey {
depKeys := make([]CacheKeyWithSelector, len(deps))
exporters := make([]Exporter, len(deps))
for i, d := range deps {
depKeys[i] = d.cacheRecords[id].CacheKey.Deps()[i]
exporters[i] = d.cacheRecords[id].CacheKey
}
return ExportableCacheKey{
CacheKey: &mergedCacheKey{CacheKey: deps[0].cacheRecords[id].CacheKey, deps: depKeys},
Exporter: &mergedExporter{exporters: exporters},
}
}
type mergedCacheKey struct {
CacheKey
deps []CacheKeyWithSelector
}
func (ck *mergedCacheKey) Deps() []CacheKeyWithSelector {
return ck.deps
}

solver-next/exporter.go (new file, 179 lines)

@ -0,0 +1,179 @@
package solver
import (
"context"
digest "github.com/opencontainers/go-digest"
)
type exporter struct {
k *CacheKey
records []*CacheRecord
record *CacheRecord
res []ExporterRecord
edge *edge // for secondaryExporters
}
func addBacklinks(t ExporterTarget, rec ExporterRecord, cm *cacheManager, id string, bkm map[string]ExporterRecord) (ExporterRecord, error) {
if rec == nil {
var ok bool
rec, ok = bkm[id]
if ok {
return rec, nil
}
_ = ok
}
if err := cm.backend.WalkBacklinks(id, func(id string, link CacheInfoLink) error {
if rec == nil {
rec = t.Add(link.Digest)
}
r, ok := bkm[id]
if !ok {
var err error
r, err = addBacklinks(t, nil, cm, id, bkm)
if err != nil {
return err
}
}
rec.LinkFrom(r, int(link.Input), link.Selector.String())
return nil
}); err != nil {
return nil, err
}
if rec == nil {
rec = t.Add(digest.Digest(id))
}
bkm[id] = rec
return rec, nil
}
type backlinkT struct{}
var backlinkKey = backlinkT{}
func (e *exporter) ExportTo(ctx context.Context, t ExporterTarget, converter func(context.Context, Result) (*Remote, error)) ([]ExporterRecord, error) {
var bkm map[string]ExporterRecord
if bk := ctx.Value(backlinkKey); bk == nil {
bkm = map[string]ExporterRecord{}
ctx = context.WithValue(ctx, backlinkKey, bkm)
} else {
bkm = bk.(map[string]ExporterRecord)
}
if t.Visited(e) {
return e.res, nil
}
deps := e.k.Deps()
type expr struct {
r ExporterRecord
selector digest.Digest
}
srcs := make([][]expr, len(deps))
for i, deps := range deps {
for _, dep := range deps {
recs, err := dep.CacheKey.ExportTo(ctx, t, converter)
if err != nil {
return nil, nil
}
for _, r := range recs {
srcs[i] = append(srcs[i], expr{r: r, selector: dep.Selector})
}
}
}
if e.edge != nil {
for _, de := range e.edge.secondaryExporters {
recs, err := de.cacheKey.CacheKey.ExportTo(ctx, t, converter)
if err != nil {
return nil, nil
}
for _, r := range recs {
srcs[de.index] = append(srcs[de.index], expr{r: r, selector: de.cacheKey.Selector})
}
}
}
rec := t.Add(rootKey(e.k.Digest(), e.k.Output()))
for i, srcs := range srcs {
for _, src := range srcs {
rec.LinkFrom(src.r, i, src.selector.String())
}
}
for cm, id := range e.k.ids {
if _, err := addBacklinks(t, rec, cm, id, bkm); err != nil {
return nil, err
}
}
if e.record == nil && len(e.k.Deps()) > 0 {
e.record = getBestResult(e.records)
}
var remote *Remote
if v := e.record; v != nil && len(e.k.Deps()) > 0 {
cm := v.cacheManager
res, err := cm.backend.Load(cm.getID(v.key), v.ID)
if err != nil {
return nil, err
}
remote, err = cm.results.LoadRemote(ctx, res)
if err != nil {
return nil, err
}
if remote == nil {
res, err := cm.results.Load(ctx, res)
if err != nil {
return nil, err
}
remote, err = converter(ctx, res)
if err != nil {
return nil, err
}
res.Release(context.TODO())
}
if remote != nil {
rec.AddResult(v.CreatedAt, remote)
}
}
e.res = []ExporterRecord{rec}
t.Visit(e)
return e.res, nil
}
func getBestResult(records []*CacheRecord) *CacheRecord {
var rec *CacheRecord
for _, r := range records {
if rec == nil || rec.CreatedAt.Before(r.CreatedAt) || (rec.CreatedAt.Equal(r.CreatedAt) && rec.Priority < r.Priority) {
rec = r
}
}
return rec
}
type mergedExporter struct {
exporters []Exporter
}
func (e *mergedExporter) ExportTo(ctx context.Context, t ExporterTarget, converter func(context.Context, Result) (*Remote, error)) (er []ExporterRecord, err error) {
for _, e := range e.exporters {
r, err := e.ExportTo(ctx, t, converter)
if err != nil {
return nil, err
}
er = append(er, r...)
}
return
}
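
To make the selection rule in getBestResult concrete, a small illustrative case (assuming in-package access to CacheRecord): the newest CreatedAt wins, and Priority, which a combined cache sets to 1 for records from its main manager, breaks exact ties.

    now := time.Now()
    recs := []*CacheRecord{
        {ID: "older", CreatedAt: now.Add(-time.Minute)},
        {ID: "tie-low", CreatedAt: now},
        {ID: "tie-high", CreatedAt: now, Priority: 1},
    }
    fmt.Println(getBestResult(recs).ID) // "tie-high"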


@ -32,12 +32,12 @@ func (s *inMemoryStore) Exists(id string) bool {
s.mu.RLock()
defer s.mu.RUnlock()
if k, ok := s.byID[id]; ok {
-return len(k.links) > 0
+return len(k.links) > 0 || len(k.results) > 0
}
return false
}
-func newKey(id string) *inMemoryKey {
+func newInMemoryKey(id string) *inMemoryKey {
return &inMemoryKey{
results: map[string]CacheResult{},
links: map[CacheInfoLink]map[string]struct{}{},
@ -103,7 +103,7 @@ func (s *inMemoryStore) AddResult(id string, res CacheResult) error {
defer s.mu.Unlock()
k, ok := s.byID[id]
if !ok {
-k = newKey(id)
+k = newInMemoryKey(id)
s.byID[id] = k
}
k.results[res.ID] = res
@ -169,12 +169,12 @@ func (s *inMemoryStore) AddLink(id string, link CacheInfoLink, target string) er
defer s.mu.Unlock()
k, ok := s.byID[id]
if !ok {
-k = newKey(id)
+k = newInMemoryKey(id)
s.byID[id] = k
}
k2, ok := s.byID[target]
if !ok {
-k2 = newKey(target)
+k2 = newInMemoryKey(target)
s.byID[target] = k2
}
m, ok := k.links[link]
@ -209,6 +209,55 @@ func (s *inMemoryStore) WalkLinks(id string, link CacheInfoLink, fn func(id stri
return nil
}
func (s *inMemoryStore) HasLink(id string, link CacheInfoLink, target string) bool {
s.mu.RLock()
defer s.mu.RUnlock()
if k, ok := s.byID[id]; ok {
if v, ok := k.links[link]; ok {
if _, ok := v[target]; ok {
return true
}
}
}
return false
}
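// WalkBacklinks visits every stored link that points at id. Matches are
// collected under the read lock and fn is only called after unlocking, so a
// callback may safely re-enter the store (addBacklinks relies on this).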
func (s *inMemoryStore) WalkBacklinks(id string, fn func(id string, link CacheInfoLink) error) error {
s.mu.RLock()
k, ok := s.byID[id]
if !ok {
s.mu.RUnlock()
return nil
}
var outIDs []string
var outLinks []CacheInfoLink
for bid := range k.backlinks {
b, ok := s.byID[bid]
if !ok {
continue
}
for l, m := range b.links {
if _, ok := m[id]; !ok {
continue
}
outIDs = append(outIDs, bid)
outLinks = append(outLinks, CacheInfoLink{
Digest: rootKey(l.Digest, l.Output),
Input: l.Input,
Selector: l.Selector,
})
}
}
s.mu.RUnlock()
for i := range outIDs {
if err := fn(outIDs[i], outLinks[i]); err != nil {
return err
}
}
return nil
}
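// For example (hypothetical ids): after AddLink("b", link, "a"), calling
// WalkBacklinks("a", fn) yields fn("b", ...) with the link's Digest rewritten
// through rootKey(l.Digest, l.Output), so it lines up with the record digests
// the exporter creates via t.Add.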
func NewInMemoryResultStorage() CacheResultStorage {
return &inMemoryResultStore{m: &sync.Map{}}
}

View File

@ -5,7 +5,6 @@ import (
"sync"
"sync/atomic"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
@ -59,30 +58,17 @@ func (r *splitResult) Release(ctx context.Context) error {
}
// NewCachedResult combines a result and cache key into cached result
-func NewCachedResult(res Result, k CacheKey, exp Exporter) CachedResult {
-return &cachedResult{res, k, exp}
+func NewCachedResult(res Result, k ExportableCacheKey) CachedResult {
+return &cachedResult{res, k}
}
type cachedResult struct {
Result
-k CacheKey
-exp Exporter
+k ExportableCacheKey
}
func (cr *cachedResult) CacheKey() ExportableCacheKey {
-return ExportableCacheKey{CacheKey: cr.k, Exporter: cr.exp}
-}
-func (cr *cachedResult) Export(ctx context.Context, converter func(context.Context, Result) (*Remote, error)) ([]ExportRecord, error) {
-m := make(map[digest.Digest]*ExportRecord)
-if _, err := cr.exp.Export(ctx, m, converter); err != nil {
-return nil, err
-}
-out := make([]ExportRecord, 0, len(m))
-for _, r := range m {
-out = append(out, *r)
-}
-return out, nil
+return cr.k
}
func NewSharedCachedResult(res CachedResult) *SharedCachedResult {
@ -113,10 +99,6 @@ func (cr *clonedCachedResult) CacheKey() ExportableCacheKey {
return cr.cr.CacheKey()
}
-func (cr *clonedCachedResult) Export(ctx context.Context, converter func(context.Context, Result) (*Remote, error)) ([]ExportRecord, error) {
-return cr.cr.Export(ctx, converter)
-}
type SharedCachedResult struct {
*SharedResult
CachedResult

View File

@ -2,7 +2,6 @@ package solver
import (
"context"
"sync"
"time"
"github.com/containerd/containerd/content"
@ -51,23 +50,29 @@ type Result interface {
type CachedResult interface {
Result
CacheKey() ExportableCacheKey
-Export(ctx context.Context, converter func(context.Context, Result) (*Remote, error)) ([]ExportRecord, error)
}
// Exporter can export the artifacts of the build chain
type Exporter interface {
-Export(ctx context.Context, m map[digest.Digest]*ExportRecord, converter func(context.Context, Result) (*Remote, error)) (*ExportRecord, error)
+ExportTo(ctx context.Context, t ExporterTarget, converter func(context.Context, Result) (*Remote, error)) ([]ExporterRecord, error)
}
-// ExportRecord defines a single record in the exported cache chain
-type ExportRecord struct {
-Digest digest.Digest
-Links map[CacheLink]struct{}
-Remote *Remote
+// ExporterTarget defines object capable of receiving exports
+type ExporterTarget interface {
+Add(dgst digest.Digest) ExporterRecord
+Visit(interface{})
+Visited(interface{}) bool
}
+// ExporterRecord is a single object being exported
+type ExporterRecord interface {
+AddResult(createdAt time.Time, result *Remote)
+LinkFrom(src ExporterRecord, index int, selector string)
+}
// Remote is a descriptor or a list of stacked descriptors that can be pulled
// from a content provider
// TODO: add closer to keep referenced data from getting deleted
type Remote struct {
Descriptors []ocispec.Descriptor
Provider content.Provider
@ -75,11 +80,11 @@ type Remote struct {
// CacheLink is a link between two cache records
type CacheLink struct {
-Source digest.Digest
-Input Index
-Output Index
-Base digest.Digest
-Selector digest.Digest
+Source digest.Digest `json:",omitempty"`
+Input Index `json:",omitempty"`
+Output Index `json:",omitempty"`
+Base digest.Digest `json:",omitempty"`
+Selector digest.Digest `json:",omitempty"`
}
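// With the json:",omitempty" tags added above, zero-valued fields are dropped
// when a link is marshaled; e.g. (hypothetical values)
// json.Marshal(CacheLink{Base: dgst, Output: 1}) yields
// {"Output":1,"Base":"sha256:..."} with no Source, Input or Selector keys.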
// Op is an implementation for running a vertex
@ -108,32 +113,19 @@ type CacheMap struct {
// ExportableCacheKey is a cache key connected with an exporter that can export
// a chain of cacherecords pointing to that key
type ExportableCacheKey struct {
-CacheKey
+*CacheKey
Exporter
}
-// CacheKey is an identifier for storing/loading build cache
-type CacheKey interface {
-// Deps are dependant cache keys
-Deps() []CacheKeyWithSelector
-// Base digest for operation. Usually CacheMap.Digest
-Digest() digest.Digest
-// Index for the output that is cached
-Output() Index
-// Helpers for implementations for adding internal metadata
-SetValue(key, value interface{})
-GetValue(key interface{}) interface{}
-}
// CacheRecord is an identifier for loading in cache
type CacheRecord struct {
-ID string
-CacheKey ExportableCacheKey
-CacheManager CacheManager
-Loadable bool
-// Size int
+ID string
+Size int
CreatedAt time.Time
Priority int
+cacheManager *cacheManager
+key *CacheKey
}
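// Note that a CacheRecord no longer carries an exportable key or a public
// CacheManager: the unexported cacheManager/key fields tie a record back to
// its owning manager, and exporting now goes through ExportableCacheKey.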
// CacheManager implements build cache backend
@ -143,58 +135,10 @@ type CacheManager interface {
ID() string
// Query searches for cache paths from one cache key to the output of a
// possible match.
-Query(inp []CacheKeyWithSelector, inputIndex Index, dgst digest.Digest, outputIndex Index) ([]*CacheRecord, error)
+Query(inp []CacheKeyWithSelector, inputIndex Index, dgst digest.Digest, outputIndex Index) ([]*CacheKey, error)
+Records(ck *CacheKey) ([]*CacheRecord, error)
// Load pulls and returns the cached result
Load(ctx context.Context, rec *CacheRecord) (Result, error)
// Save saves a result based on a cache key
-Save(key CacheKey, s Result) (ExportableCacheKey, error)
-}
-// NewCacheKey creates a new cache key for a specific output index
-func NewCacheKey(dgst digest.Digest, index Index, deps []CacheKeyWithSelector) CacheKey {
-return &cacheKey{
-dgst: dgst,
-deps: deps,
-index: index,
-values: &sync.Map{},
-}
-}
-// CacheKeyWithSelector combines a cache key with an optional selector digest.
-// Used to limit the matches for dependency cache key.
-type CacheKeyWithSelector struct {
-Selector digest.Digest
-CacheKey ExportableCacheKey
-}
-type cacheKey struct {
-dgst digest.Digest
-index Index
-deps []CacheKeyWithSelector
-values *sync.Map
-}
-func (ck *cacheKey) SetValue(key, value interface{}) {
-ck.values.Store(key, value)
-}
-func (ck *cacheKey) GetValue(key interface{}) interface{} {
-v, _ := ck.values.Load(key)
-return v
-}
-func (ck *cacheKey) Deps() []CacheKeyWithSelector {
-return ck.deps
-}
-func (ck *cacheKey) Digest() digest.Digest {
-return ck.dgst
-}
-func (ck *cacheKey) Output() Index {
-return ck.index
-}
-func (ck *cacheKey) Export(ctx context.Context, converter func(context.Context, Result) ([]ocispec.Descriptor, content.Provider, error)) ([]ExportRecord, content.Provider, error) {
-return nil, nil, nil
+Save(key *CacheKey, s Result) (*ExportableCacheKey, error)
}
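// A hypothetical lookup against the reworked CacheManager (variable names are
// illustrative): Query returns candidate *CacheKeys, Records expands a key
// into loadable CacheRecords, and Save hands back an *ExportableCacheKey.
//
// keys, err := mgr.Query(inputs, 0, dgst, 0)
// recs, err := mgr.Records(keys[0])
// res, err := mgr.Load(ctx, recs[0])
// exp, err := mgr.Save(keys[0], res)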