llbsolver: update pull for multiple cache keys

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
docker-18.09
Tonis Tiigi 2018-04-24 14:00:58 -07:00
parent 77c2793ebb
commit 9c044db670
11 changed files with 152 additions and 59 deletions
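The change threads a call index through Op.CacheMap and SourceInstance.CacheKey and adds a bool return that reports whether the returned cache map/key is final; the pull source uses it to hand back a quick manifest-based key first and a more precise config-based key later. A minimal sketch of the new CacheKey contract and of a caller driving it, assuming (as the pull hunks below suggest) that the caller keeps asking with an increasing index until the bool is true; the interface excerpt matches the diff, collectCacheKeys is illustrative only and not part of this commit:

package example

import "context"

// cacheKeyer is the CacheKey part of the updated SourceInstance interface
// (see the Source interface change in the last file of this diff).
type cacheKeyer interface {
	CacheKey(ctx context.Context, index int) (string, bool, error)
}

// collectCacheKeys is a hypothetical caller: it requests keys with an
// increasing index until the instance reports that its key is final.
func collectCacheKeys(ctx context.Context, src cacheKeyer) ([]string, error) {
	var keys []string
	for index := 0; ; index++ {
		k, done, err := src.CacheKey(ctx, index)
		if err != nil {
			return nil, err
		}
		keys = append(keys, k)
		if done {
			return keys, nil
		}
	}
}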

View File

@@ -6,14 +6,16 @@ import (
"github.com/moby/buildkit/identity"
solver "github.com/moby/buildkit/solver-next"
"github.com/moby/buildkit/worker"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
func NewCacheKeyStorage(cc *CacheChains, w worker.Worker) (solver.CacheKeyStorage, solver.CacheResultStorage, error) {
storage := &cacheKeyStorage{
byID: map[string]*itemWithOutgoingLinks{},
byItem: map[*item]string{},
byID: map[string]*itemWithOutgoingLinks{},
byItem: map[*item]string{},
byResult: map[string]map[string]struct{}{},
}
for _, it := range cc.items {
@@ -70,12 +72,23 @@ func addItemToStorage(k *cacheKeyStorage, it *item) (*itemWithOutgoingLinks, err
}
k.byID[id] = itl
if res := it.result; res != nil {
resultID := remoteID(res)
ids, ok := k.byResult[resultID]
if !ok {
ids = map[string]struct{}{}
k.byResult[resultID] = ids
}
ids[id] = struct{}{}
}
return itl, nil
}
type cacheKeyStorage struct {
byID map[string]*itemWithOutgoingLinks
byItem map[*item]string
byID map[string]*itemWithOutgoingLinks
byItem map[*item]string
byResult map[string]map[string]struct{}
}
type itemWithOutgoingLinks struct {
@@ -99,17 +112,20 @@ func (cs *cacheKeyStorage) WalkResults(id string, fn func(solver.CacheResult) er
return nil
}
if res := it.result; res != nil {
return fn(solver.CacheResult{ID: id, CreatedAt: it.resultTime})
return fn(solver.CacheResult{ID: remoteID(res), CreatedAt: it.resultTime})
}
return nil
}
func (cs *cacheKeyStorage) Load(id string, resultID string) (solver.CacheResult, error) {
_, ok := cs.byID[id]
it, ok := cs.byID[id]
if !ok {
return solver.CacheResult{}, nil
}
return solver.CacheResult{ID: id}, nil
if res := it.result; res != nil {
return solver.CacheResult{ID: remoteID(res), CreatedAt: it.resultTime}, nil
}
return solver.CacheResult{}, nil
}
func (cs *cacheKeyStorage) AddResult(id string, res solver.CacheResult) error {
@@ -144,6 +160,16 @@ func (cs *cacheKeyStorage) WalkBacklinks(id string, fn func(id string, link solv
return nil
}
func (cs *cacheKeyStorage) WalkIDsByResult(id string, fn func(id string) error) error {
ids := cs.byResult[id]
for id := range ids {
if err := fn(id); err != nil {
return err
}
}
return nil
}
func (cs *cacheKeyStorage) HasLink(id string, link solver.CacheInfoLink, target string) bool {
l := nlink{
dgst: outputKey(link.Digest, int(link.Output)),
@@ -203,3 +229,12 @@ func (cs *cacheResultStorage) Exists(id string) bool {
}
return it.result != nil
}
// remoteID returns a unique ID per remote. This ID is not stable.
func remoteID(r *solver.Remote) string {
dgstr := digest.Canonical.Digester()
for _, desc := range r.Descriptors {
dgstr.Hash().Write([]byte(desc.Digest))
}
return dgstr.Digest().String()
}
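remoteID above hashes the layer descriptor digests of a remote into a content-derived identifier, and the new byResult index maps that identifier back to every cache key ID that can produce it (which is what WalkIDsByResult iterates). A standalone sketch of the same digesting pattern follows; remoteIDFor and its inputs are illustrative, not part of this commit:

package example

import (
	digest "github.com/opencontainers/go-digest"
)

// remoteIDFor mirrors remoteID above: it hashes the concatenation of a
// remote's layer digests. The result identifies the remote's content for
// indexing purposes, but is not a stable long-term identifier.
func remoteIDFor(layerDigests []digest.Digest) string {
	dgstr := digest.Canonical.Digester()
	for _, d := range layerDigests {
		dgstr.Hash().Write([]byte(d))
	}
	return dgstr.Digest().String()
}

With such an ID, the byResult bookkeeping can record that several cache keys resolve to the same pulled result, which is the point of the new index.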

View File

@@ -32,7 +32,7 @@ func NewBuildOp(v solver.Vertex, op *pb.Op_Build, b frontend.FrontendLLBBridge,
}, nil
}
func (b *buildOp) CacheMap(ctx context.Context) (*solver.CacheMap, error) {
func (b *buildOp) CacheMap(ctx context.Context, index int) (*solver.CacheMap, bool, error) {
dt, err := json.Marshal(struct {
Type string
Exec *pb.BuildOp
@@ -41,7 +41,7 @@ func (b *buildOp) CacheMap(ctx context.Context) (*solver.CacheMap, error) {
Exec: b.op,
})
if err != nil {
return nil, err
return nil, false, err
}
return &solver.CacheMap{
@@ -50,7 +50,7 @@ func (b *buildOp) CacheMap(ctx context.Context) (*solver.CacheMap, error) {
Selector digest.Digest
ComputeDigestFunc solver.ResultBasedCacheFunc
}, len(b.v.Inputs())),
}, nil
}, true, nil
}
func (b *buildOp) Exec(ctx context.Context, inputs []solver.Result) (outputs []solver.Result, retErr error) {

View File

@@ -51,7 +51,7 @@ func cloneExecOp(old *pb.ExecOp) pb.ExecOp {
return n
}
func (e *execOp) CacheMap(ctx context.Context) (*solver.CacheMap, error) {
func (e *execOp) CacheMap(ctx context.Context, index int) (*solver.CacheMap, bool, error) {
op := cloneExecOp(e.op)
for i := range op.Mounts {
op.Mounts[i].Selector = ""
@@ -69,7 +69,7 @@ func (e *execOp) CacheMap(ctx context.Context) (*solver.CacheMap, error) {
Arch: runtime.GOARCH,
})
if err != nil {
return nil, err
return nil, false, err
}
cm := &solver.CacheMap{
@@ -82,7 +82,7 @@ func (e *execOp) CacheMap(ctx context.Context) (*solver.CacheMap, error) {
deps, err := e.getMountDeps()
if err != nil {
return nil, err
return nil, false, err
}
for i, dep := range deps {
@@ -98,7 +98,7 @@ func (e *execOp) CacheMap(ctx context.Context) (*solver.CacheMap, error) {
}
}
return cm, nil
return cm, true, nil
}
func dedupePaths(inp []string) []string {

View File

@@ -47,20 +47,20 @@ func (s *sourceOp) instance(ctx context.Context) (source.SourceInstance, error)
return s.src, nil
}
func (s *sourceOp) CacheMap(ctx context.Context) (*solver.CacheMap, error) {
func (s *sourceOp) CacheMap(ctx context.Context, index int) (*solver.CacheMap, bool, error) {
src, err := s.instance(ctx)
if err != nil {
return nil, err
return nil, false, err
}
k, err := src.CacheKey(ctx)
k, done, err := src.CacheKey(ctx, index)
if err != nil {
return nil, err
return nil, false, err
}
return &solver.CacheMap{
// TODO: add os/arch
Digest: digest.FromBytes([]byte(sourceCacheType + ":" + k)),
}, nil
}, done, nil
}
func (s *sourceOp) Exec(ctx context.Context, _ []solver.Result) (outputs []solver.Result, err error) {

View File

@@ -2,7 +2,9 @@ package containerimage
import (
"context"
"encoding/json"
"fmt"
"runtime"
"sync"
"time"
@@ -16,6 +18,7 @@ import (
"github.com/containerd/containerd/remotes/docker/schema1"
"github.com/containerd/containerd/rootfs"
"github.com/containerd/containerd/snapshots"
"github.com/docker/distribution/reference"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/session/auth"
@@ -202,11 +205,51 @@ func (p *puller) resolve(ctx context.Context) error {
return p.resolveErr
}
func (p *puller) CacheKey(ctx context.Context) (string, error) {
func (p *puller) mainManifestKey(ctx context.Context) (digest.Digest, error) {
if err := p.resolve(ctx); err != nil {
return "", err
}
return p.desc.Digest.String(), nil
dt, err := json.Marshal(struct {
Digest digest.Digest
OS string
Arch string
}{
Digest: p.desc.Digest,
OS: runtime.GOOS,
Arch: runtime.GOARCH,
})
if err != nil {
return "", err
}
return digest.FromBytes(dt), nil
}
func (p *puller) CacheKey(ctx context.Context, index int) (string, bool, error) {
if index == 0 || p.desc.Digest == "" {
k, err := p.mainManifestKey(ctx)
if err != nil {
return "", false, err
}
return k.String(), false, nil
}
ref, err := reference.ParseNormalizedNamed(p.src.Reference.String())
if err != nil {
return "", false, err
}
ref, err = reference.WithDigest(ref, p.desc.Digest)
if err != nil {
return "", false, nil
}
_, dt, err := imageutil.Config(ctx, ref.String(), p.resolver, p.is.ContentStore)
if err != nil {
// this happens on schema1 images
k, err := p.mainManifestKey(ctx)
if err != nil {
return "", false, err
}
return k.String(), true, nil
}
return cacheKeyFromConfig(dt).String(), true, nil
}
func (p *puller) Snapshot(ctx context.Context) (cache.ImmutableRef, error) {
@@ -506,6 +549,20 @@ func showProgress(ctx context.Context, ongoing *jobs, cs content.Store) {
}
}
// cacheKeyFromConfig returns a stable digest from image config. If the image
// config is a known OCI image config, the chainID of its layers is used.
func cacheKeyFromConfig(dt []byte) digest.Digest {
var img ocispec.Image
err := json.Unmarshal(dt, &img)
if err != nil {
return digest.FromBytes(dt)
}
if img.RootFS.Type != "layers" {
return digest.FromBytes(dt)
}
return identity.ChainID(img.RootFS.DiffIDs)
}
// jobs provides a way of identifying the download keys for a particular task
// encountered during the pull walk.
//
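The pull source now reports two kinds of keys: for index 0 (or while the digest is unresolved) a provisional key digested from the manifest digest plus GOOS/GOARCH, with the bool set to false, and once the image config can be fetched a final key from cacheKeyFromConfig, with the bool set to true, preferring the chainID of the config's DiffIDs so identical layer stacks share a key. A sketch of that final-key derivation follows; configCacheKey is illustrative and mirrors cacheKeyFromConfig above, assuming identity.ChainID from github.com/opencontainers/image-spec/identity:

package example

import (
	"encoding/json"

	digest "github.com/opencontainers/go-digest"
	"github.com/opencontainers/image-spec/identity"
	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)

// configCacheKey mirrors cacheKeyFromConfig above: for a layer-based OCI image
// config it returns the chainID of the DiffIDs, otherwise it falls back to a
// digest of the raw config bytes.
func configCacheKey(dt []byte) digest.Digest {
	var img ocispec.Image
	if err := json.Unmarshal(dt, &img); err != nil {
		return digest.FromBytes(dt)
	}
	if img.RootFS.Type != "layers" || len(img.RootFS.DiffIDs) == 0 {
		return digest.FromBytes(dt)
	}
	// ChainID folds each DiffID into the previous chain ID, so two configs
	// describing the same layer stack produce the same cache key.
	return identity.ChainID(img.RootFS.DiffIDs)
}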

View File

@@ -160,7 +160,7 @@ func (gs *gitSource) Resolve(ctx context.Context, id source.Identifier) (source.
}, nil
}
func (gs *gitSourceHandler) CacheKey(ctx context.Context) (string, error) {
func (gs *gitSourceHandler) CacheKey(ctx context.Context, index int) (string, bool, error) {
remote := gs.src.Remote
ref := gs.src.Ref
if ref == "" {
@@ -171,12 +171,12 @@ func (gs *gitSourceHandler) CacheKey(ctx context.Context) (string, error) {
if isCommitSHA(ref) {
gs.cacheKey = ref
return ref, nil
return ref, false, nil
}
gitDir, unmountGitDir, err := gs.mountRemote(ctx, remote)
if err != nil {
return "", err
return "", false, err
}
defer unmountGitDir()
@@ -184,20 +184,20 @@ func (gs *gitSourceHandler) CacheKey(ctx context.Context) (string, error) {
buf, err := gitWithinDir(ctx, gitDir, "", "ls-remote", "origin", ref)
if err != nil {
return "", errors.Wrapf(err, "failed to fetch remote %s", remote)
return "", false, errors.Wrapf(err, "failed to fetch remote %s", remote)
}
out := buf.String()
idx := strings.Index(out, "\t")
if idx == -1 {
return "", errors.Errorf("failed to find commit SHA from output: %s", string(out))
return "", false, errors.Errorf("failed to find commit SHA from output: %s", string(out))
}
sha := string(out[:idx])
if !isCommitSHA(sha) {
return "", errors.Errorf("invalid commit sha %q", sha)
return "", false, errors.Errorf("invalid commit sha %q", sha)
}
gs.cacheKey = sha
return sha, nil
return sha, true, nil
}
func (gs *gitSourceHandler) Snapshot(ctx context.Context) (out cache.ImmutableRef, retErr error) {
@@ -209,7 +209,7 @@ func (gs *gitSourceHandler) Snapshot(ctx context.Context) (out cache.ImmutableRe
cacheKey := gs.cacheKey
if cacheKey == "" {
var err error
cacheKey, err = gs.CacheKey(ctx)
cacheKey, _, err = gs.CacheKey(ctx, 0)
if err != nil {
return nil, err
}
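For git the key itself is unchanged: the resolved commit SHA remains the cache key, cached in gs.cacheKey so Snapshot can reuse it, and only the return shape gains the index/bool pair. The isCommitSHA helper is not shown in these hunks; a plausible sketch, assuming it only accepts a full 40-character hex SHA-1:

package example

import "regexp"

// commitSHARe matches a full 40-character hex SHA-1, i.e. a ref that is
// already content-addressed and can be used as a cache key directly.
var commitSHARe = regexp.MustCompile(`^[0-9a-fA-F]{40}$`)

// isCommitSHA is an assumed stand-in for the helper referenced above; the
// real implementation may differ.
func isCommitSHA(ref string) bool {
	return commitSHARe.MatchString(ref)
}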

View File

@@ -49,8 +49,9 @@ func testRepeatedFetch(t *testing.T, keepGitDir bool) {
g, err := gs.Resolve(ctx, id)
require.NoError(t, err)
key1, err := g.CacheKey(ctx)
key1, done, err := g.CacheKey(ctx, 0)
require.NoError(t, err)
require.True(t, done)
require.Equal(t, 40, len(key1))
@@ -85,7 +86,7 @@ func testRepeatedFetch(t *testing.T, keepGitDir bool) {
g, err = gs.Resolve(ctx, id)
require.NoError(t, err)
key2, err := g.CacheKey(ctx)
key2, _, err := g.CacheKey(ctx, 0)
require.NoError(t, err)
require.Equal(t, key1, key2)
@@ -101,7 +102,7 @@ func testRepeatedFetch(t *testing.T, keepGitDir bool) {
g, err = gs.Resolve(ctx, id)
require.NoError(t, err)
key3, err := g.CacheKey(ctx)
key3, _, err := g.CacheKey(ctx, 0)
require.NoError(t, err)
require.NotEqual(t, key1, key3)
@@ -166,7 +167,7 @@ func testFetchBySHA(t *testing.T, keepGitDir bool) {
g, err := gs.Resolve(ctx, id)
require.NoError(t, err)
key1, err := g.CacheKey(ctx)
key1, _, err := g.CacheKey(ctx, 0)
require.NoError(t, err)
require.Equal(t, 40, len(key1))
@@ -242,11 +243,11 @@ func testMultipleRepos(t *testing.T, keepGitDir bool) {
g2, err := gs.Resolve(ctx, id2)
require.NoError(t, err)
key1, err := g.CacheKey(ctx)
key1, _, err := g.CacheKey(ctx, 0)
require.NoError(t, err)
require.Equal(t, 40, len(key1))
key2, err := g2.CacheKey(ctx)
key2, _, err := g2.CacheKey(ctx, 0)
require.NoError(t, err)
require.Equal(t, 40, len(key2))

View File

@@ -106,26 +106,26 @@ func (hs *httpSourceHandler) formatCacheKey(filename string, dgst digest.Digest)
return digest.FromBytes(dt)
}
func (hs *httpSourceHandler) CacheKey(ctx context.Context) (string, error) {
func (hs *httpSourceHandler) CacheKey(ctx context.Context, index int) (string, bool, error) {
if hs.src.Checksum != "" {
hs.cacheKey = hs.src.Checksum
return hs.formatCacheKey(getFileName(hs.src.URL, hs.src.Filename, nil), hs.src.Checksum).String(), nil
return hs.formatCacheKey(getFileName(hs.src.URL, hs.src.Filename, nil), hs.src.Checksum).String(), true, nil
}
uh, err := hs.urlHash()
if err != nil {
return "", nil
return "", false, nil
}
// look up metadata (previously stored headers) for that URL
sis, err := hs.md.Search(uh.String())
if err != nil {
return "", errors.Wrapf(err, "failed to search metadata for %s", uh)
return "", false, errors.Wrapf(err, "failed to search metadata for %s", uh)
}
req, err := http.NewRequest("GET", hs.src.URL, nil)
if err != nil {
return "", err
return "", false, err
}
req = req.WithContext(ctx)
m := map[string]*metadata.StorageItem{}
@@ -145,35 +145,35 @@ func (hs *httpSourceHandler) CacheKey(ctx context.Context) (string, error) {
resp, err := tracing.DefaultClient.Do(req)
if err != nil {
return "", err
return "", false, err
}
if resp.StatusCode < 200 || resp.StatusCode >= 400 {
return "", errors.Errorf("invalid response status %d", resp.StatusCode)
return "", false, errors.Errorf("invalid response status %d", resp.StatusCode)
}
if resp.StatusCode == http.StatusNotModified {
respETag := resp.Header.Get("ETag")
si, ok := m[respETag]
if !ok {
return "", errors.Errorf("invalid not-modified ETag: %v", respETag)
return "", false, errors.Errorf("invalid not-modified ETag: %v", respETag)
}
hs.refID = si.ID()
dgst := getChecksum(si)
if dgst == "" {
return "", errors.Errorf("invalid metadata change")
return "", false, errors.Errorf("invalid metadata change")
}
resp.Body.Close()
return hs.formatCacheKey(getFileName(hs.src.URL, hs.src.Filename, resp), dgst).String(), nil
return hs.formatCacheKey(getFileName(hs.src.URL, hs.src.Filename, resp), dgst).String(), true, nil
}
ref, dgst, err := hs.save(ctx, resp)
if err != nil {
return "", err
return "", false, err
}
ref.Release(context.TODO())
hs.cacheKey = dgst
return hs.formatCacheKey(getFileName(hs.src.URL, hs.src.Filename, resp), dgst).String(), nil
return hs.formatCacheKey(getFileName(hs.src.URL, hs.src.Filename, resp), dgst).String(), true, nil
}
func (hs *httpSourceHandler) save(ctx context.Context, resp *http.Response) (ref cache.ImmutableRef, dgst digest.Digest, retErr error) {

View File

@@ -43,7 +43,7 @@ func TestHTTPSource(t *testing.T) {
h, err := hs.Resolve(ctx, id)
require.NoError(t, err)
k, err := h.CacheKey(ctx)
k, _, err := h.CacheKey(ctx, 0)
require.NoError(t, err)
expectedContent1 := "sha256:0b1a154faa3003c1fbe7fda9c8a42d55fde2df2a2c405c32038f8ac7ed6b044a"
@@ -72,7 +72,7 @@ func TestHTTPSource(t *testing.T) {
h, err = hs.Resolve(ctx, id)
require.NoError(t, err)
k, err = h.CacheKey(ctx)
k, _, err = h.CacheKey(ctx, 0)
require.NoError(t, err)
require.Equal(t, expectedContent1, k)
@@ -108,7 +108,7 @@ func TestHTTPSource(t *testing.T) {
h, err = hs.Resolve(ctx, id)
require.NoError(t, err)
k, err = h.CacheKey(ctx)
k, _, err = h.CacheKey(ctx, 0)
require.NoError(t, err)
require.Equal(t, expectedContent2, k)
@@ -157,7 +157,7 @@ func TestHTTPDefaultName(t *testing.T) {
h, err := hs.Resolve(ctx, id)
require.NoError(t, err)
k, err := h.CacheKey(ctx)
k, _, err := h.CacheKey(ctx, 0)
require.NoError(t, err)
require.Equal(t, "sha256:146f16ec8810a62a57ce314aba391f95f7eaaf41b8b1ebaf2ab65fd63b1ad437", k)
@@ -200,7 +200,7 @@ func TestHTTPInvalidURL(t *testing.T) {
h, err := hs.Resolve(ctx, id)
require.NoError(t, err)
_, err = h.CacheKey(ctx)
_, _, err = h.CacheKey(ctx, 0)
require.Error(t, err)
require.Contains(t, err.Error(), "invalid response")
}
@@ -230,7 +230,7 @@ func TestHTTPChecksum(t *testing.T) {
h, err := hs.Resolve(ctx, id)
require.NoError(t, err)
k, err := h.CacheKey(ctx)
k, _, err := h.CacheKey(ctx, 0)
require.NoError(t, err)
expectedContentDifferent := "sha256:f25996f463dca69cffb580f8273ffacdda43332b5f0a8bea2ead33900616d44b"
@@ -252,7 +252,7 @@ func TestHTTPChecksum(t *testing.T) {
h, err = hs.Resolve(ctx, id)
require.NoError(t, err)
k, err = h.CacheKey(ctx)
k, _, err = h.CacheKey(ctx, 0)
require.NoError(t, err)
require.Equal(t, expectedContentCorrect, k)

View File

@@ -66,13 +66,13 @@ type localSourceHandler struct {
*localSource
}
func (ls *localSourceHandler) CacheKey(ctx context.Context) (string, error) {
func (ls *localSourceHandler) CacheKey(ctx context.Context, index int) (string, bool, error) {
sessionID := ls.src.SessionID
if sessionID == "" {
id := session.FromContext(ctx)
if id == "" {
return "", errors.New("could not access local files without session")
return "", false, errors.New("could not access local files without session")
}
sessionID = id
}
@@ -82,9 +82,9 @@ func (ls *localSourceHandler) CacheKey(ctx context.Context) (string, error) {
ExcludePatterns []string
}{SessionID: sessionID, IncludePatterns: ls.src.IncludePatterns, ExcludePatterns: ls.src.ExcludePatterns})
if err != nil {
return "", err
return "", false, err
}
return "session:" + ls.src.Name + ":" + digest.FromBytes(dt).String(), nil
return "session:" + ls.src.Name + ":" + digest.FromBytes(dt).String(), true, nil
}
func (ls *localSourceHandler) Snapshot(ctx context.Context) (out cache.ImmutableRef, retErr error) {

View File

@@ -14,7 +14,7 @@ type Source interface {
}
type SourceInstance interface {
CacheKey(ctx context.Context) (string, error)
CacheKey(ctx context.Context, index int) (string, bool, error)
Snapshot(ctx context.Context) (cache.ImmutableRef, error)
}
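For implementers, the updated contract amounts to: return the best key currently available, and set the bool once no better key will follow. A hypothetical CacheKey implementation mirroring the two-phase pull behaviour above; the type and its fields are invented for illustration:

package example

import "context"

// twoPhaseSource is a hypothetical source whose first key is cheap but
// imprecise and whose second key is final.
type twoPhaseSource struct {
	provisional string // e.g. a manifest-digest-based key
	final       string // e.g. a config/chainID-based key
}

func (s *twoPhaseSource) CacheKey(ctx context.Context, index int) (string, bool, error) {
	if index == 0 {
		// Not final: the solver may call again with a higher index.
		return s.provisional, false, nil
	}
	return s.final, true, nil
}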