load parent results on combined cachemanager
Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>docker-19.03
parent
d77d950053
commit
5be7870549
|
@ -27,6 +27,7 @@ func NewCacheKeyStorage(cc *CacheChains, w worker.Worker) (solver.CacheKeyStorag
|
|||
results := &cacheResultStorage{
|
||||
w: w,
|
||||
byID: storage.byID,
|
||||
byItem: storage.byItem,
|
||||
byResult: storage.byResult,
|
||||
}
|
||||
|
||||
|
@ -204,19 +205,54 @@ type cacheResultStorage struct {
|
|||
w worker.Worker
|
||||
byID map[string]*itemWithOutgoingLinks
|
||||
byResult map[string]map[string]struct{}
|
||||
byItem map[*item]string
|
||||
}
|
||||
|
||||
func (cs *cacheResultStorage) Save(res solver.Result, createdAt time.Time) (solver.CacheResult, error) {
|
||||
return solver.CacheResult{}, errors.Errorf("importer is immutable")
|
||||
}
|
||||
|
||||
func (cs *cacheResultStorage) Load(ctx context.Context, res solver.CacheResult) (solver.Result, error) {
|
||||
remote, err := cs.LoadRemote(ctx, res)
|
||||
func (cs *cacheResultStorage) LoadWithParents(ctx context.Context, res solver.CacheResult) (map[string]solver.Result, error) {
|
||||
v := cs.byResultID(res.ID)
|
||||
if v == nil || v.result == nil {
|
||||
return nil, errors.WithStack(solver.ErrNotFound)
|
||||
}
|
||||
|
||||
m := map[string]solver.Result{}
|
||||
|
||||
if err := v.walkAllResults(func(i *item) error {
|
||||
if i.result == nil {
|
||||
return nil
|
||||
}
|
||||
id, ok := cs.byItem[i]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
if isSubRemote(*i.result, *v.result) {
|
||||
ref, err := cs.w.FromRemote(ctx, i.result)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
m[id] = worker.NewWorkerRefResult(ref, cs.w)
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
for _, v := range m {
|
||||
v.Release(context.TODO())
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ref, err := cs.w.FromRemote(ctx, remote)
|
||||
return m, nil
|
||||
}
|
||||
|
||||
func (cs *cacheResultStorage) Load(ctx context.Context, res solver.CacheResult) (solver.Result, error) {
|
||||
item := cs.byResultID(res.ID)
|
||||
if item == nil || item.result == nil {
|
||||
return nil, errors.WithStack(solver.ErrNotFound)
|
||||
}
|
||||
|
||||
ref, err := cs.w.FromRemote(ctx, item.result)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -224,8 +260,8 @@ func (cs *cacheResultStorage) Load(ctx context.Context, res solver.CacheResult)
|
|||
}
|
||||
|
||||
func (cs *cacheResultStorage) LoadRemote(ctx context.Context, res solver.CacheResult) (*solver.Remote, error) {
|
||||
if r := cs.byResultID(res.ID); r != nil {
|
||||
return r, nil
|
||||
if r := cs.byResultID(res.ID); r != nil && r.result != nil {
|
||||
return r.result, nil
|
||||
}
|
||||
return nil, errors.WithStack(solver.ErrNotFound)
|
||||
}
|
||||
|
@ -234,7 +270,7 @@ func (cs *cacheResultStorage) Exists(id string) bool {
|
|||
return cs.byResultID(id) != nil
|
||||
}
|
||||
|
||||
func (cs *cacheResultStorage) byResultID(resultID string) *solver.Remote {
|
||||
func (cs *cacheResultStorage) byResultID(resultID string) *itemWithOutgoingLinks {
|
||||
m, ok := cs.byResult[resultID]
|
||||
if !ok || len(m) == 0 {
|
||||
return nil
|
||||
|
@ -243,9 +279,7 @@ func (cs *cacheResultStorage) byResultID(resultID string) *solver.Remote {
|
|||
for id := range m {
|
||||
it, ok := cs.byID[id]
|
||||
if ok {
|
||||
if r := it.result; r != nil {
|
||||
return r
|
||||
}
|
||||
return it
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -128,6 +128,20 @@ func (c *item) LinkFrom(rec solver.CacheExporterRecord, index int, selector stri
|
|||
c.links[index][link{src: src, selector: selector}] = struct{}{}
|
||||
}
|
||||
|
||||
func (c *item) walkAllResults(fn func(i *item) error) error {
|
||||
if err := fn(c); err != nil {
|
||||
return err
|
||||
}
|
||||
for _, links := range c.links {
|
||||
for l := range links {
|
||||
if err := l.src.walkAllResults(fn); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type nopRecord struct {
|
||||
}
|
||||
|
||||
|
|
|
@ -304,3 +304,15 @@ func marshalItem(it *item, state *marshalState) error {
|
|||
state.records = append(state.records, rec)
|
||||
return nil
|
||||
}
|
||||
|
||||
func isSubRemote(sub, main solver.Remote) bool {
|
||||
if len(sub.Descriptors) > len(main.Descriptors) {
|
||||
return false
|
||||
}
|
||||
for i := range sub.Descriptors {
|
||||
if sub.Descriptors[i].Digest != main.Descriptors[i].Digest {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
|
|
@ -144,6 +144,79 @@ func (c *cacheManager) Load(ctx context.Context, rec *CacheRecord) (Result, erro
|
|||
return c.results.Load(ctx, res)
|
||||
}
|
||||
|
||||
type LoadedResult struct {
|
||||
Result Result
|
||||
CacheResult CacheResult
|
||||
CacheKey *CacheKey
|
||||
}
|
||||
|
||||
func (c *cacheManager) filterResults(m map[string]Result, ck *CacheKey) (results []LoadedResult, err error) {
|
||||
id := c.getID(ck)
|
||||
if err := c.backend.WalkResults(id, func(cr CacheResult) error {
|
||||
res, ok := m[id]
|
||||
if ok {
|
||||
results = append(results, LoadedResult{
|
||||
Result: res,
|
||||
CacheKey: ck,
|
||||
CacheResult: cr,
|
||||
})
|
||||
delete(m, id)
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
for _, r := range results {
|
||||
r.Result.Release(context.TODO())
|
||||
}
|
||||
}
|
||||
for _, keys := range ck.Deps() {
|
||||
for _, key := range keys {
|
||||
res, err := c.filterResults(m, key.CacheKey.CacheKey)
|
||||
if err != nil {
|
||||
for _, r := range results {
|
||||
r.Result.Release(context.TODO())
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
results = append(results, res...)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (c *cacheManager) LoadWithParents(ctx context.Context, rec *CacheRecord) ([]LoadedResult, error) {
|
||||
lwp, ok := c.results.(interface {
|
||||
LoadWithParents(context.Context, CacheResult) (map[string]Result, error)
|
||||
})
|
||||
if !ok {
|
||||
res, err := c.Load(ctx, rec)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return []LoadedResult{{Result: res, CacheKey: rec.key, CacheResult: CacheResult{ID: c.getID(rec.key), CreatedAt: rec.CreatedAt}}}, nil
|
||||
}
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
|
||||
cr, err := c.backend.Load(c.getID(rec.key), rec.ID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
m, err := lwp.LoadWithParents(ctx, cr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
results, err := c.filterResults(m, rec.key)
|
||||
if err != nil {
|
||||
for _, r := range m {
|
||||
r.Release(context.TODO())
|
||||
}
|
||||
}
|
||||
|
||||
return results, nil
|
||||
}
|
||||
|
||||
func (c *cacheManager) Save(k *CacheKey, r Result, createdAt time.Time) (*ExportableCacheKey, error) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
|
|
@ -67,15 +67,27 @@ func (cm *combinedCacheManager) Query(inp []CacheKeyWithSelector, inputIndex Ind
|
|||
return out, nil
|
||||
}
|
||||
|
||||
func (cm *combinedCacheManager) Load(ctx context.Context, rec *CacheRecord) (Result, error) {
|
||||
res, err := rec.cacheManager.Load(ctx, rec)
|
||||
func (cm *combinedCacheManager) Load(ctx context.Context, rec *CacheRecord) (res Result, err error) {
|
||||
results, err := rec.cacheManager.LoadWithParents(ctx, rec)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if _, err := cm.main.Save(rec.key, res, rec.CreatedAt); err != nil {
|
||||
defer func() {
|
||||
for i, res := range results {
|
||||
if err == nil && i == 0 {
|
||||
continue
|
||||
}
|
||||
res.Result.Release(context.TODO())
|
||||
}
|
||||
}()
|
||||
if rec.cacheManager != cm.main {
|
||||
for _, res := range results {
|
||||
if _, err := cm.main.Save(res.CacheKey, res.Result, res.CacheResult.CreatedAt); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
}
|
||||
return results[0].Result, nil
|
||||
}
|
||||
|
||||
func (cm *combinedCacheManager) Save(key *CacheKey, s Result, createdAt time.Time) (*ExportableCacheKey, error) {
|
||||
|
|
Loading…
Reference in New Issue