From e3b05289d823400d999d9eb65b0f8915fca6c32a Mon Sep 17 00:00:00 2001 From: Tonis Tiigi Date: Mon, 26 Oct 2020 23:13:39 -0700 Subject: [PATCH 1/2] add session injection to remote loading Signed-off-by: Tonis Tiigi --- cache/blobs.go | 13 +- cache/contenthash/checksum.go | 32 ++--- cache/contenthash/checksum_test.go | 141 +++++++++++----------- cache/manager.go | 7 +- cache/manager_test.go | 26 ++-- cache/opts.go | 3 +- cache/refs.go | 28 +++-- cache/remote.go | 21 ++-- cache/remotecache/v1/cachestorage.go | 3 +- cache/util/fsutil.go | 20 ++- executor/containerdexecutor/executor.go | 3 +- executor/executor.go | 10 +- executor/runcexecutor/executor.go | 2 +- exporter/containerimage/export.go | 12 +- exporter/containerimage/writer.go | 11 +- exporter/local/export.go | 2 +- exporter/oci/export.go | 6 +- exporter/tar/export.go | 2 +- frontend/gateway/container.go | 27 ++++- frontend/gateway/forwarder/forward.go | 27 +++-- frontend/gateway/gateway.go | 26 ++-- solver/cachestorage.go | 3 +- solver/exporter.go | 2 +- solver/jobs.go | 2 +- solver/llbsolver/bridge.go | 3 +- solver/llbsolver/file/refmanager.go | 9 +- solver/llbsolver/mounts/mount.go | 16 +-- solver/llbsolver/ops/build.go | 2 +- solver/llbsolver/ops/exec.go | 22 +++- solver/llbsolver/ops/file.go | 24 ++-- solver/llbsolver/ops/file_test.go | 27 +++-- solver/llbsolver/ops/fileoptypes/types.go | 3 +- solver/llbsolver/result.go | 21 ++-- solver/llbsolver/solver.go | 15 ++- solver/memorycachestorage.go | 3 +- solver/scheduler_test.go | 2 +- solver/types.go | 4 +- source/containerimage/pull.go | 12 +- source/git/gitsource.go | 14 +-- source/git/gitsource_test.go | 10 +- source/http/httpsource.go | 10 +- source/http/httpsource_test.go | 2 +- source/local/local.go | 8 +- util/pull/pull.go | 33 +++-- worker/base/worker.go | 2 +- worker/cacheresult.go | 5 +- worker/runc/runc_test.go | 30 +++-- worker/tests/common.go | 25 +++- 48 files changed, 416 insertions(+), 315 deletions(-) diff --git a/cache/blobs.go b/cache/blobs.go index 12921b0f..d3648b14 100644 --- a/cache/blobs.go +++ b/cache/blobs.go @@ -6,6 +6,7 @@ import ( "github.com/containerd/containerd/diff" "github.com/containerd/containerd/leases" "github.com/containerd/containerd/mount" + "github.com/moby/buildkit/session" "github.com/moby/buildkit/util/compression" "github.com/moby/buildkit/util/flightcontrol" "github.com/moby/buildkit/util/winlayers" @@ -29,7 +30,7 @@ var ErrNoBlobs = errors.Errorf("no blobs for snapshot") // computeBlobChain ensures every ref in a parent chain has an associated blob in the content store. If // a blob is missing and createIfNeeded is true, then the blob will be created, otherwise ErrNoBlobs will // be returned. Caller must hold a lease when calling this function. -func (sr *immutableRef) computeBlobChain(ctx context.Context, createIfNeeded bool, compressionType compression.Type) error { +func (sr *immutableRef) computeBlobChain(ctx context.Context, createIfNeeded bool, compressionType compression.Type, s session.Group) error { if _, ok := leases.FromContext(ctx); !ok { return errors.Errorf("missing lease requirement for computeBlobChain") } @@ -42,16 +43,16 @@ func (sr *immutableRef) computeBlobChain(ctx context.Context, createIfNeeded boo ctx = winlayers.UseWindowsLayerMode(ctx) } - return computeBlobChain(ctx, sr, createIfNeeded, compressionType) + return computeBlobChain(ctx, sr, createIfNeeded, compressionType, s) } -func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool, compressionType compression.Type) error { +func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool, compressionType compression.Type, s session.Group) error { baseCtx := ctx eg, ctx := errgroup.WithContext(ctx) var currentDescr ocispec.Descriptor if sr.parent != nil { eg.Go(func() error { - return computeBlobChain(ctx, sr.parent, createIfNeeded, compressionType) + return computeBlobChain(ctx, sr.parent, createIfNeeded, compressionType, s) }) } eg.Go(func() error { @@ -86,7 +87,7 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool // reference needs to be committed var lower []mount.Mount if sr.parent != nil { - m, err := sr.parent.Mount(ctx, true) + m, err := sr.parent.Mount(ctx, true, s) if err != nil { return nil, err } @@ -99,7 +100,7 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool defer release() } } - m, err := sr.Mount(ctx, true) + m, err := sr.Mount(ctx, true, s) if err != nil { return nil, err } diff --git a/cache/contenthash/checksum.go b/cache/contenthash/checksum.go index 6ce565b0..a8335eaa 100644 --- a/cache/contenthash/checksum.go +++ b/cache/contenthash/checksum.go @@ -15,6 +15,7 @@ import ( "github.com/hashicorp/golang-lru/simplelru" "github.com/moby/buildkit/cache" "github.com/moby/buildkit/cache/metadata" + "github.com/moby/buildkit/session" "github.com/moby/buildkit/snapshot" "github.com/moby/locker" digest "github.com/opencontainers/go-digest" @@ -44,12 +45,12 @@ func getDefaultManager() *cacheManager { // header, "/dir" is for contents. For the root node "" (empty string) is the // key for root, "/" for the root header -func Checksum(ctx context.Context, ref cache.ImmutableRef, path string, followLinks bool) (digest.Digest, error) { - return getDefaultManager().Checksum(ctx, ref, path, followLinks) +func Checksum(ctx context.Context, ref cache.ImmutableRef, path string, followLinks bool, s session.Group) (digest.Digest, error) { + return getDefaultManager().Checksum(ctx, ref, path, followLinks, s) } -func ChecksumWildcard(ctx context.Context, ref cache.ImmutableRef, path string, followLinks bool) (digest.Digest, error) { - return getDefaultManager().ChecksumWildcard(ctx, ref, path, followLinks) +func ChecksumWildcard(ctx context.Context, ref cache.ImmutableRef, path string, followLinks bool, s session.Group) (digest.Digest, error) { + return getDefaultManager().ChecksumWildcard(ctx, ref, path, followLinks, s) } func GetCacheContext(ctx context.Context, md *metadata.StorageItem, idmap *idtools.IdentityMapping) (CacheContext, error) { @@ -65,8 +66,8 @@ func ClearCacheContext(md *metadata.StorageItem) { } type CacheContext interface { - Checksum(ctx context.Context, ref cache.Mountable, p string, followLinks bool) (digest.Digest, error) - ChecksumWildcard(ctx context.Context, ref cache.Mountable, p string, followLinks bool) (digest.Digest, error) + Checksum(ctx context.Context, ref cache.Mountable, p string, followLinks bool, s session.Group) (digest.Digest, error) + ChecksumWildcard(ctx context.Context, ref cache.Mountable, p string, followLinks bool, s session.Group) (digest.Digest, error) HandleChange(kind fsutil.ChangeKind, p string, fi os.FileInfo, err error) error } @@ -85,20 +86,20 @@ type cacheManager struct { lruMu sync.Mutex } -func (cm *cacheManager) Checksum(ctx context.Context, ref cache.ImmutableRef, p string, followLinks bool) (digest.Digest, error) { +func (cm *cacheManager) Checksum(ctx context.Context, ref cache.ImmutableRef, p string, followLinks bool, s session.Group) (digest.Digest, error) { cc, err := cm.GetCacheContext(ctx, ensureOriginMetadata(ref.Metadata()), ref.IdentityMapping()) if err != nil { return "", nil } - return cc.Checksum(ctx, ref, p, followLinks) + return cc.Checksum(ctx, ref, p, followLinks, s) } -func (cm *cacheManager) ChecksumWildcard(ctx context.Context, ref cache.ImmutableRef, p string, followLinks bool) (digest.Digest, error) { +func (cm *cacheManager) ChecksumWildcard(ctx context.Context, ref cache.ImmutableRef, p string, followLinks bool, s session.Group) (digest.Digest, error) { cc, err := cm.GetCacheContext(ctx, ensureOriginMetadata(ref.Metadata()), ref.IdentityMapping()) if err != nil { return "", nil } - return cc.ChecksumWildcard(ctx, ref, p, followLinks) + return cc.ChecksumWildcard(ctx, ref, p, followLinks, s) } func (cm *cacheManager) GetCacheContext(ctx context.Context, md *metadata.StorageItem, idmap *idtools.IdentityMapping) (CacheContext, error) { @@ -170,13 +171,14 @@ type mount struct { mountable cache.Mountable mountPath string unmount func() error + session session.Group } func (m *mount) mount(ctx context.Context) (string, error) { if m.mountPath != "" { return m.mountPath, nil } - mounts, err := m.mountable.Mount(ctx, true) + mounts, err := m.mountable.Mount(ctx, true, m.session) if err != nil { return "", err } @@ -380,8 +382,8 @@ func (cc *cacheContext) HandleChange(kind fsutil.ChangeKind, p string, fi os.Fil return nil } -func (cc *cacheContext) ChecksumWildcard(ctx context.Context, mountable cache.Mountable, p string, followLinks bool) (digest.Digest, error) { - m := &mount{mountable: mountable} +func (cc *cacheContext) ChecksumWildcard(ctx context.Context, mountable cache.Mountable, p string, followLinks bool, s session.Group) (digest.Digest, error) { + m := &mount{mountable: mountable, session: s} defer m.clean() wildcards, err := cc.wildcards(ctx, m, p) @@ -417,8 +419,8 @@ func (cc *cacheContext) ChecksumWildcard(ctx context.Context, mountable cache.Mo return wildcards[0].Record.Digest, nil } -func (cc *cacheContext) Checksum(ctx context.Context, mountable cache.Mountable, p string, followLinks bool) (digest.Digest, error) { - m := &mount{mountable: mountable} +func (cc *cacheContext) Checksum(ctx context.Context, mountable cache.Mountable, p string, followLinks bool, s session.Group) (digest.Digest, error) { + m := &mount{mountable: mountable, session: s} defer m.clean() return cc.checksumFollow(ctx, m, p, followLinks) diff --git a/cache/contenthash/checksum_test.go b/cache/contenthash/checksum_test.go index 3a2b844f..a75f5076 100644 --- a/cache/contenthash/checksum_test.go +++ b/cache/contenthash/checksum_test.go @@ -18,6 +18,7 @@ import ( "github.com/containerd/containerd/snapshots/native" "github.com/moby/buildkit/cache" "github.com/moby/buildkit/cache/metadata" + "github.com/moby/buildkit/session" "github.com/moby/buildkit/snapshot" containerdsnapshot "github.com/moby/buildkit/snapshot/containerd" "github.com/moby/buildkit/util/leaseutil" @@ -59,7 +60,7 @@ func TestChecksumSymlinkNoParentScan(t *testing.T) { cc, err := newCacheContext(ref.Metadata(), nil) require.NoError(t, err) - dgst, err := cc.Checksum(context.TODO(), ref, "aa/ln/bb/cc/dd", true) + dgst, err := cc.Checksum(context.TODO(), ref, "aa/ln/bb/cc/dd", true, nil) require.NoError(t, err) require.Equal(t, dgstFileData0, dgst) } @@ -87,15 +88,15 @@ func TestChecksumHardlinks(t *testing.T) { cc, err := newCacheContext(ref.Metadata(), nil) require.NoError(t, err) - dgst, err := cc.Checksum(context.TODO(), ref, "abc/foo", false) + dgst, err := cc.Checksum(context.TODO(), ref, "abc/foo", false, nil) require.NoError(t, err) require.Equal(t, dgstFileData0, dgst) - dgst, err = cc.Checksum(context.TODO(), ref, "ln", false) + dgst, err = cc.Checksum(context.TODO(), ref, "ln", false, nil) require.NoError(t, err) require.Equal(t, dgstFileData0, dgst) - dgst, err = cc.Checksum(context.TODO(), ref, "ln2", false) + dgst, err = cc.Checksum(context.TODO(), ref, "ln2", false, nil) require.NoError(t, err) require.Equal(t, dgstFileData0, dgst) @@ -108,15 +109,15 @@ func TestChecksumHardlinks(t *testing.T) { err = emit(cc2.HandleChange, changeStream(ch)) require.NoError(t, err) - dgst, err = cc2.Checksum(context.TODO(), ref, "abc/foo", false) + dgst, err = cc2.Checksum(context.TODO(), ref, "abc/foo", false, nil) require.NoError(t, err) require.Equal(t, dgstFileData0, dgst) - dgst, err = cc2.Checksum(context.TODO(), ref, "ln", false) + dgst, err = cc2.Checksum(context.TODO(), ref, "ln", false, nil) require.NoError(t, err) require.Equal(t, dgstFileData0, dgst) - dgst, err = cc2.Checksum(context.TODO(), ref, "ln2", false) + dgst, err = cc2.Checksum(context.TODO(), ref, "ln2", false, nil) require.NoError(t, err) require.Equal(t, dgstFileData0, dgst) @@ -134,15 +135,15 @@ func TestChecksumHardlinks(t *testing.T) { data1Expected := "sha256:c2b5e234f5f38fc5864da7def04782f82501a40d46192e4207d5b3f0c3c4732b" - dgst, err = cc2.Checksum(context.TODO(), ref, "abc/foo", false) + dgst, err = cc2.Checksum(context.TODO(), ref, "abc/foo", false, nil) require.NoError(t, err) require.Equal(t, data1Expected, string(dgst)) - dgst, err = cc2.Checksum(context.TODO(), ref, "ln", false) + dgst, err = cc2.Checksum(context.TODO(), ref, "ln", false, nil) require.NoError(t, err) require.Equal(t, data1Expected, string(dgst)) - dgst, err = cc2.Checksum(context.TODO(), ref, "ln2", false) + dgst, err = cc2.Checksum(context.TODO(), ref, "ln2", false, nil) require.NoError(t, err) require.Equal(t, dgstFileData0, dgst) } @@ -176,25 +177,25 @@ func TestChecksumWildcard(t *testing.T) { cc, err := newCacheContext(ref.Metadata(), nil) require.NoError(t, err) - dgst, err := cc.ChecksumWildcard(context.TODO(), ref, "f*o", false) + dgst, err := cc.ChecksumWildcard(context.TODO(), ref, "f*o", false, nil) require.NoError(t, err) require.Equal(t, dgstFileData0, dgst) expFoos := digest.Digest("sha256:c9f914ad7ad8fe6092ce67484b43ca39c2087aabf9e4a1b223249b0f8b09b9f2") - dgst, err = cc.ChecksumWildcard(context.TODO(), ref, "f*", false) + dgst, err = cc.ChecksumWildcard(context.TODO(), ref, "f*", false, nil) require.NoError(t, err) require.Equal(t, expFoos, dgst) - dgst, err = cc.ChecksumWildcard(context.TODO(), ref, "x/d?", false) + dgst, err = cc.ChecksumWildcard(context.TODO(), ref, "x/d?", false, nil) require.NoError(t, err) require.Equal(t, dgstDirD0, dgst) - dgst, err = cc.ChecksumWildcard(context.TODO(), ref, "x/d?/def", true) + dgst, err = cc.ChecksumWildcard(context.TODO(), ref, "x/d?/def", true, nil) require.NoError(t, err) require.Equal(t, dgstFileData0, dgst) - dgst, err = cc.ChecksumWildcard(context.TODO(), ref, "y*", true) + dgst, err = cc.ChecksumWildcard(context.TODO(), ref, "y*", true, nil) require.NoError(t, err) require.Equal(t, expFoos, dgst) @@ -218,7 +219,7 @@ func TestChecksumWildcardWithBadMountable(t *testing.T) { cc, err := newCacheContext(ref.Metadata(), nil) require.NoError(t, err) - _, err = cc.ChecksumWildcard(context.TODO(), newBadMountable(), "*", false) + _, err = cc.ChecksumWildcard(context.TODO(), newBadMountable(), "*", false, nil) require.Error(t, err) } @@ -249,31 +250,31 @@ func TestSymlinksNoFollow(t *testing.T) { expectedSym := digest.Digest("sha256:a2ba571981f48ec34eb79c9a3ab091b6491e825c2f7e9914ea86e8e958be7fae") - dgst, err := cc.ChecksumWildcard(context.TODO(), ref, "sym", false) + dgst, err := cc.ChecksumWildcard(context.TODO(), ref, "sym", false, nil) require.NoError(t, err) require.Equal(t, expectedSym, dgst) - dgst, err = cc.ChecksumWildcard(context.TODO(), ref, "sym2", false) + dgst, err = cc.ChecksumWildcard(context.TODO(), ref, "sym2", false, nil) require.NoError(t, err) require.NotEqual(t, expectedSym, dgst) - dgst, err = cc.ChecksumWildcard(context.TODO(), ref, "foo/ghi", false) + dgst, err = cc.ChecksumWildcard(context.TODO(), ref, "foo/ghi", false, nil) require.NoError(t, err) require.Equal(t, expectedSym, dgst) - _, err = cc.ChecksumWildcard(context.TODO(), ref, "foo/ghi", true) // same because broken symlink + _, err = cc.ChecksumWildcard(context.TODO(), ref, "foo/ghi", true, nil) // same because broken symlink require.Error(t, err) require.Equal(t, true, errors.Is(err, errNotFound)) - _, err = cc.ChecksumWildcard(context.TODO(), ref, "y1", true) + _, err = cc.ChecksumWildcard(context.TODO(), ref, "y1", true, nil) require.Error(t, err) require.Equal(t, true, errors.Is(err, errNotFound)) - dgst, err = cc.Checksum(context.TODO(), ref, "sym", false) + dgst, err = cc.Checksum(context.TODO(), ref, "sym", false, nil) require.NoError(t, err) require.Equal(t, expectedSym, dgst) - dgst, err = cc.Checksum(context.TODO(), ref, "foo/ghi", false) + dgst, err = cc.Checksum(context.TODO(), ref, "foo/ghi", false, nil) require.NoError(t, err) require.Equal(t, expectedSym, dgst) @@ -309,48 +310,48 @@ func TestChecksumBasicFile(t *testing.T) { cc, err := newCacheContext(ref.Metadata(), nil) require.NoError(t, err) - _, err = cc.Checksum(context.TODO(), ref, "nosuch", true) + _, err = cc.Checksum(context.TODO(), ref, "nosuch", true, nil) require.Error(t, err) - dgst, err := cc.Checksum(context.TODO(), ref, "foo", true) + dgst, err := cc.Checksum(context.TODO(), ref, "foo", true, nil) require.NoError(t, err) require.Equal(t, dgstFileData0, dgst) // second file returns different hash - dgst, err = cc.Checksum(context.TODO(), ref, "bar", true) + dgst, err = cc.Checksum(context.TODO(), ref, "bar", true, nil) require.NoError(t, err) require.Equal(t, digest.Digest("sha256:c2b5e234f5f38fc5864da7def04782f82501a40d46192e4207d5b3f0c3c4732b"), dgst) // same file inside a directory - dgst, err = cc.Checksum(context.TODO(), ref, "d0/abc", true) + dgst, err = cc.Checksum(context.TODO(), ref, "d0/abc", true, nil) require.NoError(t, err) require.Equal(t, dgstFileData0, dgst) // repeat because codepath is different - dgst, err = cc.Checksum(context.TODO(), ref, "d0/abc", true) + dgst, err = cc.Checksum(context.TODO(), ref, "d0/abc", true, nil) require.NoError(t, err) require.Equal(t, dgstFileData0, dgst) // symlink to the same file is followed, returns same hash - dgst, err = cc.Checksum(context.TODO(), ref, "d0/def", true) + dgst, err = cc.Checksum(context.TODO(), ref, "d0/def", true, nil) require.NoError(t, err) require.Equal(t, dgstFileData0, dgst) - _, err = cc.Checksum(context.TODO(), ref, "d0/ghi", true) + _, err = cc.Checksum(context.TODO(), ref, "d0/ghi", true, nil) require.Error(t, err) require.Equal(t, true, errors.Is(err, errNotFound)) - dgst, err = cc.Checksum(context.TODO(), ref, "/", true) + dgst, err = cc.Checksum(context.TODO(), ref, "/", true, nil) require.NoError(t, err) require.Equal(t, digest.Digest("sha256:427c9cf9ae98c0f81fb57a3076b965c7c149b6b0a85625ad4e884236649a42c6"), dgst) - dgst, err = cc.Checksum(context.TODO(), ref, "d0", true) + dgst, err = cc.Checksum(context.TODO(), ref, "d0", true, nil) require.NoError(t, err) require.Equal(t, dgstDirD0, dgst) @@ -370,7 +371,7 @@ func TestChecksumBasicFile(t *testing.T) { cc, err = newCacheContext(ref.Metadata(), nil) require.NoError(t, err) - dgst, err = cc.Checksum(context.TODO(), ref, "/", true) + dgst, err = cc.Checksum(context.TODO(), ref, "/", true, nil) require.NoError(t, err) require.Equal(t, dgstDirD0, dgst) @@ -389,7 +390,7 @@ func TestChecksumBasicFile(t *testing.T) { cc, err = newCacheContext(ref.Metadata(), nil) require.NoError(t, err) - dgst, err = cc.Checksum(context.TODO(), ref, "/", true) + dgst, err = cc.Checksum(context.TODO(), ref, "/", true, nil) require.NoError(t, err) require.Equal(t, dgstDirD0Modified, dgst) @@ -415,14 +416,14 @@ func TestChecksumBasicFile(t *testing.T) { cc, err = newCacheContext(ref.Metadata(), nil) require.NoError(t, err) - dgst, err = cc.Checksum(context.TODO(), ref, "abc/aa/foo", true) + dgst, err = cc.Checksum(context.TODO(), ref, "abc/aa/foo", true, nil) require.NoError(t, err) require.Equal(t, digest.Digest("sha256:1c67653c3cf95b12a0014e2c4cd1d776b474b3218aee54155d6ae27b9b999c54"), dgst) require.NotEqual(t, dgstDirD0, dgst) // this will force rescan - dgst, err = cc.Checksum(context.TODO(), ref, "d0", true) + dgst, err = cc.Checksum(context.TODO(), ref, "d0", true, nil) require.NoError(t, err) require.Equal(t, dgstDirD0, dgst) @@ -462,19 +463,19 @@ func TestHandleChange(t *testing.T) { err = emit(cc.HandleChange, changeStream(ch)) require.NoError(t, err) - dgstFoo, err := cc.Checksum(context.TODO(), ref, "foo", true) + dgstFoo, err := cc.Checksum(context.TODO(), ref, "foo", true, nil) require.NoError(t, err) require.Equal(t, dgstFileData0, dgstFoo) // symlink to the same file is followed, returns same hash - dgst, err := cc.Checksum(context.TODO(), ref, "d0/def", true) + dgst, err := cc.Checksum(context.TODO(), ref, "d0/def", true, nil) require.NoError(t, err) require.Equal(t, dgstFoo, dgst) // symlink to the same file is followed, returns same hash - dgst, err = cc.Checksum(context.TODO(), ref, "d0", true) + dgst, err = cc.Checksum(context.TODO(), ref, "d0", true, nil) require.NoError(t, err) require.Equal(t, dgstDirD0, dgst) @@ -486,7 +487,7 @@ func TestHandleChange(t *testing.T) { err = emit(cc.HandleChange, changeStream(ch)) require.NoError(t, err) - dgst, err = cc.Checksum(context.TODO(), ref, "d0", true) + dgst, err = cc.Checksum(context.TODO(), ref, "d0", true, nil) require.NoError(t, err) require.Equal(t, dgstDirD0Modified, dgst) @@ -497,11 +498,11 @@ func TestHandleChange(t *testing.T) { err = emit(cc.HandleChange, changeStream(ch)) require.NoError(t, err) - _, err = cc.Checksum(context.TODO(), ref, "d0", true) + _, err = cc.Checksum(context.TODO(), ref, "d0", true, nil) require.Error(t, err) require.Equal(t, true, errors.Is(err, errNotFound)) - _, err = cc.Checksum(context.TODO(), ref, "d0/abc", true) + _, err = cc.Checksum(context.TODO(), ref, "d0/abc", true, nil) require.Error(t, err) require.Equal(t, true, errors.Is(err, errNotFound)) @@ -538,7 +539,7 @@ func TestHandleRecursiveDir(t *testing.T) { err = emit(cc.HandleChange, changeStream(ch)) require.NoError(t, err) - dgst, err := cc.Checksum(context.TODO(), ref, "d0/foo/bar", true) + dgst, err := cc.Checksum(context.TODO(), ref, "d0/foo/bar", true, nil) require.NoError(t, err) ch = []string{ @@ -550,11 +551,11 @@ func TestHandleRecursiveDir(t *testing.T) { err = emit(cc.HandleChange, changeStream(ch)) require.NoError(t, err) - dgst2, err := cc.Checksum(context.TODO(), ref, "d1", true) + dgst2, err := cc.Checksum(context.TODO(), ref, "d1", true, nil) require.NoError(t, err) require.Equal(t, dgst2, dgst) - _, err = cc.Checksum(context.TODO(), ref, "", true) + _, err = cc.Checksum(context.TODO(), ref, "", true, nil) require.NoError(t, err) } @@ -585,7 +586,7 @@ func TestChecksumUnorderedFiles(t *testing.T) { err = emit(cc.HandleChange, changeStream(ch)) require.NoError(t, err) - dgst, err := cc.Checksum(context.TODO(), ref, "d0", true) + dgst, err := cc.Checksum(context.TODO(), ref, "d0", true, nil) require.NoError(t, err) require.Equal(t, dgst, digest.Digest("sha256:14276c302c940a80f82ca5477bf766c98a24702d6a9948ee71bb277cdad3ae05")) @@ -605,7 +606,7 @@ func TestChecksumUnorderedFiles(t *testing.T) { err = emit(cc.HandleChange, changeStream(ch)) require.NoError(t, err) - dgst2, err := cc.Checksum(context.TODO(), ref, "d0", true) + dgst2, err := cc.Checksum(context.TODO(), ref, "d0", true, nil) require.NoError(t, err) require.NotEqual(t, dgst, dgst2) @@ -630,11 +631,11 @@ func TestSymlinkInPathScan(t *testing.T) { } ref := createRef(t, cm, ch) - dgst, err := Checksum(context.TODO(), ref, "d0/def/foo", true) + dgst, err := Checksum(context.TODO(), ref, "d0/def/foo", true, nil) require.NoError(t, err) require.Equal(t, dgstFileData0, dgst) - dgst, err = Checksum(context.TODO(), ref, "d0/def/foo", true) + dgst, err = Checksum(context.TODO(), ref, "d0/def/foo", true, nil) require.NoError(t, err) require.Equal(t, dgstFileData0, dgst) @@ -664,10 +665,10 @@ func TestSymlinkNeedsScan(t *testing.T) { ref := createRef(t, cm, ch) // scan the d0 path containing the symlink that doesn't get followed - _, err = Checksum(context.TODO(), ref, "d0/d1", true) + _, err = Checksum(context.TODO(), ref, "d0/d1", true, nil) require.NoError(t, err) - dgst, err := Checksum(context.TODO(), ref, "d0/d1/def/foo", true) + dgst, err := Checksum(context.TODO(), ref, "d0/d1/def/foo", true, nil) require.NoError(t, err) require.Equal(t, dgstFileData0, dgst) @@ -694,7 +695,7 @@ func TestSymlinkAbsDirSuffix(t *testing.T) { } ref := createRef(t, cm, ch) - dgst, err := Checksum(context.TODO(), ref, "link/foo", true) + dgst, err := Checksum(context.TODO(), ref, "link/foo", true, nil) require.NoError(t, err) require.Equal(t, dgstFileData0, dgst) @@ -729,27 +730,27 @@ func TestSymlinkThroughParent(t *testing.T) { } ref := createRef(t, cm, ch) - dgst, err := Checksum(context.TODO(), ref, "link1/sub/foo", true) + dgst, err := Checksum(context.TODO(), ref, "link1/sub/foo", true, nil) require.NoError(t, err) require.Equal(t, dgstFileData0, dgst) - dgst, err = Checksum(context.TODO(), ref, "link2/sub/foo", true) + dgst, err = Checksum(context.TODO(), ref, "link2/sub/foo", true, nil) require.NoError(t, err) require.Equal(t, dgstFileData0, dgst) - dgst, err = Checksum(context.TODO(), ref, "link3/sub/foo", true) + dgst, err = Checksum(context.TODO(), ref, "link3/sub/foo", true, nil) require.NoError(t, err) require.Equal(t, dgstFileData0, dgst) - dgst, err = Checksum(context.TODO(), ref, "link4/sub/foo", true) + dgst, err = Checksum(context.TODO(), ref, "link4/sub/foo", true, nil) require.NoError(t, err) require.Equal(t, dgstFileData0, dgst) - dgst, err = Checksum(context.TODO(), ref, "link5/sub/foo", true) + dgst, err = Checksum(context.TODO(), ref, "link5/sub/foo", true, nil) require.NoError(t, err) require.Equal(t, dgstFileData0, dgst) - dgst, err = Checksum(context.TODO(), ref, "link1/sub/link/sub/foo", true) + dgst, err = Checksum(context.TODO(), ref, "link1/sub/link/sub/foo", true, nil) require.NoError(t, err) require.Equal(t, dgstFileData0, dgst) @@ -792,27 +793,27 @@ func TestSymlinkInPathHandleChange(t *testing.T) { err = emit(cc.HandleChange, changeStream(ch)) require.NoError(t, err) - dgst, err := cc.Checksum(context.TODO(), ref, "d1/def/foo", true) + dgst, err := cc.Checksum(context.TODO(), ref, "d1/def/foo", true, nil) require.NoError(t, err) require.Equal(t, dgstFileData0, dgst) - dgst, err = cc.Checksum(context.TODO(), ref, "d1/def/bar/abc", true) + dgst, err = cc.Checksum(context.TODO(), ref, "d1/def/bar/abc", true, nil) require.NoError(t, err) require.Equal(t, dgstFileData0, dgst) - dgstFileData0, err := cc.Checksum(context.TODO(), ref, "sub/d0", true) + dgstFileData0, err := cc.Checksum(context.TODO(), ref, "sub/d0", true, nil) require.NoError(t, err) require.Equal(t, dgstFileData0, dgstDirD0) - dgstFileData0, err = cc.Checksum(context.TODO(), ref, "d1/def/baz", true) + dgstFileData0, err = cc.Checksum(context.TODO(), ref, "d1/def/baz", true, nil) require.NoError(t, err) require.Equal(t, dgstFileData0, dgstDirD0) - dgstFileData0, err = cc.Checksum(context.TODO(), ref, "d1/def/bay", true) + dgstFileData0, err = cc.Checksum(context.TODO(), ref, "d1/def/bay", true, nil) require.NoError(t, err) require.Equal(t, dgstFileData0, dgstDirD0) - dgstFileData0, err = cc.Checksum(context.TODO(), ref, "link", true) + dgstFileData0, err = cc.Checksum(context.TODO(), ref, "link", true, nil) require.NoError(t, err) require.Equal(t, dgstFileData0, dgstDirD0) @@ -843,7 +844,7 @@ func TestPersistence(t *testing.T) { ref := createRef(t, cm, ch) id := ref.ID() - dgst, err := Checksum(context.TODO(), ref, "foo", true) + dgst, err := Checksum(context.TODO(), ref, "foo", true, nil) require.NoError(t, err) require.Equal(t, dgstFileData0, dgst) @@ -853,7 +854,7 @@ func TestPersistence(t *testing.T) { ref, err = cm.Get(context.TODO(), id) require.NoError(t, err) - dgst, err = Checksum(context.TODO(), ref, "foo", true) + dgst, err = Checksum(context.TODO(), ref, "foo", true, nil) require.NoError(t, err) require.Equal(t, dgstFileData0, dgst) @@ -873,7 +874,7 @@ func TestPersistence(t *testing.T) { ref, err = cm.Get(context.TODO(), id) require.NoError(t, err) - dgst, err = Checksum(context.TODO(), ref, "foo", true) + dgst, err = Checksum(context.TODO(), ref, "foo", true, nil) require.NoError(t, err) require.Equal(t, dgstFileData0, dgst) } @@ -884,10 +885,10 @@ func createRef(t *testing.T, cm cache.Manager, files []string) cache.ImmutableRe t.Skip("Depends on unimplemented containerd bind-mount support on Windows") } - mref, err := cm.New(context.TODO(), nil, cache.CachePolicyRetain) + mref, err := cm.New(context.TODO(), nil, nil, cache.CachePolicyRetain) require.NoError(t, err) - mounts, err := mref.Mount(context.TODO(), false) + mounts, err := mref.Mount(context.TODO(), false, nil) require.NoError(t, err) lm := snapshot.LocalMounter(mounts) @@ -935,7 +936,7 @@ func setupCacheManager(t *testing.T, tmpdir string, snapshotterName string, snap type badMountable struct{} -func (bm *badMountable) Mount(ctx context.Context, readonly bool) (snapshot.Mountable, error) { +func (bm *badMountable) Mount(ctx context.Context, readonly bool, _ session.Group) (snapshot.Mountable, error) { return nil, errors.New("tried to mount bad mountable") } diff --git a/cache/manager.go b/cache/manager.go index adc3b69b..2689804c 100644 --- a/cache/manager.go +++ b/cache/manager.go @@ -16,6 +16,7 @@ import ( "github.com/moby/buildkit/cache/metadata" "github.com/moby/buildkit/client" "github.com/moby/buildkit/identity" + "github.com/moby/buildkit/session" "github.com/moby/buildkit/snapshot" "github.com/moby/buildkit/util/flightcontrol" digest "github.com/opencontainers/go-digest" @@ -47,7 +48,7 @@ type Accessor interface { GetByBlob(ctx context.Context, desc ocispec.Descriptor, parent ImmutableRef, opts ...RefOption) (ImmutableRef, error) Get(ctx context.Context, id string, opts ...RefOption) (ImmutableRef, error) - New(ctx context.Context, parent ImmutableRef, opts ...RefOption) (MutableRef, error) + New(ctx context.Context, parent ImmutableRef, s session.Group, opts ...RefOption) (MutableRef, error) GetMutable(ctx context.Context, id string, opts ...RefOption) (MutableRef, error) // Rebase? IdentityMapping() *idtools.IdentityMapping Metadata(string) *metadata.StorageItem @@ -452,7 +453,7 @@ func (cm *cacheManager) getRecord(ctx context.Context, id string, opts ...RefOpt return rec, nil } -func (cm *cacheManager) New(ctx context.Context, s ImmutableRef, opts ...RefOption) (mr MutableRef, err error) { +func (cm *cacheManager) New(ctx context.Context, s ImmutableRef, sess session.Group, opts ...RefOption) (mr MutableRef, err error) { id := identity.NewID() var parent *immutableRef @@ -471,7 +472,7 @@ func (cm *cacheManager) New(ctx context.Context, s ImmutableRef, opts ...RefOpti if err := parent.Finalize(ctx, true); err != nil { return nil, err } - if err := parent.Extract(ctx); err != nil { + if err := parent.Extract(ctx, sess); err != nil { return nil, err } parentSnapshotID = getSnapshotID(parent.md) diff --git a/cache/manager_test.go b/cache/manager_test.go index 050b568b..ce4d6455 100644 --- a/cache/manager_test.go +++ b/cache/manager_test.go @@ -162,10 +162,10 @@ func TestManager(t *testing.T) { checkDiskUsage(ctx, t, cm, 0, 0) - active, err := cm.New(ctx, nil, CachePolicyRetain) + active, err := cm.New(ctx, nil, nil, CachePolicyRetain) require.NoError(t, err) - m, err := active.Mount(ctx, false) + m, err := active.Mount(ctx, false, nil) require.NoError(t, err) lm := snapshot.LocalMounter(m) @@ -234,7 +234,7 @@ func TestManager(t *testing.T) { err = snap.Release(ctx) require.NoError(t, err) - active2, err := cm.New(ctx, snap2, CachePolicyRetain) + active2, err := cm.New(ctx, snap2, nil, CachePolicyRetain) require.NoError(t, err) checkDiskUsage(ctx, t, cm, 2, 0) @@ -326,7 +326,7 @@ func TestSnapshotExtract(t *testing.T) { checkNumBlobs(ctx, t, co.cs, 2) - err = snap2.Extract(ctx) + err = snap2.Extract(ctx, nil) require.NoError(t, err) require.Equal(t, true, snap.Info().Extracted) @@ -434,7 +434,7 @@ func TestExtractOnMutable(t *testing.T) { cm := co.manager - active, err := cm.New(ctx, nil) + active, err := cm.New(ctx, nil, nil) require.NoError(t, err) snap, err := active.Commit(ctx) @@ -480,7 +480,7 @@ func TestExtractOnMutable(t *testing.T) { checkNumBlobs(ctx, t, co.cs, 2) - err = snap2.Extract(ctx) + err = snap2.Extract(ctx, nil) require.NoError(t, err) require.Equal(t, true, snap.Info().Extracted) @@ -545,7 +545,7 @@ func TestSetBlob(t *testing.T) { cm := co.manager - active, err := cm.New(ctx, nil) + active, err := cm.New(ctx, nil, nil) require.NoError(t, err) snap, err := active.Commit(ctx) @@ -588,7 +588,7 @@ func TestSetBlob(t *testing.T) { require.Equal(t, snap.ID(), info.SnapshotID) require.Equal(t, info.Extracted, true) - active, err = cm.New(ctx, snap) + active, err = cm.New(ctx, snap, nil) require.NoError(t, err) snap2, err := active.Commit(ctx) @@ -708,13 +708,13 @@ func TestPrune(t *testing.T) { defer cleanup() cm := co.manager - active, err := cm.New(ctx, nil) + active, err := cm.New(ctx, nil, nil) require.NoError(t, err) snap, err := active.Commit(ctx) require.NoError(t, err) - active, err = cm.New(ctx, snap, CachePolicyRetain) + active, err = cm.New(ctx, snap, nil, CachePolicyRetain) require.NoError(t, err) snap2, err := active.Commit(ctx) @@ -760,7 +760,7 @@ func TestPrune(t *testing.T) { err = snap.Release(ctx) require.NoError(t, err) - active, err = cm.New(ctx, snap, CachePolicyRetain) + active, err = cm.New(ctx, snap, nil, CachePolicyRetain) require.NoError(t, err) snap2, err = active.Commit(ctx) @@ -818,7 +818,7 @@ func TestLazyCommit(t *testing.T) { require.NoError(t, err) cm := co.manager - active, err := cm.New(ctx, nil, CachePolicyRetain) + active, err := cm.New(ctx, nil, nil, CachePolicyRetain) require.NoError(t, err) // after commit mutable is locked @@ -887,7 +887,7 @@ func TestLazyCommit(t *testing.T) { require.NoError(t, err) // test restarting after commit - active, err = cm.New(ctx, nil, CachePolicyRetain) + active, err = cm.New(ctx, nil, nil, CachePolicyRetain) require.NoError(t, err) // after commit mutable is locked diff --git a/cache/opts.go b/cache/opts.go index 6b7025b0..911def3e 100644 --- a/cache/opts.go +++ b/cache/opts.go @@ -4,12 +4,13 @@ import ( "fmt" "github.com/containerd/containerd/content" + "github.com/moby/buildkit/session" "github.com/moby/buildkit/util/progress" digest "github.com/opencontainers/go-digest" ) type DescHandler struct { - Provider content.Provider + Provider func(session.Group) content.Provider Progress progress.Controller SnapshotLabels map[string]string } diff --git a/cache/refs.go b/cache/refs.go index 1da8beda..bbe95adb 100644 --- a/cache/refs.go +++ b/cache/refs.go @@ -14,6 +14,7 @@ import ( "github.com/docker/docker/pkg/idtools" "github.com/moby/buildkit/cache/metadata" "github.com/moby/buildkit/identity" + "github.com/moby/buildkit/session" "github.com/moby/buildkit/snapshot" "github.com/moby/buildkit/solver" "github.com/moby/buildkit/util/compression" @@ -44,8 +45,8 @@ type ImmutableRef interface { Clone() ImmutableRef Info() RefInfo - Extract(ctx context.Context) error // +progress - GetRemote(ctx context.Context, createIfNeeded bool, compressionType compression.Type) (*solver.Remote, error) + Extract(ctx context.Context, s session.Group) error // +progress + GetRemote(ctx context.Context, createIfNeeded bool, compressionType compression.Type, s session.Group) (*solver.Remote, error) } type RefInfo struct { @@ -64,7 +65,7 @@ type MutableRef interface { } type Mountable interface { - Mount(ctx context.Context, readonly bool) (snapshot.Mountable, error) + Mount(ctx context.Context, readonly bool, s session.Group) (snapshot.Mountable, error) } type ref interface { @@ -368,9 +369,9 @@ func (sr *immutableRef) parentRefChain() []*immutableRef { return refs } -func (sr *immutableRef) Mount(ctx context.Context, readonly bool) (snapshot.Mountable, error) { +func (sr *immutableRef) Mount(ctx context.Context, readonly bool, s session.Group) (snapshot.Mountable, error) { if getBlobOnly(sr.md) { - if err := sr.Extract(ctx); err != nil { + if err := sr.Extract(ctx, s); err != nil { return nil, err } } @@ -380,7 +381,7 @@ func (sr *immutableRef) Mount(ctx context.Context, readonly bool) (snapshot.Moun return sr.mount(ctx, readonly) } -func (sr *immutableRef) Extract(ctx context.Context) (rerr error) { +func (sr *immutableRef) Extract(ctx context.Context, s session.Group) (rerr error) { ctx, done, err := leaseutil.WithLease(ctx, sr.cm.LeaseManager, leaseutil.MakeTemporary) if err != nil { return err @@ -395,7 +396,7 @@ func (sr *immutableRef) Extract(ctx context.Context) (rerr error) { return err } - return sr.extract(ctx, sr.descHandlers) + return sr.extract(ctx, sr.descHandlers, s) } func (sr *immutableRef) prepareRemoteSnapshots(ctx context.Context, dhs DescHandlers) (bool, error) { @@ -449,7 +450,7 @@ func (sr *immutableRef) prepareRemoteSnapshots(ctx context.Context, dhs DescHand return ok.(bool), err } -func (sr *immutableRef) extract(ctx context.Context, dhs DescHandlers) error { +func (sr *immutableRef) extract(ctx context.Context, dhs DescHandlers, s session.Group) error { _, err := sr.sizeG.Do(ctx, sr.ID()+"-extract", func(ctx context.Context) (_ interface{}, rerr error) { snapshotID := getSnapshotID(sr.md) if _, err := sr.cm.Snapshotter.Stat(ctx, snapshotID); err == nil { @@ -461,7 +462,7 @@ func (sr *immutableRef) extract(ctx context.Context, dhs DescHandlers) error { parentID := "" if sr.parent != nil { eg.Go(func() error { - if err := sr.parent.extract(egctx, dhs); err != nil { + if err := sr.parent.extract(egctx, dhs, s); err != nil { return err } parentID = getSnapshotID(sr.parent.md) @@ -478,9 +479,10 @@ func (sr *immutableRef) extract(ctx context.Context, dhs DescHandlers) error { eg.Go(func() error { // unlazies if needed, otherwise a no-op return lazyRefProvider{ - ref: sr, - desc: desc, - dh: dh, + ref: sr, + desc: desc, + dh: dh, + session: s, }.Unlazy(egctx) }) @@ -703,7 +705,7 @@ func (sr *mutableRef) commit(ctx context.Context) (*immutableRef, error) { return ref, nil } -func (sr *mutableRef) Mount(ctx context.Context, readonly bool) (snapshot.Mountable, error) { +func (sr *mutableRef) Mount(ctx context.Context, readonly bool, s session.Group) (snapshot.Mountable, error) { sr.mu.Lock() defer sr.mu.Unlock() diff --git a/cache/remote.go b/cache/remote.go index 29eda2fc..871f80eb 100644 --- a/cache/remote.go +++ b/cache/remote.go @@ -9,6 +9,7 @@ import ( "github.com/containerd/containerd/content" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/reference" + "github.com/moby/buildkit/session" "github.com/moby/buildkit/solver" "github.com/moby/buildkit/util/compression" "github.com/moby/buildkit/util/contentutil" @@ -23,14 +24,14 @@ type Unlazier interface { Unlazy(ctx context.Context) error } -func (sr *immutableRef) GetRemote(ctx context.Context, createIfNeeded bool, compressionType compression.Type) (*solver.Remote, error) { +func (sr *immutableRef) GetRemote(ctx context.Context, createIfNeeded bool, compressionType compression.Type, s session.Group) (*solver.Remote, error) { ctx, done, err := leaseutil.WithLease(ctx, sr.cm.LeaseManager, leaseutil.MakeTemporary) if err != nil { return nil, err } defer done(ctx) - err = sr.computeBlobChain(ctx, createIfNeeded, compressionType) + err = sr.computeBlobChain(ctx, createIfNeeded, compressionType, s) if err != nil { return nil, err } @@ -102,9 +103,10 @@ func (sr *immutableRef) GetRemote(ctx context.Context, createIfNeeded bool, comp remote.Descriptors = append(remote.Descriptors, desc) mprovider.Add(lazyRefProvider{ - ref: ref, - desc: desc, - dh: sr.descHandlers[desc.Digest], + ref: ref, + desc: desc, + dh: sr.descHandlers[desc.Digest], + session: s, }) } return remote, nil @@ -136,9 +138,10 @@ func (mp *lazyMultiProvider) Unlazy(ctx context.Context) error { } type lazyRefProvider struct { - ref *immutableRef - desc ocispec.Descriptor - dh *DescHandler + ref *immutableRef + desc ocispec.Descriptor + dh *DescHandler + session session.Group } func (p lazyRefProvider) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (content.ReaderAt, error) { @@ -175,7 +178,7 @@ func (p lazyRefProvider) Unlazy(ctx context.Context) error { // store. If efficient partial reads are desired in the future, something more like a "tee" // that caches remote partial reads to a local store may need to replace this. err := contentutil.Copy(ctx, p.ref.cm.ContentStore, &pullprogress.ProviderWithProgress{ - Provider: p.dh.Provider, + Provider: p.dh.Provider(p.session), Manager: p.ref.cm.ContentStore, }, p.desc) if err != nil { diff --git a/cache/remotecache/v1/cachestorage.go b/cache/remotecache/v1/cachestorage.go index 728b85a2..dee02f4b 100644 --- a/cache/remotecache/v1/cachestorage.go +++ b/cache/remotecache/v1/cachestorage.go @@ -5,6 +5,7 @@ import ( "time" "github.com/moby/buildkit/identity" + "github.com/moby/buildkit/session" "github.com/moby/buildkit/solver" "github.com/moby/buildkit/worker" digest "github.com/opencontainers/go-digest" @@ -260,7 +261,7 @@ func (cs *cacheResultStorage) Load(ctx context.Context, res solver.CacheResult) return worker.NewWorkerRefResult(ref, cs.w), nil } -func (cs *cacheResultStorage) LoadRemote(ctx context.Context, res solver.CacheResult) (*solver.Remote, error) { +func (cs *cacheResultStorage) LoadRemote(ctx context.Context, res solver.CacheResult, _ session.Group) (*solver.Remote, error) { if r := cs.byResultID(res.ID); r != nil && r.result != nil { return r.result, nil } diff --git a/cache/util/fsutil.go b/cache/util/fsutil.go index 41e5465f..b425a002 100644 --- a/cache/util/fsutil.go +++ b/cache/util/fsutil.go @@ -8,7 +8,6 @@ import ( "path/filepath" "github.com/containerd/continuity/fs" - "github.com/moby/buildkit/cache" "github.com/moby/buildkit/snapshot" "github.com/pkg/errors" "github.com/tonistiigi/fsutil" @@ -25,12 +24,7 @@ type FileRange struct { Length int } -func withMount(ctx context.Context, ref cache.ImmutableRef, cb func(string) error) error { - mount, err := ref.Mount(ctx, true) - if err != nil { - return err - } - +func withMount(ctx context.Context, mount snapshot.Mountable, cb func(string) error) error { lm := snapshot.LocalMounter(mount) root, err := lm.Mount() @@ -55,10 +49,10 @@ func withMount(ctx context.Context, ref cache.ImmutableRef, cb func(string) erro return nil } -func ReadFile(ctx context.Context, ref cache.ImmutableRef, req ReadRequest) ([]byte, error) { +func ReadFile(ctx context.Context, mount snapshot.Mountable, req ReadRequest) ([]byte, error) { var dt []byte - err := withMount(ctx, ref, func(root string) error { + err := withMount(ctx, mount, func(root string) error { fp, err := fs.RootPath(root, req.Filename) if err != nil { return errors.WithStack(err) @@ -90,7 +84,7 @@ type ReadDirRequest struct { IncludePattern string } -func ReadDir(ctx context.Context, ref cache.ImmutableRef, req ReadDirRequest) ([]*fstypes.Stat, error) { +func ReadDir(ctx context.Context, mount snapshot.Mountable, req ReadDirRequest) ([]*fstypes.Stat, error) { var ( rd []*fstypes.Stat wo fsutil.WalkOpt @@ -98,7 +92,7 @@ func ReadDir(ctx context.Context, ref cache.ImmutableRef, req ReadDirRequest) ([ if req.IncludePattern != "" { wo.IncludePatterns = append(wo.IncludePatterns, req.IncludePattern) } - err := withMount(ctx, ref, func(root string) error { + err := withMount(ctx, mount, func(root string) error { fp, err := fs.RootPath(root, req.Path) if err != nil { return errors.WithStack(err) @@ -123,9 +117,9 @@ func ReadDir(ctx context.Context, ref cache.ImmutableRef, req ReadDirRequest) ([ return rd, err } -func StatFile(ctx context.Context, ref cache.ImmutableRef, path string) (*fstypes.Stat, error) { +func StatFile(ctx context.Context, mount snapshot.Mountable, path string) (*fstypes.Stat, error) { var st *fstypes.Stat - err := withMount(ctx, ref, func(root string) error { + err := withMount(ctx, mount, func(root string) error { fp, err := fs.RootPath(root, path) if err != nil { return errors.WithStack(err) diff --git a/executor/containerdexecutor/executor.go b/executor/containerdexecutor/executor.go index 14d97a17..1e543480 100644 --- a/executor/containerdexecutor/executor.go +++ b/executor/containerdexecutor/executor.go @@ -17,7 +17,6 @@ import ( containerdoci "github.com/containerd/containerd/oci" "github.com/containerd/continuity/fs" "github.com/docker/docker/pkg/idtools" - "github.com/moby/buildkit/cache" "github.com/moby/buildkit/executor" "github.com/moby/buildkit/executor/oci" "github.com/moby/buildkit/identity" @@ -56,7 +55,7 @@ func New(client *containerd.Client, root, cgroup string, networkProviders map[pb } } -func (w *containerdExecutor) Run(ctx context.Context, id string, root cache.Mountable, mounts []executor.Mount, process executor.ProcessInfo, started chan<- struct{}) (err error) { +func (w *containerdExecutor) Run(ctx context.Context, id string, root executor.Mountable, mounts []executor.Mount, process executor.ProcessInfo, started chan<- struct{}) (err error) { if id == "" { id = identity.NewID() } diff --git a/executor/executor.go b/executor/executor.go index 112bda27..7b047310 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -5,7 +5,7 @@ import ( "io" "net" - "github.com/moby/buildkit/cache" + "github.com/moby/buildkit/snapshot" "github.com/moby/buildkit/solver/pb" ) @@ -22,8 +22,12 @@ type Meta struct { SecurityMode pb.SecurityMode } +type Mountable interface { + Mount(ctx context.Context, readonly bool) (snapshot.Mountable, error) +} + type Mount struct { - Src cache.Mountable + Src Mountable Selector string Dest string Readonly bool @@ -45,7 +49,7 @@ type Executor interface { // Run will start a container for the given process with rootfs, mounts. // `id` is an optional name for the container so it can be referenced later via Exec. // `started` is an optional channel that will be closed when the container setup completes and has started running. - Run(ctx context.Context, id string, rootfs cache.Mountable, mounts []Mount, process ProcessInfo, started chan<- struct{}) error + Run(ctx context.Context, id string, rootfs Mountable, mounts []Mount, process ProcessInfo, started chan<- struct{}) error // Exec will start a process in container matching `id`. An error will be returned // if the container failed to start (via Run) or has exited before Exec is called. Exec(ctx context.Context, id string, process ProcessInfo) error diff --git a/executor/runcexecutor/executor.go b/executor/runcexecutor/executor.go index c243bc26..7c9a4883 100644 --- a/executor/runcexecutor/executor.go +++ b/executor/runcexecutor/executor.go @@ -130,7 +130,7 @@ func New(opt Opt, networkProviders map[pb.NetMode]network.Provider) (executor.Ex return w, nil } -func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, mounts []executor.Mount, process executor.ProcessInfo, started chan<- struct{}) (err error) { +func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mountable, mounts []executor.Mount, process executor.ProcessInfo, started chan<- struct{}) (err error) { meta := process.Meta startedOnce := sync.Once{} diff --git a/exporter/containerimage/export.go b/exporter/containerimage/export.go index 5dd83785..b02fd5d7 100644 --- a/exporter/containerimage/export.go +++ b/exporter/containerimage/export.go @@ -184,7 +184,7 @@ func (e *imageExporterInstance) Export(ctx context.Context, src exporter.Source, } defer done(context.TODO()) - desc, err := e.opt.ImageWriter.Commit(ctx, src, e.ociTypes, e.layerCompression) + desc, err := e.opt.ImageWriter.Commit(ctx, src, e.ociTypes, e.layerCompression, sessionID) if err != nil { return nil, err } @@ -233,7 +233,7 @@ func (e *imageExporterInstance) Export(ctx context.Context, src exporter.Source, tagDone(nil) if e.unpack { - if err := e.unpackImage(ctx, img, src); err != nil { + if err := e.unpackImage(ctx, img, src, session.NewGroup(sessionID)); err != nil { return nil, err } } @@ -242,7 +242,7 @@ func (e *imageExporterInstance) Export(ctx context.Context, src exporter.Source, annotations := map[digest.Digest]map[string]string{} mprovider := contentutil.NewMultiProvider(e.opt.ImageWriter.ContentStore()) if src.Ref != nil { - remote, err := src.Ref.GetRemote(ctx, false, e.layerCompression) + remote, err := src.Ref.GetRemote(ctx, false, e.layerCompression, session.NewGroup(sessionID)) if err != nil { return nil, err } @@ -253,7 +253,7 @@ func (e *imageExporterInstance) Export(ctx context.Context, src exporter.Source, } if len(src.Refs) > 0 { for _, r := range src.Refs { - remote, err := r.GetRemote(ctx, false, e.layerCompression) + remote, err := r.GetRemote(ctx, false, e.layerCompression, session.NewGroup(sessionID)) if err != nil { return nil, err } @@ -276,7 +276,7 @@ func (e *imageExporterInstance) Export(ctx context.Context, src exporter.Source, return resp, nil } -func (e *imageExporterInstance) unpackImage(ctx context.Context, img images.Image, src exporter.Source) (err0 error) { +func (e *imageExporterInstance) unpackImage(ctx context.Context, img images.Image, src exporter.Source, s session.Group) (err0 error) { unpackDone := oneOffProgress(ctx, "unpacking to "+img.Name) defer func() { unpackDone(err0) @@ -303,7 +303,7 @@ func (e *imageExporterInstance) unpackImage(ctx context.Context, img images.Imag } } - remote, err := topLayerRef.GetRemote(ctx, true, e.layerCompression) + remote, err := topLayerRef.GetRemote(ctx, true, e.layerCompression, s) if err != nil { return err } diff --git a/exporter/containerimage/writer.go b/exporter/containerimage/writer.go index bfee4d15..60b7915e 100644 --- a/exporter/containerimage/writer.go +++ b/exporter/containerimage/writer.go @@ -15,6 +15,7 @@ import ( "github.com/moby/buildkit/cache" "github.com/moby/buildkit/exporter" "github.com/moby/buildkit/exporter/containerimage/exptypes" + "github.com/moby/buildkit/session" "github.com/moby/buildkit/snapshot" "github.com/moby/buildkit/solver" "github.com/moby/buildkit/util/compression" @@ -43,7 +44,7 @@ type ImageWriter struct { opt WriterOpt } -func (ic *ImageWriter) Commit(ctx context.Context, inp exporter.Source, oci bool, compressionType compression.Type) (*ocispec.Descriptor, error) { +func (ic *ImageWriter) Commit(ctx context.Context, inp exporter.Source, oci bool, compressionType compression.Type, sessionID string) (*ocispec.Descriptor, error) { platformsBytes, ok := inp.Metadata[exptypes.ExporterPlatformsKey] if len(inp.Refs) > 0 && !ok { @@ -51,7 +52,7 @@ func (ic *ImageWriter) Commit(ctx context.Context, inp exporter.Source, oci bool } if len(inp.Refs) == 0 { - remotes, err := ic.exportLayers(ctx, compressionType, inp.Ref) + remotes, err := ic.exportLayers(ctx, compressionType, session.NewGroup(sessionID), inp.Ref) if err != nil { return nil, err } @@ -74,7 +75,7 @@ func (ic *ImageWriter) Commit(ctx context.Context, inp exporter.Source, oci bool refs = append(refs, r) } - remotes, err := ic.exportLayers(ctx, compressionType, refs...) + remotes, err := ic.exportLayers(ctx, compressionType, session.NewGroup(sessionID), refs...) if err != nil { return nil, err } @@ -139,7 +140,7 @@ func (ic *ImageWriter) Commit(ctx context.Context, inp exporter.Source, oci bool return &idxDesc, nil } -func (ic *ImageWriter) exportLayers(ctx context.Context, compressionType compression.Type, refs ...cache.ImmutableRef) ([]solver.Remote, error) { +func (ic *ImageWriter) exportLayers(ctx context.Context, compressionType compression.Type, s session.Group, refs ...cache.ImmutableRef) ([]solver.Remote, error) { eg, ctx := errgroup.WithContext(ctx) layersDone := oneOffProgress(ctx, "exporting layers") @@ -151,7 +152,7 @@ func (ic *ImageWriter) exportLayers(ctx context.Context, compressionType compres return } eg.Go(func() error { - remote, err := ref.GetRemote(ctx, true, compressionType) + remote, err := ref.GetRemote(ctx, true, compressionType, s) if err != nil { return err } diff --git a/exporter/local/export.go b/exporter/local/export.go index c50100e9..d772776a 100644 --- a/exporter/local/export.go +++ b/exporter/local/export.go @@ -70,7 +70,7 @@ func (e *localExporterInstance) Export(ctx context.Context, inp exporter.Source, } defer os.RemoveAll(src) } else { - mount, err := ref.Mount(ctx, true) + mount, err := ref.Mount(ctx, true, session.NewGroup(sessionID)) if err != nil { return err } diff --git a/exporter/oci/export.go b/exporter/oci/export.go index bd5387e5..38dd99df 100644 --- a/exporter/oci/export.go +++ b/exporter/oci/export.go @@ -125,7 +125,7 @@ func (e *imageExporterInstance) Export(ctx context.Context, src exporter.Source, } defer done(context.TODO()) - desc, err := e.opt.ImageWriter.Commit(ctx, src, e.ociTypes, e.layerCompression) + desc, err := e.opt.ImageWriter.Commit(ctx, src, e.ociTypes, e.layerCompression, sessionID) if err != nil { return nil, err } @@ -177,7 +177,7 @@ func (e *imageExporterInstance) Export(ctx context.Context, src exporter.Source, mprovider := contentutil.NewMultiProvider(e.opt.ImageWriter.ContentStore()) if src.Ref != nil { - remote, err := src.Ref.GetRemote(ctx, false, e.layerCompression) + remote, err := src.Ref.GetRemote(ctx, false, e.layerCompression, session.NewGroup(sessionID)) if err != nil { return nil, err } @@ -194,7 +194,7 @@ func (e *imageExporterInstance) Export(ctx context.Context, src exporter.Source, } if len(src.Refs) > 0 { for _, r := range src.Refs { - remote, err := r.GetRemote(ctx, false, e.layerCompression) + remote, err := r.GetRemote(ctx, false, e.layerCompression, session.NewGroup(sessionID)) if err != nil { return nil, err } diff --git a/exporter/tar/export.go b/exporter/tar/export.go index 77e3f688..79e98cd6 100644 --- a/exporter/tar/export.go +++ b/exporter/tar/export.go @@ -65,7 +65,7 @@ func (e *localExporterInstance) Export(ctx context.Context, inp exporter.Source, } defers = append(defers, func() { os.RemoveAll(src) }) } else { - mount, err := ref.Mount(ctx, true) + mount, err := ref.Mount(ctx, true, session.NewGroup(sessionID)) if err != nil { return nil, err } diff --git a/frontend/gateway/container.go b/frontend/gateway/container.go index 83127963..116a1b28 100644 --- a/frontend/gateway/container.go +++ b/frontend/gateway/container.go @@ -12,6 +12,7 @@ import ( "github.com/moby/buildkit/executor" "github.com/moby/buildkit/frontend/gateway/client" "github.com/moby/buildkit/session" + "github.com/moby/buildkit/snapshot" "github.com/moby/buildkit/solver" "github.com/moby/buildkit/solver/llbsolver/mounts" opspb "github.com/moby/buildkit/solver/pb" @@ -78,7 +79,7 @@ func NewContainer(ctx context.Context, e executor.Executor, sm *session.Manager, } makeMutable := func(worker worker.Worker, ref cache.ImmutableRef) (cache.MutableRef, error) { - mRef, err := worker.CacheManager().New(ctx, ref) + mRef, err := worker.CacheManager().New(ctx, ref, g) if err != nil { return nil, stack.Enable(err) } @@ -105,12 +106,13 @@ func NewContainer(ctx context.Context, e executor.Executor, sm *session.Manager, name := fmt.Sprintf("container %s", req.ContainerID) mm = mounts.NewMountManager(name, workerRef.Worker.CacheManager(), sm, workerRef.Worker.MetadataStore()) - ctr.rootFS = workerRef.ImmutableRef + ctr.rootFS = mountWithSession(workerRef.ImmutableRef, g) if !m.Readonly { - ctr.rootFS, err = makeMutable(workerRef.Worker, workerRef.ImmutableRef) + ref, err := makeMutable(workerRef.Worker, workerRef.ImmutableRef) if err != nil { return nil, stack.Enable(err) } + ctr.rootFS = mountWithSession(ref, g) } // delete root mount from list, handled here @@ -149,7 +151,7 @@ func NewContainer(ctx context.Context, e executor.Executor, sm *session.Manager, case opspb.MountType_BIND: // nothing to do here case opspb.MountType_CACHE: - mRef, err := mm.MountableCache(ctx, toProtoMount(m), ref) + mRef, err := mm.MountableCache(ctx, toProtoMount(m), ref, g) if err != nil { return nil, err } @@ -187,7 +189,7 @@ func NewContainer(ctx context.Context, e executor.Executor, sm *session.Manager, } execMount := executor.Mount{ - Src: mountable, + Src: mountWithSession(mountable, g), Selector: m.Selector, Dest: m.Dest, Readonly: m.Readonly, @@ -208,7 +210,7 @@ type gatewayContainer struct { id string netMode opspb.NetMode platform opspb.Platform - rootFS cache.Mountable + rootFS executor.Mountable mounts []executor.Mount executor executor.Executor started bool @@ -348,3 +350,16 @@ func addDefaultEnvvar(env []string, k, v string) []string { } return append(env, k+"="+v) } + +func mountWithSession(m cache.Mountable, g session.Group) executor.Mountable { + return &mountable{m: m, g: g} +} + +type mountable struct { + m cache.Mountable + g session.Group +} + +func (m *mountable) Mount(ctx context.Context, readonly bool) (snapshot.Mountable, error) { + return m.m.Mount(ctx, readonly, m.g) +} diff --git a/frontend/gateway/forwarder/forward.go b/frontend/gateway/forwarder/forward.go index 263f3400..368787f3 100644 --- a/frontend/gateway/forwarder/forward.go +++ b/frontend/gateway/forwarder/forward.go @@ -4,7 +4,6 @@ import ( "context" "sync" - "github.com/moby/buildkit/cache" cacheutil "github.com/moby/buildkit/cache/util" clienttypes "github.com/moby/buildkit/client" "github.com/moby/buildkit/client/llb" @@ -14,6 +13,7 @@ import ( gwpb "github.com/moby/buildkit/frontend/gateway/pb" "github.com/moby/buildkit/identity" "github.com/moby/buildkit/session" + "github.com/moby/buildkit/snapshot" "github.com/moby/buildkit/solver" opspb "github.com/moby/buildkit/solver/pb" "github.com/moby/buildkit/util/apicaps" @@ -61,7 +61,7 @@ func (c *bridgeClient) Solve(ctx context.Context, req client.SolveRequest) (*cli cRes := &client.Result{} c.mu.Lock() for k, r := range res.Refs { - rr, err := newRef(r) + rr, err := newRef(r, session.NewGroup(c.sid)) if err != nil { return nil, err } @@ -69,7 +69,7 @@ func (c *bridgeClient) Solve(ctx context.Context, req client.SolveRequest) (*cli cRes.AddRef(k, rr) } if r := res.Ref; r != nil { - rr, err := newRef(r) + rr, err := newRef(r, session.NewGroup(c.sid)) if err != nil { return nil, err } @@ -191,10 +191,11 @@ func (c *bridgeClient) NewContainer(ctx context.Context, req client.NewContainer type ref struct { solver.ResultProxy + session session.Group } -func newRef(r solver.ResultProxy) (*ref, error) { - return &ref{ResultProxy: r}, nil +func newRef(r solver.ResultProxy, s session.Group) (*ref, error) { + return &ref{ResultProxy: r, session: s}, nil } func (r *ref) ToState() (st llb.State, err error) { @@ -206,7 +207,7 @@ func (r *ref) ToState() (st llb.State, err error) { } func (r *ref) ReadFile(ctx context.Context, req client.ReadRequest) ([]byte, error) { - ref, err := r.getImmutableRef(ctx) + m, err := r.getMountable(ctx) if err != nil { return nil, err } @@ -219,11 +220,11 @@ func (r *ref) ReadFile(ctx context.Context, req client.ReadRequest) ([]byte, err Length: r.Length, } } - return cacheutil.ReadFile(ctx, ref, newReq) + return cacheutil.ReadFile(ctx, m, newReq) } func (r *ref) ReadDir(ctx context.Context, req client.ReadDirRequest) ([]*fstypes.Stat, error) { - ref, err := r.getImmutableRef(ctx) + m, err := r.getMountable(ctx) if err != nil { return nil, err } @@ -231,18 +232,18 @@ func (r *ref) ReadDir(ctx context.Context, req client.ReadDirRequest) ([]*fstype Path: req.Path, IncludePattern: req.IncludePattern, } - return cacheutil.ReadDir(ctx, ref, newReq) + return cacheutil.ReadDir(ctx, m, newReq) } func (r *ref) StatFile(ctx context.Context, req client.StatRequest) (*fstypes.Stat, error) { - ref, err := r.getImmutableRef(ctx) + m, err := r.getMountable(ctx) if err != nil { return nil, err } - return cacheutil.StatFile(ctx, ref, req.Path) + return cacheutil.StatFile(ctx, m, req.Path) } -func (r *ref) getImmutableRef(ctx context.Context) (cache.ImmutableRef, error) { +func (r *ref) getMountable(ctx context.Context) (snapshot.Mountable, error) { rr, err := r.ResultProxy.Result(ctx) if err != nil { return nil, err @@ -251,5 +252,5 @@ func (r *ref) getImmutableRef(ctx context.Context) (cache.ImmutableRef, error) { if !ok { return nil, errors.Errorf("invalid ref: %T", rr.Sys()) } - return ref.ImmutableRef, nil + return ref.ImmutableRef.Mount(ctx, true, r.session) } diff --git a/frontend/gateway/gateway.go b/frontend/gateway/gateway.go index 9dd27a93..55aae031 100644 --- a/frontend/gateway/gateway.go +++ b/frontend/gateway/gateway.go @@ -115,7 +115,7 @@ func (gf *gatewayFrontend) Solve(ctx context.Context, llbBridge frontend.Fronten return nil, errors.Errorf("invalid ref: %T", res.Sys()) } - rootFS, err = workerRef.Worker.CacheManager().New(ctx, workerRef.ImmutableRef) + rootFS, err = workerRef.Worker.CacheManager().New(ctx, workerRef.ImmutableRef, session.NewGroup(sid)) if err != nil { return nil, err } @@ -179,7 +179,7 @@ func (gf *gatewayFrontend) Solve(ctx context.Context, llbBridge frontend.Fronten if !ok { return nil, errors.Errorf("invalid ref: %T", r.Sys()) } - rootFS, err = workerRef.Worker.CacheManager().New(ctx, workerRef.ImmutableRef) + rootFS, err = workerRef.Worker.CacheManager().New(ctx, workerRef.ImmutableRef, session.NewGroup(sid)) if err != nil { return nil, err } @@ -247,7 +247,7 @@ func (gf *gatewayFrontend) Solve(ctx context.Context, llbBridge frontend.Fronten } defer lbf.Discard() - err = llbBridge.Run(ctx, "", rootFS, nil, executor.ProcessInfo{Meta: meta, Stdin: lbf.Stdin, Stdout: lbf.Stdout, Stderr: os.Stderr}, nil) + err = llbBridge.Run(ctx, "", mountWithSession(rootFS, session.NewGroup(sid)), nil, executor.ProcessInfo{Meta: meta, Stdin: lbf.Stdin, Stdout: lbf.Stdout, Stderr: os.Stderr}, nil) if err != nil { if errors.Is(err, context.Canceled) && lbf.isErrServerClosed { @@ -624,7 +624,12 @@ func (lbf *llbBridgeForwarder) ReadFile(ctx context.Context, req *pb.ReadFileReq } } - dt, err := cacheutil.ReadFile(ctx, workerRef.ImmutableRef, newReq) + m, err := workerRef.ImmutableRef.Mount(ctx, true, session.NewGroup(lbf.sid)) + if err != nil { + return nil, err + } + + dt, err := cacheutil.ReadFile(ctx, m, newReq) if err != nil { return nil, err } @@ -656,7 +661,11 @@ func (lbf *llbBridgeForwarder) ReadDir(ctx context.Context, req *pb.ReadDirReque Path: req.DirPath, IncludePattern: req.IncludePattern, } - entries, err := cacheutil.ReadDir(ctx, workerRef.ImmutableRef, newReq) + m, err := workerRef.ImmutableRef.Mount(ctx, true, session.NewGroup(lbf.sid)) + if err != nil { + return nil, err + } + entries, err := cacheutil.ReadDir(ctx, m, newReq) if err != nil { return nil, err } @@ -683,8 +692,11 @@ func (lbf *llbBridgeForwarder) StatFile(ctx context.Context, req *pb.StatFileReq if !ok { return nil, errors.Errorf("invalid ref: %T", r.Sys()) } - - st, err := cacheutil.StatFile(ctx, workerRef.ImmutableRef, req.Path) + m, err := workerRef.ImmutableRef.Mount(ctx, true, session.NewGroup(lbf.sid)) + if err != nil { + return nil, err + } + st, err := cacheutil.StatFile(ctx, m, req.Path) if err != nil { return nil, err } diff --git a/solver/cachestorage.go b/solver/cachestorage.go index cc1e2113..12797223 100644 --- a/solver/cachestorage.go +++ b/solver/cachestorage.go @@ -4,6 +4,7 @@ import ( "context" "time" + "github.com/moby/buildkit/session" digest "github.com/opencontainers/go-digest" "github.com/pkg/errors" ) @@ -46,6 +47,6 @@ type CacheInfoLink struct { type CacheResultStorage interface { Save(Result, time.Time) (CacheResult, error) Load(ctx context.Context, res CacheResult) (Result, error) - LoadRemote(ctx context.Context, res CacheResult) (*Remote, error) + LoadRemote(ctx context.Context, res CacheResult, s session.Group) (*Remote, error) Exists(id string) bool } diff --git a/solver/exporter.go b/solver/exporter.go index 6a073960..26ca2fb9 100644 --- a/solver/exporter.go +++ b/solver/exporter.go @@ -100,7 +100,7 @@ func (e *exporter) ExportTo(ctx context.Context, t CacheExporterTarget, opt Cach return nil, err } - remote, err = cm.results.LoadRemote(ctx, res) + remote, err = cm.results.LoadRemote(ctx, res, opt.Session) if err != nil { return nil, err } diff --git a/solver/jobs.go b/solver/jobs.go index 84b30059..99e31b23 100644 --- a/solver/jobs.go +++ b/solver/jobs.go @@ -623,7 +623,7 @@ func (s *sharedOp) CalcSlowCache(ctx context.Context, index Index, f ResultBased } s.slowMu.Unlock() ctx = opentracing.ContextWithSpan(progress.WithProgress(ctx, s.st.mpw), s.st.mspan) - key, err := f(withAncestorCacheOpts(ctx, s.st), res) + key, err := f(withAncestorCacheOpts(ctx, s.st), res, s.st) complete := true if err != nil { select { diff --git a/solver/llbsolver/bridge.go b/solver/llbsolver/bridge.go index 1ec2b787..26050600 100644 --- a/solver/llbsolver/bridge.go +++ b/solver/llbsolver/bridge.go @@ -9,7 +9,6 @@ import ( "github.com/containerd/containerd/platforms" "github.com/mitchellh/hashstructure" - "github.com/moby/buildkit/cache" "github.com/moby/buildkit/cache/remotecache" "github.com/moby/buildkit/client/llb" "github.com/moby/buildkit/executor" @@ -245,7 +244,7 @@ func (rp *resultProxy) Result(ctx context.Context) (res solver.CachedResult, err return nil, err } -func (b *llbBridge) Run(ctx context.Context, id string, root cache.Mountable, mounts []executor.Mount, process executor.ProcessInfo, started chan<- struct{}) (err error) { +func (b *llbBridge) Run(ctx context.Context, id string, root executor.Mountable, mounts []executor.Mount, process executor.ProcessInfo, started chan<- struct{}) (err error) { w, err := b.resolveWorker() if err != nil { return err diff --git a/solver/llbsolver/file/refmanager.go b/solver/llbsolver/file/refmanager.go index faa4cdbf..a803d7f4 100644 --- a/solver/llbsolver/file/refmanager.go +++ b/solver/llbsolver/file/refmanager.go @@ -4,6 +4,7 @@ import ( "context" "github.com/moby/buildkit/cache" + "github.com/moby/buildkit/session" "github.com/moby/buildkit/snapshot" "github.com/moby/buildkit/solver/llbsolver/ops/fileoptypes" "github.com/pkg/errors" @@ -17,25 +18,25 @@ type RefManager struct { cm cache.Manager } -func (rm *RefManager) Prepare(ctx context.Context, ref fileoptypes.Ref, readonly bool) (fileoptypes.Mount, error) { +func (rm *RefManager) Prepare(ctx context.Context, ref fileoptypes.Ref, readonly bool, g session.Group) (fileoptypes.Mount, error) { ir, ok := ref.(cache.ImmutableRef) if !ok && ref != nil { return nil, errors.Errorf("invalid ref type: %T", ref) } if ir != nil && readonly { - m, err := ir.Mount(ctx, readonly) + m, err := ir.Mount(ctx, readonly, g) if err != nil { return nil, err } return &Mount{m: m}, nil } - mr, err := rm.cm.New(ctx, ir, cache.WithDescription("fileop target"), cache.CachePolicyRetain) + mr, err := rm.cm.New(ctx, ir, g, cache.WithDescription("fileop target"), cache.CachePolicyRetain) if err != nil { return nil, err } - m, err := mr.Mount(ctx, readonly) + m, err := mr.Mount(ctx, readonly, g) if err != nil { return nil, err } diff --git a/solver/llbsolver/mounts/mount.go b/solver/llbsolver/mounts/mount.go index 9782bce4..f9e3db89 100644 --- a/solver/llbsolver/mounts/mount.go +++ b/solver/llbsolver/mounts/mount.go @@ -48,7 +48,7 @@ type MountManager struct { managerName string } -func (mm *MountManager) getRefCacheDir(ctx context.Context, ref cache.ImmutableRef, id string, m *pb.Mount, sharing pb.CacheSharingOpt) (mref cache.MutableRef, err error) { +func (mm *MountManager) getRefCacheDir(ctx context.Context, ref cache.ImmutableRef, id string, m *pb.Mount, sharing pb.CacheSharingOpt, s session.Group) (mref cache.MutableRef, err error) { g := &cacheRefGetter{ locker: &mm.cacheMountsMu, cacheMounts: mm.cacheMounts, @@ -56,6 +56,7 @@ func (mm *MountManager) getRefCacheDir(ctx context.Context, ref cache.ImmutableR md: mm.md, globalCacheRefs: sharedCacheRefs, name: fmt.Sprintf("cached mount %s from %s", m.Dest, mm.managerName), + session: s, } return g.getRefCacheDir(ctx, ref, id, sharing) } @@ -67,6 +68,7 @@ type cacheRefGetter struct { md *metadata.Store globalCacheRefs *cacheRefs name string + session session.Group } func (g *cacheRefGetter) getRefCacheDir(ctx context.Context, ref cache.ImmutableRef, id string, sharing pb.CacheSharingOpt) (mref cache.MutableRef, err error) { @@ -105,7 +107,7 @@ func (g *cacheRefGetter) getRefCacheDir(ctx context.Context, ref cache.Immutable func (g *cacheRefGetter) getRefCacheDirNoCache(ctx context.Context, key string, ref cache.ImmutableRef, id string, block bool) (cache.MutableRef, error) { makeMutable := func(ref cache.ImmutableRef) (cache.MutableRef, error) { - return g.cm.New(ctx, ref, cache.WithRecordType(client.UsageRecordTypeCacheMount), cache.WithDescription(g.name), cache.CachePolicyRetain) + return g.cm.New(ctx, ref, g.session, cache.WithRecordType(client.UsageRecordTypeCacheMount), cache.WithDescription(g.name), cache.CachePolicyRetain) } cacheRefsLocker.Lock(key) @@ -187,7 +189,7 @@ type sshMount struct { idmap *idtools.IdentityMapping } -func (sm *sshMount) Mount(ctx context.Context, readonly bool) (snapshot.Mountable, error) { +func (sm *sshMount) Mount(ctx context.Context, readonly bool, g session.Group) (snapshot.Mountable, error) { return &sshMountInstance{sm: sm, idmap: sm.idmap}, nil } @@ -279,7 +281,7 @@ type secretMount struct { idmap *idtools.IdentityMapping } -func (sm *secretMount) Mount(ctx context.Context, readonly bool) (snapshot.Mountable, error) { +func (sm *secretMount) Mount(ctx context.Context, readonly bool, g session.Group) (snapshot.Mountable, error) { return &secretMountInstance{sm: sm, idmap: sm.idmap}, nil } @@ -370,11 +372,11 @@ func (sm *secretMountInstance) IdentityMapping() *idtools.IdentityMapping { return sm.idmap } -func (mm *MountManager) MountableCache(ctx context.Context, m *pb.Mount, ref cache.ImmutableRef) (cache.MutableRef, error) { +func (mm *MountManager) MountableCache(ctx context.Context, m *pb.Mount, ref cache.ImmutableRef, g session.Group) (cache.MutableRef, error) { if m.CacheOpt == nil { return nil, errors.Errorf("missing cache mount options") } - return mm.getRefCacheDir(ctx, ref, m.CacheOpt.ID, m, m.CacheOpt.Sharing) + return mm.getRefCacheDir(ctx, ref, m.CacheOpt.ID, m, m.CacheOpt.Sharing, g) } func (mm *MountManager) MountableTmpFS() cache.Mountable { @@ -397,7 +399,7 @@ type tmpfs struct { idmap *idtools.IdentityMapping } -func (f *tmpfs) Mount(ctx context.Context, readonly bool) (snapshot.Mountable, error) { +func (f *tmpfs) Mount(ctx context.Context, readonly bool, g session.Group) (snapshot.Mountable, error) { return &tmpfsMount{readonly: readonly, idmap: f.idmap}, nil } diff --git a/solver/llbsolver/ops/build.go b/solver/llbsolver/ops/build.go index d8b748de..45f0d41a 100644 --- a/solver/llbsolver/ops/build.go +++ b/solver/llbsolver/ops/build.go @@ -80,7 +80,7 @@ func (b *buildOp) Exec(ctx context.Context, g session.Group, inputs []solver.Res return nil, errors.Errorf("invalid reference for build %T", inp.Sys()) } - mount, err := ref.ImmutableRef.Mount(ctx, true) + mount, err := ref.ImmutableRef.Mount(ctx, true, g) if err != nil { return nil, err } diff --git a/solver/llbsolver/ops/exec.go b/solver/llbsolver/ops/exec.go index 5516376f..e6340bd2 100644 --- a/solver/llbsolver/ops/exec.go +++ b/solver/llbsolver/ops/exec.go @@ -16,6 +16,7 @@ import ( "github.com/moby/buildkit/cache/metadata" "github.com/moby/buildkit/executor" "github.com/moby/buildkit/session" + "github.com/moby/buildkit/snapshot" "github.com/moby/buildkit/solver" "github.com/moby/buildkit/solver/llbsolver" "github.com/moby/buildkit/solver/llbsolver/mounts" @@ -250,7 +251,7 @@ func (e *execOp) Exec(ctx context.Context, g session.Group, inputs []solver.Resu makeMutable := func(ref cache.ImmutableRef) (cache.MutableRef, error) { desc := fmt.Sprintf("mount %s from exec %s", m.Dest, strings.Join(e.op.Meta.Args, " ")) - return e.cm.New(ctx, ref, cache.WithDescription(desc)) + return e.cm.New(ctx, ref, g, cache.WithDescription(desc)) } switch m.MountType { @@ -280,7 +281,7 @@ func (e *execOp) Exec(ctx context.Context, g session.Group, inputs []solver.Resu } case pb.MountType_CACHE: - mRef, err := e.mm.MountableCache(ctx, m, ref) + mRef, err := e.mm.MountableCache(ctx, m, ref, g) if err != nil { return nil, err } @@ -337,7 +338,7 @@ func (e *execOp) Exec(ctx context.Context, g session.Group, inputs []solver.Resu root = active } } else { - mounts = append(mounts, executor.Mount{Src: mountable, Dest: m.Dest, Readonly: m.Readonly, Selector: m.Selector}) + mounts = append(mounts, executor.Mount{Src: mountWithSession(mountable, g), Dest: m.Dest, Readonly: m.Readonly, Selector: m.Selector}) } } @@ -390,7 +391,7 @@ func (e *execOp) Exec(ctx context.Context, g session.Group, inputs []solver.Resu defer stdout.Close() defer stderr.Close() - if err := e.exec.Run(ctx, "", root, mounts, executor.ProcessInfo{Meta: meta, Stdin: nil, Stdout: stdout, Stderr: stderr}, nil); err != nil { + if err := e.exec.Run(ctx, "", mountWithSession(root, g), mounts, executor.ProcessInfo{Meta: meta, Stdin: nil, Stdout: stdout, Stderr: stderr}, nil); err != nil { return nil, errors.Wrapf(err, "executor failed running %v", meta.Args) } @@ -441,3 +442,16 @@ func parseExtraHosts(ips []*pb.HostIP) ([]executor.HostIP, error) { } return out, nil } + +func mountWithSession(m cache.Mountable, g session.Group) executor.Mountable { + return &mountable{m: m, g: g} +} + +type mountable struct { + m cache.Mountable + g session.Group +} + +func (m *mountable) Mount(ctx context.Context, readonly bool) (snapshot.Mountable, error) { + return m.m.Mount(ctx, readonly, m.g) +} diff --git a/solver/llbsolver/ops/file.go b/solver/llbsolver/ops/file.go index db10cfb0..f2f0f387 100644 --- a/solver/llbsolver/ops/file.go +++ b/solver/llbsolver/ops/file.go @@ -152,7 +152,7 @@ func (f *fileOp) Exec(ctx context.Context, g session.Group, inputs []solver.Resu inpRefs = append(inpRefs, workerRef.ImmutableRef) } - outs, err := f.solver.Solve(ctx, inpRefs, f.op.Actions) + outs, err := f.solver.Solve(ctx, inpRefs, f.op.Actions, g) if err != nil { return nil, err } @@ -279,7 +279,7 @@ type input struct { ref fileoptypes.Ref } -func (s *FileOpSolver) Solve(ctx context.Context, inputs []fileoptypes.Ref, actions []*pb.FileAction) ([]fileoptypes.Ref, error) { +func (s *FileOpSolver) Solve(ctx context.Context, inputs []fileoptypes.Ref, actions []*pb.FileAction, g session.Group) ([]fileoptypes.Ref, error) { for i, a := range actions { if int(a.Input) < -1 || int(a.Input) >= len(inputs)+len(actions) { return nil, errors.Errorf("invalid input index %d, %d provided", a.Input, len(inputs)+len(actions)) @@ -337,7 +337,7 @@ func (s *FileOpSolver) Solve(ctx context.Context, inputs []fileoptypes.Ref, acti if err := s.validate(idx, inputs, actions, nil); err != nil { return err } - inp, err := s.getInput(ctx, idx, inputs, actions) + inp, err := s.getInput(ctx, idx, inputs, actions, g) if err != nil { return err } @@ -378,7 +378,7 @@ func (s *FileOpSolver) validate(idx int, inputs []fileoptypes.Ref, actions []*pb return nil } -func (s *FileOpSolver) getInput(ctx context.Context, idx int, inputs []fileoptypes.Ref, actions []*pb.FileAction) (input, error) { +func (s *FileOpSolver) getInput(ctx context.Context, idx int, inputs []fileoptypes.Ref, actions []*pb.FileAction, g session.Group) (input, error) { inp, err := s.g.Do(ctx, fmt.Sprintf("inp-%d", idx), func(ctx context.Context) (_ interface{}, err error) { s.mu.Lock() inp := s.ins[idx] @@ -411,12 +411,12 @@ func (s *FileOpSolver) getInput(ctx context.Context, idx int, inputs []fileoptyp loadInput := func(ctx context.Context) func() error { return func() error { - inp, err := s.getInput(ctx, int(action.Input), inputs, actions) + inp, err := s.getInput(ctx, int(action.Input), inputs, actions, g) if err != nil { return err } if inp.ref != nil { - m, err := s.r.Prepare(ctx, inp.ref, false) + m, err := s.r.Prepare(ctx, inp.ref, false, g) if err != nil { return err } @@ -431,12 +431,12 @@ func (s *FileOpSolver) getInput(ctx context.Context, idx int, inputs []fileoptyp loadSecondaryInput := func(ctx context.Context) func() error { return func() error { - inp, err := s.getInput(ctx, int(action.SecondaryInput), inputs, actions) + inp, err := s.getInput(ctx, int(action.SecondaryInput), inputs, actions, g) if err != nil { return err } if inp.ref != nil { - m, err := s.r.Prepare(ctx, inp.ref, true) + m, err := s.r.Prepare(ctx, inp.ref, true, g) if err != nil { return err } @@ -459,12 +459,12 @@ func (s *FileOpSolver) getInput(ctx context.Context, idx int, inputs []fileoptyp if u.ByName.Input < 0 { return nil, errors.Errorf("invalid user index: %d", u.ByName.Input) } - inp, err := s.getInput(ctx, int(u.ByName.Input), inputs, actions) + inp, err := s.getInput(ctx, int(u.ByName.Input), inputs, actions, g) if err != nil { return nil, err } if inp.ref != nil { - mm, err := s.r.Prepare(ctx, inp.ref, true) + mm, err := s.r.Prepare(ctx, inp.ref, true, g) if err != nil { return nil, err } @@ -515,7 +515,7 @@ func (s *FileOpSolver) getInput(ctx context.Context, idx int, inputs []fileoptyp } if inpMount == nil { - m, err := s.r.Prepare(ctx, nil, false) + m, err := s.r.Prepare(ctx, nil, false, g) if err != nil { return nil, err } @@ -546,7 +546,7 @@ func (s *FileOpSolver) getInput(ctx context.Context, idx int, inputs []fileoptyp } case *pb.FileAction_Copy: if inpMountSecondary == nil { - m, err := s.r.Prepare(ctx, nil, true) + m, err := s.r.Prepare(ctx, nil, true, g) if err != nil { return nil, err } diff --git a/solver/llbsolver/ops/file_test.go b/solver/llbsolver/ops/file_test.go index e6a67840..6e8f2892 100644 --- a/solver/llbsolver/ops/file_test.go +++ b/solver/llbsolver/ops/file_test.go @@ -6,6 +6,7 @@ import ( "sync/atomic" "testing" + "github.com/moby/buildkit/session" "github.com/moby/buildkit/solver/llbsolver/ops/fileoptypes" "github.com/moby/buildkit/solver/pb" "github.com/pkg/errors" @@ -44,7 +45,7 @@ func TestMkdirMkfile(t *testing.T) { s, rb := newTestFileSolver() inp := rb.NewRef("ref1") - outs, err := s.Solve(context.TODO(), []fileoptypes.Ref{inp}, fo.Actions) + outs, err := s.Solve(context.TODO(), []fileoptypes.Ref{inp}, fo.Actions, nil) require.NoError(t, err) require.Equal(t, len(outs), 1) rb.checkReleased(t, append(outs, inp)) @@ -114,7 +115,7 @@ func TestChownOpt(t *testing.T) { s, rb := newTestFileSolver() inp := rb.NewRef("ref1") inp2 := rb.NewRef("usermount") - outs, err := s.Solve(context.TODO(), []fileoptypes.Ref{inp, inp2}, fo.Actions) + outs, err := s.Solve(context.TODO(), []fileoptypes.Ref{inp, inp2}, fo.Actions, nil) require.NoError(t, err) require.Equal(t, len(outs), 1) rb.checkReleased(t, append(outs, inp, inp2)) @@ -176,7 +177,7 @@ func TestChownCopy(t *testing.T) { s, rb := newTestFileSolver() inpSrc := rb.NewRef("src") inpDest := rb.NewRef("dest") - outs, err := s.Solve(context.TODO(), []fileoptypes.Ref{inpSrc, inpDest}, fo.Actions) + outs, err := s.Solve(context.TODO(), []fileoptypes.Ref{inpSrc, inpDest}, fo.Actions, nil) require.NoError(t, err) require.Equal(t, len(outs), 1) rb.checkReleased(t, append(outs, inpSrc, inpDest)) @@ -207,7 +208,7 @@ func TestInvalidNoOutput(t *testing.T) { } s, rb := newTestFileSolver() - outs, err := s.Solve(context.TODO(), []fileoptypes.Ref{}, fo.Actions) + outs, err := s.Solve(context.TODO(), []fileoptypes.Ref{}, fo.Actions, nil) rb.checkReleased(t, outs) require.Error(t, err) require.Contains(t, err.Error(), "no outputs specified") @@ -244,7 +245,7 @@ func TestInvalidDuplicateOutput(t *testing.T) { } s, rb := newTestFileSolver() - _, err := s.Solve(context.TODO(), []fileoptypes.Ref{}, fo.Actions) + _, err := s.Solve(context.TODO(), []fileoptypes.Ref{}, fo.Actions, nil) require.Error(t, err) require.Contains(t, err.Error(), "duplicate output") rb.checkReleased(t, nil) @@ -270,7 +271,7 @@ func TestActionInvalidIndex(t *testing.T) { } s, rb := newTestFileSolver() - _, err := s.Solve(context.TODO(), []fileoptypes.Ref{}, fo.Actions) + _, err := s.Solve(context.TODO(), []fileoptypes.Ref{}, fo.Actions, nil) require.Error(t, err) require.Contains(t, err.Error(), "loop from index") rb.checkReleased(t, nil) @@ -307,7 +308,7 @@ func TestActionLoop(t *testing.T) { } s, rb := newTestFileSolver() - _, err := s.Solve(context.TODO(), []fileoptypes.Ref{}, fo.Actions) + _, err := s.Solve(context.TODO(), []fileoptypes.Ref{}, fo.Actions, nil) require.Error(t, err) require.Contains(t, err.Error(), "loop from index") rb.checkReleased(t, nil) @@ -345,7 +346,7 @@ func TestMultiOutput(t *testing.T) { s, rb := newTestFileSolver() inp := rb.NewRef("ref1") - outs, err := s.Solve(context.TODO(), []fileoptypes.Ref{inp}, fo.Actions) + outs, err := s.Solve(context.TODO(), []fileoptypes.Ref{inp}, fo.Actions, nil) require.NoError(t, err) require.Equal(t, len(outs), 2) rb.checkReleased(t, append(outs, inp)) @@ -393,7 +394,7 @@ func TestFileFromScratch(t *testing.T) { } s, rb := newTestFileSolver() - outs, err := s.Solve(context.TODO(), []fileoptypes.Ref{}, fo.Actions) + outs, err := s.Solve(context.TODO(), []fileoptypes.Ref{}, fo.Actions, nil) require.NoError(t, err) require.Equal(t, len(outs), 1) rb.checkReleased(t, outs) @@ -427,7 +428,7 @@ func TestFileCopyInputSrc(t *testing.T) { s, rb := newTestFileSolver() inp0 := rb.NewRef("srcref") inp1 := rb.NewRef("destref") - outs, err := s.Solve(context.TODO(), []fileoptypes.Ref{inp0, inp1}, fo.Actions) + outs, err := s.Solve(context.TODO(), []fileoptypes.Ref{inp0, inp1}, fo.Actions, nil) require.NoError(t, err) require.Equal(t, len(outs), 1) rb.checkReleased(t, append(outs, inp0, inp1)) @@ -481,7 +482,7 @@ func TestFileCopyInputRm(t *testing.T) { s, rb := newTestFileSolver() inp0 := rb.NewRef("srcref") inp1 := rb.NewRef("destref") - outs, err := s.Solve(context.TODO(), []fileoptypes.Ref{inp0, inp1}, fo.Actions) + outs, err := s.Solve(context.TODO(), []fileoptypes.Ref{inp0, inp1}, fo.Actions, nil) require.NoError(t, err) require.Equal(t, len(outs), 1) rb.checkReleased(t, append(outs, inp0, inp1)) @@ -545,7 +546,7 @@ func TestFileParallelActions(t *testing.T) { <-ch } - outs, err := s.Solve(context.TODO(), []fileoptypes.Ref{inp}, fo.Actions) + outs, err := s.Solve(context.TODO(), []fileoptypes.Ref{inp}, fo.Actions, nil) require.NoError(t, err) require.Equal(t, len(outs), 1) @@ -664,7 +665,7 @@ func (b *testFileRefBackend) NewRef(id string) *testFileRef { return r } -func (b *testFileRefBackend) Prepare(ctx context.Context, ref fileoptypes.Ref, readonly bool) (fileoptypes.Mount, error) { +func (b *testFileRefBackend) Prepare(ctx context.Context, ref fileoptypes.Ref, readonly bool, _ session.Group) (fileoptypes.Mount, error) { var active *testFileRef if ref == nil { active = b.NewRef("scratch") diff --git a/solver/llbsolver/ops/fileoptypes/types.go b/solver/llbsolver/ops/fileoptypes/types.go index 67aab026..ca7f3e2c 100644 --- a/solver/llbsolver/ops/fileoptypes/types.go +++ b/solver/llbsolver/ops/fileoptypes/types.go @@ -3,6 +3,7 @@ package fileoptypes import ( "context" + "github.com/moby/buildkit/session" "github.com/moby/buildkit/solver/pb" ) @@ -23,6 +24,6 @@ type Backend interface { } type RefManager interface { - Prepare(ctx context.Context, ref Ref, readonly bool) (Mount, error) + Prepare(ctx context.Context, ref Ref, readonly bool, g session.Group) (Mount, error) Commit(ctx context.Context, mount Mount) (Ref, error) } diff --git a/solver/llbsolver/result.go b/solver/llbsolver/result.go index 0b793954..069b3025 100644 --- a/solver/llbsolver/result.go +++ b/solver/llbsolver/result.go @@ -6,6 +6,7 @@ import ( "path" "github.com/moby/buildkit/cache/contenthash" + "github.com/moby/buildkit/session" "github.com/moby/buildkit/solver" "github.com/moby/buildkit/util/compression" "github.com/moby/buildkit/worker" @@ -21,7 +22,7 @@ type Selector struct { } func NewContentHashFunc(selectors []Selector) solver.ResultBasedCacheFunc { - return func(ctx context.Context, res solver.Result) (digest.Digest, error) { + return func(ctx context.Context, res solver.Result, s session.Group) (digest.Digest, error) { ref, ok := res.Sys().(*worker.WorkerRef) if !ok { return "", errors.Errorf("invalid reference: %T", res) @@ -39,13 +40,13 @@ func NewContentHashFunc(selectors []Selector) solver.ResultBasedCacheFunc { i, sel := i, sel eg.Go(func() error { if !sel.Wildcard { - dgst, err := contenthash.Checksum(ctx, ref.ImmutableRef, path.Join("/", sel.Path), sel.FollowLinks) + dgst, err := contenthash.Checksum(ctx, ref.ImmutableRef, path.Join("/", sel.Path), sel.FollowLinks, s) if err != nil { return err } dgsts[i] = []byte(dgst) } else { - dgst, err := contenthash.ChecksumWildcard(ctx, ref.ImmutableRef, path.Join("/", sel.Path), sel.FollowLinks) + dgst, err := contenthash.ChecksumWildcard(ctx, ref.ImmutableRef, path.Join("/", sel.Path), sel.FollowLinks, s) if err != nil { return err } @@ -63,11 +64,13 @@ func NewContentHashFunc(selectors []Selector) solver.ResultBasedCacheFunc { } } -func workerRefConverter(ctx context.Context, res solver.Result) (*solver.Remote, error) { - ref, ok := res.Sys().(*worker.WorkerRef) - if !ok { - return nil, errors.Errorf("invalid result: %T", res.Sys()) - } +func workerRefConverter(g session.Group) func(ctx context.Context, res solver.Result) (*solver.Remote, error) { + return func(ctx context.Context, res solver.Result) (*solver.Remote, error) { + ref, ok := res.Sys().(*worker.WorkerRef) + if !ok { + return nil, errors.Errorf("invalid result: %T", res.Sys()) + } - return ref.ImmutableRef.GetRemote(ctx, true, compression.Default) + return ref.ImmutableRef.GetRemote(ctx, true, compression.Default, g) + } } diff --git a/solver/llbsolver/solver.go b/solver/llbsolver/solver.go index 562890dc..daa4c702 100644 --- a/solver/llbsolver/solver.go +++ b/solver/llbsolver/solver.go @@ -169,7 +169,7 @@ func (s *Solver) Solve(ctx context.Context, id string, sessionID string, req fro } inp.Ref = workerRef.ImmutableRef - dt, err := inlineCache(ctx, exp.CacheExporter, r) + dt, err := inlineCache(ctx, exp.CacheExporter, r, session.NewGroup(sessionID)) if err != nil { return nil, err } @@ -193,7 +193,7 @@ func (s *Solver) Solve(ctx context.Context, id string, sessionID string, req fro } m[k] = workerRef.ImmutableRef - dt, err := inlineCache(ctx, exp.CacheExporter, r) + dt, err := inlineCache(ctx, exp.CacheExporter, r, session.NewGroup(sessionID)) if err != nil { return nil, err } @@ -213,6 +213,7 @@ func (s *Solver) Solve(ctx context.Context, id string, sessionID string, req fro } } + g := session.NewGroup(j.SessionID) var cacheExporterResponse map[string]string if e := exp.CacheExporter; e != nil { if err := inBuilderContext(ctx, j, "exporting cache", "", func(ctx context.Context, _ session.Group) error { @@ -224,8 +225,9 @@ func (s *Solver) Solve(ctx context.Context, id string, sessionID string, req fro } // all keys have same export chain so exporting others is not needed _, err = r.CacheKeys()[0].Exporter.ExportTo(ctx, e, solver.CacheExportOpt{ - Convert: workerRefConverter, + Convert: workerRefConverter(g), Mode: exp.CacheExportMode, + Session: g, }) return err }); err != nil { @@ -259,7 +261,7 @@ func (s *Solver) Solve(ctx context.Context, id string, sessionID string, req fro }, nil } -func inlineCache(ctx context.Context, e remotecache.Exporter, res solver.CachedResult) ([]byte, error) { +func inlineCache(ctx context.Context, e remotecache.Exporter, res solver.CachedResult, g session.Group) ([]byte, error) { if efl, ok := e.(interface { ExportForLayers([]digest.Digest) ([]byte, error) }); ok { @@ -268,7 +270,7 @@ func inlineCache(ctx context.Context, e remotecache.Exporter, res solver.CachedR return nil, errors.Errorf("invalid reference: %T", res.Sys()) } - remote, err := workerRef.ImmutableRef.GetRemote(ctx, true, compression.Default) + remote, err := workerRef.ImmutableRef.GetRemote(ctx, true, compression.Default, g) if err != nil || remote == nil { return nil, nil } @@ -279,8 +281,9 @@ func inlineCache(ctx context.Context, e remotecache.Exporter, res solver.CachedR } if _, err := res.CacheKeys()[0].Exporter.ExportTo(ctx, e, solver.CacheExportOpt{ - Convert: workerRefConverter, + Convert: workerRefConverter(g), Mode: solver.CacheExportModeMin, + Session: g, }); err != nil { return nil, err } diff --git a/solver/memorycachestorage.go b/solver/memorycachestorage.go index e0e58f0a..6754d489 100644 --- a/solver/memorycachestorage.go +++ b/solver/memorycachestorage.go @@ -5,6 +5,7 @@ import ( "sync" "time" + "github.com/moby/buildkit/session" "github.com/pkg/errors" ) @@ -297,7 +298,7 @@ func (s *inMemoryResultStore) Load(ctx context.Context, res CacheResult) (Result return v.(Result), nil } -func (s *inMemoryResultStore) LoadRemote(ctx context.Context, res CacheResult) (*Remote, error) { +func (s *inMemoryResultStore) LoadRemote(_ context.Context, _ CacheResult, _ session.Group) (*Remote, error) { return nil, nil } diff --git a/solver/scheduler_test.go b/solver/scheduler_test.go index 9d2193d2..84067e8c 100644 --- a/solver/scheduler_test.go +++ b/solver/scheduler_test.go @@ -3697,7 +3697,7 @@ func (cm *trackingCacheManager) Load(ctx context.Context, rec *CacheRecord) (Res return cm.CacheManager.Load(ctx, rec) } -func digestFromResult(ctx context.Context, res Result) (digest.Digest, error) { +func digestFromResult(ctx context.Context, res Result, _ session.Group) (digest.Digest, error) { return digest.FromBytes([]byte(unwrap(res))), nil } diff --git a/solver/types.go b/solver/types.go index 26ba7b7d..892beb35 100644 --- a/solver/types.go +++ b/solver/types.go @@ -95,6 +95,8 @@ type CacheExportOpt struct { Convert func(context.Context, Result) (*Remote, error) // Mode defines a cache export algorithm Mode CacheExportMode + // Session is the session group to client (for auth credentials etc) + Session session.Group } // CacheExporter can export the artifacts of the build chain @@ -145,7 +147,7 @@ type Op interface { Exec(ctx context.Context, g session.Group, inputs []Result) (outputs []Result, err error) } -type ResultBasedCacheFunc func(context.Context, Result) (digest.Digest, error) +type ResultBasedCacheFunc func(context.Context, Result, session.Group) (digest.Digest, error) // CacheMap is a description for calculating the cache key of an operation. type CacheMap struct { diff --git a/source/containerimage/pull.go b/source/containerimage/pull.go index 0859c000..335be7be 100644 --- a/source/containerimage/pull.go +++ b/source/containerimage/pull.go @@ -203,7 +203,7 @@ func (p *puller) CacheKey(ctx context.Context, g session.Group, index int) (cach return nil, err } - if len(p.manifest.Remote.Descriptors) > 0 { + if len(p.manifest.Descriptors) > 0 { pw, _, _ := progress.FromContext(ctx) progressController := &controller.Controller{ Writer: pw, @@ -214,7 +214,7 @@ func (p *puller) CacheKey(ctx context.Context, g session.Group, index int) (cach } p.descHandlers = cache.DescHandlers(make(map[digest.Digest]*cache.DescHandler)) - for i, desc := range p.manifest.Remote.Descriptors { + for i, desc := range p.manifest.Descriptors { // Hints for remote/stargz snapshotter for searching for remote snapshots labels := snapshots.FilterInheritedLabels(desc.Annotations) @@ -227,7 +227,7 @@ func (p *puller) CacheKey(ctx context.Context, g session.Group, index int) (cach layersKey = "containerd.io/snapshot/remote/stargz.layers" layers string ) - for _, l := range p.manifest.Remote.Descriptors[i:] { + for _, l := range p.manifest.Descriptors[i:] { ls := fmt.Sprintf("%s,", l.Digest.String()) // This avoids the label hits the size limitation. // Skipping layers is allowed here and only affects performance. @@ -239,7 +239,7 @@ func (p *puller) CacheKey(ctx context.Context, g session.Group, index int) (cach labels[layersKey] = strings.TrimSuffix(layers, ",") p.descHandlers[desc.Digest] = &cache.DescHandler{ - Provider: p.manifest.Remote.Provider, + Provider: p.manifest.Provider, Progress: progressController, SnapshotLabels: labels, } @@ -280,7 +280,7 @@ func (p *puller) CacheKey(ctx context.Context, g session.Group, index int) (cach func (p *puller) Snapshot(ctx context.Context, g session.Group) (ir cache.ImmutableRef, err error) { p.Puller.Resolver = resolver.DefaultPool.GetResolver(p.RegistryHosts, p.Ref, "pull", p.SessionManager, g).WithImageStore(p.ImageStore, p.id.ResolveMode) - if len(p.manifest.Remote.Descriptors) == 0 { + if len(p.manifest.Descriptors) == 0 { return nil, nil } defer p.releaseTmpLeases(ctx) @@ -293,7 +293,7 @@ func (p *puller) Snapshot(ctx context.Context, g session.Group) (ir cache.Immuta }() var parent cache.ImmutableRef - for _, layerDesc := range p.manifest.Remote.Descriptors { + for _, layerDesc := range p.manifest.Descriptors { parent = current current, err = p.CacheAccessor.GetByBlob(ctx, layerDesc, parent, p.descHandlers, cache.WithImageRef(p.manifest.Ref)) diff --git a/source/git/gitsource.go b/source/git/gitsource.go index 8b25afd8..0197c3cd 100644 --- a/source/git/gitsource.go +++ b/source/git/gitsource.go @@ -64,7 +64,7 @@ func (gs *gitSource) ID() string { } // needs to be called with repo lock -func (gs *gitSource) mountRemote(ctx context.Context, remote string, auth []string) (target string, release func(), retErr error) { +func (gs *gitSource) mountRemote(ctx context.Context, remote string, auth []string, g session.Group) (target string, release func(), retErr error) { remoteKey := "git-remote::" + remote sis, err := gs.md.Search(remoteKey) @@ -88,7 +88,7 @@ func (gs *gitSource) mountRemote(ctx context.Context, remote string, auth []stri initializeRepo := false if remoteRef == nil { - remoteRef, err = gs.cache.New(ctx, nil, cache.CachePolicyRetain, cache.WithDescription(fmt.Sprintf("shared git repo for %s", remote))) + remoteRef, err = gs.cache.New(ctx, nil, g, cache.CachePolicyRetain, cache.WithDescription(fmt.Sprintf("shared git repo for %s", remote))) if err != nil { return "", nil, errors.Wrapf(err, "failed to create new mutable for %s", remote) } @@ -105,7 +105,7 @@ func (gs *gitSource) mountRemote(ctx context.Context, remote string, auth []stri } }() - mount, err := remoteRef.Mount(ctx, false) + mount, err := remoteRef.Mount(ctx, false, g) if err != nil { return "", nil, err } @@ -249,7 +249,7 @@ func (gs *gitSourceHandler) CacheKey(ctx context.Context, g session.Group, index gs.getAuthToken(ctx, g) - gitDir, unmountGitDir, err := gs.mountRemote(ctx, remote, gs.auth) + gitDir, unmountGitDir, err := gs.mountRemote(ctx, remote, gs.auth, g) if err != nil { return "", nil, false, err } @@ -307,7 +307,7 @@ func (gs *gitSourceHandler) Snapshot(ctx context.Context, g session.Group) (out gs.locker.Lock(gs.src.Remote) defer gs.locker.Unlock(gs.src.Remote) - gitDir, unmountGitDir, err := gs.mountRemote(ctx, gs.src.Remote, gs.auth) + gitDir, unmountGitDir, err := gs.mountRemote(ctx, gs.src.Remote, gs.auth, g) if err != nil { return nil, err } @@ -345,7 +345,7 @@ func (gs *gitSourceHandler) Snapshot(ctx context.Context, g session.Group) (out } } - checkoutRef, err := gs.cache.New(ctx, nil, cache.WithRecordType(client.UsageRecordTypeGitCheckout), cache.WithDescription(fmt.Sprintf("git snapshot for %s#%s", gs.src.Remote, ref))) + checkoutRef, err := gs.cache.New(ctx, nil, g, cache.WithRecordType(client.UsageRecordTypeGitCheckout), cache.WithDescription(fmt.Sprintf("git snapshot for %s#%s", gs.src.Remote, ref))) if err != nil { return nil, errors.Wrapf(err, "failed to create new mutable for %s", gs.src.Remote) } @@ -356,7 +356,7 @@ func (gs *gitSourceHandler) Snapshot(ctx context.Context, g session.Group) (out } }() - mount, err := checkoutRef.Mount(ctx, false) + mount, err := checkoutRef.Mount(ctx, false, g) if err != nil { return nil, err } diff --git a/source/git/gitsource_test.go b/source/git/gitsource_test.go index 1bcbb326..635d0575 100644 --- a/source/git/gitsource_test.go +++ b/source/git/gitsource_test.go @@ -75,7 +75,7 @@ func testRepeatedFetch(t *testing.T, keepGitDir bool) { require.NoError(t, err) defer ref1.Release(context.TODO()) - mount, err := ref1.Mount(ctx, false) + mount, err := ref1.Mount(ctx, false, nil) require.NoError(t, err) lm := snapshot.LocalMounter(mount) @@ -126,7 +126,7 @@ func testRepeatedFetch(t *testing.T, keepGitDir bool) { require.NoError(t, err) defer ref3.Release(context.TODO()) - mount, err = ref3.Mount(ctx, false) + mount, err = ref3.Mount(ctx, false, nil) require.NoError(t, err) lm = snapshot.LocalMounter(mount) @@ -202,7 +202,7 @@ func testFetchBySHA(t *testing.T, keepGitDir bool) { require.NoError(t, err) defer ref1.Release(context.TODO()) - mount, err := ref1.Mount(ctx, false) + mount, err := ref1.Mount(ctx, false, nil) require.NoError(t, err) lm := snapshot.LocalMounter(mount) @@ -292,7 +292,7 @@ func testMultipleRepos(t *testing.T, keepGitDir bool) { require.NoError(t, err) defer ref1.Release(context.TODO()) - mount, err := ref1.Mount(ctx, false) + mount, err := ref1.Mount(ctx, false, nil) require.NoError(t, err) lm := snapshot.LocalMounter(mount) @@ -304,7 +304,7 @@ func testMultipleRepos(t *testing.T, keepGitDir bool) { require.NoError(t, err) defer ref2.Release(context.TODO()) - mount, err = ref2.Mount(ctx, false) + mount, err = ref2.Mount(ctx, false, nil) require.NoError(t, err) lm = snapshot.LocalMounter(mount) diff --git a/source/http/httpsource.go b/source/http/httpsource.go index 03c07ea1..dd9c1276 100644 --- a/source/http/httpsource.go +++ b/source/http/httpsource.go @@ -238,7 +238,7 @@ func (hs *httpSourceHandler) CacheKey(ctx context.Context, g session.Group, inde return hs.formatCacheKey(getFileName(hs.src.URL, hs.src.Filename, resp), dgst, modTime).String(), nil, true, nil } - ref, dgst, err := hs.save(ctx, resp) + ref, dgst, err := hs.save(ctx, resp, g) if err != nil { return "", nil, false, err } @@ -249,8 +249,8 @@ func (hs *httpSourceHandler) CacheKey(ctx context.Context, g session.Group, inde return hs.formatCacheKey(getFileName(hs.src.URL, hs.src.Filename, resp), dgst, resp.Header.Get("Last-Modified")).String(), nil, true, nil } -func (hs *httpSourceHandler) save(ctx context.Context, resp *http.Response) (ref cache.ImmutableRef, dgst digest.Digest, retErr error) { - newRef, err := hs.cache.New(ctx, nil, cache.CachePolicyRetain, cache.WithDescription(fmt.Sprintf("http url %s", hs.src.URL))) +func (hs *httpSourceHandler) save(ctx context.Context, resp *http.Response, s session.Group) (ref cache.ImmutableRef, dgst digest.Digest, retErr error) { + newRef, err := hs.cache.New(ctx, nil, s, cache.CachePolicyRetain, cache.WithDescription(fmt.Sprintf("http url %s", hs.src.URL))) if err != nil { return nil, "", err } @@ -265,7 +265,7 @@ func (hs *httpSourceHandler) save(ctx context.Context, resp *http.Response) (ref } }() - mount, err := newRef.Mount(ctx, false) + mount, err := newRef.Mount(ctx, false, s) if err != nil { return nil, "", err } @@ -392,7 +392,7 @@ func (hs *httpSourceHandler) Snapshot(ctx context.Context, g session.Group) (cac return nil, err } - ref, dgst, err := hs.save(ctx, resp) + ref, dgst, err := hs.save(ctx, resp, g) if err != nil { return nil, err } diff --git a/source/http/httpsource_test.go b/source/http/httpsource_test.go index 7d41d8e3..50492485 100644 --- a/source/http/httpsource_test.go +++ b/source/http/httpsource_test.go @@ -301,7 +301,7 @@ func TestHTTPChecksum(t *testing.T) { } func readFile(ctx context.Context, ref cache.ImmutableRef, fp string) ([]byte, error) { - mount, err := ref.Mount(ctx, false) + mount, err := ref.Mount(ctx, false, nil) if err != nil { return nil, err } diff --git a/source/local/local.go b/source/local/local.go index 0019a196..3d385ab1 100644 --- a/source/local/local.go +++ b/source/local/local.go @@ -96,7 +96,7 @@ func (ls *localSourceHandler) CacheKey(ctx context.Context, g session.Group, ind func (ls *localSourceHandler) Snapshot(ctx context.Context, g session.Group) (cache.ImmutableRef, error) { var ref cache.ImmutableRef err := ls.sm.Any(ctx, g, func(ctx context.Context, _ string, c session.Caller) error { - r, err := ls.snapshot(ctx, c) + r, err := ls.snapshot(ctx, g, c) if err != nil { return err } @@ -109,7 +109,7 @@ func (ls *localSourceHandler) Snapshot(ctx context.Context, g session.Group) (ca return ref, nil } -func (ls *localSourceHandler) snapshot(ctx context.Context, caller session.Caller) (out cache.ImmutableRef, retErr error) { +func (ls *localSourceHandler) snapshot(ctx context.Context, s session.Group, caller session.Caller) (out cache.ImmutableRef, retErr error) { sharedKey := keySharedKey + ":" + ls.src.Name + ":" + ls.src.SharedKeyHint + ":" + caller.SharedKey() // TODO: replace caller.SharedKey() with source based hint from client(absolute-path+nodeid) var mutable cache.MutableRef @@ -126,7 +126,7 @@ func (ls *localSourceHandler) snapshot(ctx context.Context, caller session.Calle } if mutable == nil { - m, err := ls.cm.New(ctx, nil, cache.CachePolicyRetain, cache.WithRecordType(client.UsageRecordTypeLocalSource), cache.WithDescription(fmt.Sprintf("local source for %s", ls.src.Name))) + m, err := ls.cm.New(ctx, nil, s, cache.CachePolicyRetain, cache.WithRecordType(client.UsageRecordTypeLocalSource), cache.WithDescription(fmt.Sprintf("local source for %s", ls.src.Name))) if err != nil { return nil, err } @@ -146,7 +146,7 @@ func (ls *localSourceHandler) snapshot(ctx context.Context, caller session.Calle } }() - mount, err := mutable.Mount(ctx, false) + mount, err := mutable.Mount(ctx, false, s) if err != nil { return nil, err } diff --git a/util/pull/pull.go b/util/pull/pull.go index 26832d49..c968ba0c 100644 --- a/util/pull/pull.go +++ b/util/pull/pull.go @@ -11,11 +11,12 @@ import ( "github.com/containerd/containerd/remotes" "github.com/containerd/containerd/remotes/docker" "github.com/containerd/containerd/remotes/docker/schema1" - "github.com/moby/buildkit/solver" + "github.com/moby/buildkit/session" "github.com/moby/buildkit/util/contentutil" "github.com/moby/buildkit/util/flightcontrol" "github.com/moby/buildkit/util/imageutil" "github.com/moby/buildkit/util/pull/pullprogress" + "github.com/moby/buildkit/util/resolver" digest "github.com/opencontainers/go-digest" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" @@ -23,7 +24,7 @@ import ( type Puller struct { ContentStore content.Store - Resolver remotes.Resolver + Resolver *resolver.Resolver Src reference.Spec Platform ocispec.Platform @@ -37,17 +38,18 @@ type Puller struct { nonlayers []ocispec.Descriptor } -var _ content.Provider = &Puller{} +var _ content.Provider = &provider{} type PulledManifests struct { Ref string MainManifestDesc ocispec.Descriptor ConfigDesc ocispec.Descriptor Nonlayers []ocispec.Descriptor - Remote *solver.Remote + Descriptors []ocispec.Descriptor + Provider func(session.Group) content.Provider } -func (p *Puller) resolve(ctx context.Context) error { +func (p *Puller) resolve(ctx context.Context, resolver remotes.Resolver) error { _, err := p.g.Do(ctx, "", func(ctx context.Context) (_ interface{}, err error) { if p.resolveErr != nil || p.resolveDone { return nil, p.resolveErr @@ -60,7 +62,7 @@ func (p *Puller) resolve(ctx context.Context) error { if p.tryLocalResolve(ctx) == nil { return } - ref, desc, err := p.Resolver.Resolve(ctx, p.Src.String()) + ref, desc, err := resolver.Resolve(ctx, p.Src.String()) if err != nil { return nil, err } @@ -101,7 +103,7 @@ func (p *Puller) tryLocalResolve(ctx context.Context) error { } func (p *Puller) PullManifests(ctx context.Context) (*PulledManifests, error) { - err := p.resolve(ctx) + err := p.resolve(ctx, p.Resolver) if err != nil { return nil, err } @@ -189,20 +191,25 @@ func (p *Puller) PullManifests(ctx context.Context) (*PulledManifests, error) { MainManifestDesc: p.desc, ConfigDesc: p.configDesc, Nonlayers: p.nonlayers, - Remote: &solver.Remote{ - Descriptors: p.layers, - Provider: p, + Descriptors: p.layers, + Provider: func(g session.Group) content.Provider { + return &provider{puller: p, resolver: p.Resolver.WithSession(g)} }, }, nil } -func (p *Puller) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (content.ReaderAt, error) { - err := p.resolve(ctx) +type provider struct { + puller *Puller + resolver remotes.Resolver +} + +func (p *provider) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (content.ReaderAt, error) { + err := p.puller.resolve(ctx, p.resolver) if err != nil { return nil, err } - fetcher, err := p.Resolver.Fetcher(ctx, p.ref) + fetcher, err := p.resolver.Fetcher(ctx, p.puller.ref) if err != nil { return nil, err } diff --git a/worker/base/worker.go b/worker/base/worker.go index 053ac652..de6f1344 100644 --- a/worker/base/worker.go +++ b/worker/base/worker.go @@ -367,7 +367,7 @@ func (w *Worker) Exporter(name string, sm *session.Manager) (exporter.Exporter, func (w *Worker) FromRemote(ctx context.Context, remote *solver.Remote) (ref cache.ImmutableRef, err error) { pw, _, _ := progress.FromContext(ctx) descHandler := &cache.DescHandler{ - Provider: remote.Provider, + Provider: func(session.Group) content.Provider { return remote.Provider }, Progress: &controller.Controller{Writer: pw}, } descHandlers := cache.DescHandlers(make(map[digest.Digest]*cache.DescHandler)) diff --git a/worker/cacheresult.go b/worker/cacheresult.go index 4035a559..844d153b 100644 --- a/worker/cacheresult.go +++ b/worker/cacheresult.go @@ -6,6 +6,7 @@ import ( "time" "github.com/moby/buildkit/cache" + "github.com/moby/buildkit/session" "github.com/moby/buildkit/solver" "github.com/moby/buildkit/util/compression" "github.com/pkg/errors" @@ -67,7 +68,7 @@ func (s *cacheResultStorage) load(ctx context.Context, id string, hidden bool) ( return NewWorkerRefResult(ref, w), nil } -func (s *cacheResultStorage) LoadRemote(ctx context.Context, res solver.CacheResult) (*solver.Remote, error) { +func (s *cacheResultStorage) LoadRemote(ctx context.Context, res solver.CacheResult, g session.Group) (*solver.Remote, error) { w, refID, err := s.getWorkerRef(res.ID) if err != nil { return nil, err @@ -77,7 +78,7 @@ func (s *cacheResultStorage) LoadRemote(ctx context.Context, res solver.CacheRes return nil, err } defer ref.Release(context.TODO()) - remote, err := ref.GetRemote(ctx, false, compression.Default) + remote, err := ref.GetRemote(ctx, false, compression.Default, g) if err != nil { return nil, nil // ignore error. loadRemote is best effort } diff --git a/worker/runc/runc_test.go b/worker/runc/runc_test.go index 573a57aa..2dccca5b 100644 --- a/worker/runc/runc_test.go +++ b/worker/runc/runc_test.go @@ -4,6 +4,7 @@ package runc import ( "bytes" + "context" "fmt" "io" "io/ioutil" @@ -14,6 +15,7 @@ import ( ctdsnapshot "github.com/containerd/containerd/snapshots" "github.com/containerd/containerd/snapshots/overlay" + "github.com/moby/buildkit/cache" "github.com/moby/buildkit/client" "github.com/moby/buildkit/executor" "github.com/moby/buildkit/executor/oci" @@ -69,7 +71,7 @@ func TestRuncWorker(t *testing.T) { require.NoError(t, err) snap := tests.NewBusyboxSourceSnapshot(ctx, t, w, sm) - mounts, err := snap.Mount(ctx, false) + mounts, err := snap.Mount(ctx, false, nil) require.NoError(t, err) lm := snapshot.LocalMounter(mounts) @@ -107,16 +109,16 @@ func TestRuncWorker(t *testing.T) { } stderr := bytes.NewBuffer(nil) - err = w.WorkerOpt.Executor.Run(ctx, "", snap, nil, executor.ProcessInfo{Meta: meta, Stderr: &nopCloser{stderr}}, nil) + err = w.WorkerOpt.Executor.Run(ctx, "", execMount(snap), nil, executor.ProcessInfo{Meta: meta, Stderr: &nopCloser{stderr}}, nil) require.Error(t, err) // Read-only root // typical error is like `mkdir /.../rootfs/proc: read-only file system`. // make sure the error is caused before running `echo foo > /bar`. require.Contains(t, stderr.String(), "read-only file system") - root, err := w.CacheMgr.New(ctx, snap) + root, err := w.CacheMgr.New(ctx, snap, nil) require.NoError(t, err) - err = w.WorkerOpt.Executor.Run(ctx, "", root, nil, executor.ProcessInfo{Meta: meta, Stderr: &nopCloser{stderr}}, nil) + err = w.WorkerOpt.Executor.Run(ctx, "", execMount(root), nil, executor.ProcessInfo{Meta: meta, Stderr: &nopCloser{stderr}}, nil) require.NoError(t, err) meta = executor.Meta{ @@ -124,13 +126,13 @@ func TestRuncWorker(t *testing.T) { Cwd: "/", } - err = w.WorkerOpt.Executor.Run(ctx, "", root, nil, executor.ProcessInfo{Meta: meta, Stderr: &nopCloser{stderr}}, nil) + err = w.WorkerOpt.Executor.Run(ctx, "", execMount(root), nil, executor.ProcessInfo{Meta: meta, Stderr: &nopCloser{stderr}}, nil) require.NoError(t, err) rf, err := root.Commit(ctx) require.NoError(t, err) - mounts, err = rf.Mount(ctx, false) + mounts, err = rf.Mount(ctx, false, nil) require.NoError(t, err) lm = snapshot.LocalMounter(mounts) @@ -172,7 +174,7 @@ func TestRuncWorkerNoProcessSandbox(t *testing.T) { sm, err := session.NewManager() require.NoError(t, err) snap := tests.NewBusyboxSourceSnapshot(ctx, t, w, sm) - root, err := w.CacheMgr.New(ctx, snap) + root, err := w.CacheMgr.New(ctx, snap, nil) require.NoError(t, err) // ensure the procfs is shared @@ -185,7 +187,7 @@ func TestRuncWorkerNoProcessSandbox(t *testing.T) { } stdout := bytes.NewBuffer(nil) stderr := bytes.NewBuffer(nil) - err = w.WorkerOpt.Executor.Run(ctx, "", root, nil, executor.ProcessInfo{Meta: meta, Stdout: &nopCloser{stdout}, Stderr: &nopCloser{stderr}}, nil) + err = w.WorkerOpt.Executor.Run(ctx, "", execMount(root), nil, executor.ProcessInfo{Meta: meta, Stdout: &nopCloser{stdout}, Stderr: &nopCloser{stderr}}, nil) require.NoError(t, err, fmt.Sprintf("stdout=%q, stderr=%q", stdout.String(), stderr.String())) require.Equal(t, string(selfCmdline), stdout.String()) } @@ -221,3 +223,15 @@ type nopCloser struct { func (n *nopCloser) Close() error { return nil } + +func execMount(m cache.Mountable) executor.Mountable { + return &mountable{m: m} +} + +type mountable struct { + m cache.Mountable +} + +func (m *mountable) Mount(ctx context.Context, readonly bool) (snapshot.Mountable, error) { + return m.m.Mount(ctx, readonly, nil) +} diff --git a/worker/tests/common.go b/worker/tests/common.go index 31a5f36e..257e5bf4 100644 --- a/worker/tests/common.go +++ b/worker/tests/common.go @@ -13,6 +13,7 @@ import ( "github.com/moby/buildkit/executor" "github.com/moby/buildkit/identity" "github.com/moby/buildkit/session" + "github.com/moby/buildkit/snapshot" "github.com/moby/buildkit/source" "github.com/moby/buildkit/worker/base" "github.com/pkg/errors" @@ -43,7 +44,7 @@ func TestWorkerExec(t *testing.T, w *base.Worker) { require.NoError(t, err) snap := NewBusyboxSourceSnapshot(ctx, t, w, sm) - root, err := w.CacheMgr.New(ctx, snap) + root, err := w.CacheMgr.New(ctx, snap, nil) require.NoError(t, err) id := identity.NewID() @@ -63,7 +64,7 @@ func TestWorkerExec(t *testing.T, w *base.Worker) { }() stdout := bytes.NewBuffer(nil) stderr := bytes.NewBuffer(nil) - err = w.WorkerOpt.Executor.Run(ctxTimeout, id, root, nil, executor.ProcessInfo{ + err = w.WorkerOpt.Executor.Run(ctxTimeout, id, execMount(root), nil, executor.ProcessInfo{ Meta: executor.Meta{ Args: []string{"cat"}, Cwd: "/", @@ -84,7 +85,7 @@ func TestWorkerExec(t *testing.T, w *base.Worker) { eg := errgroup.Group{} started = make(chan struct{}) eg.Go(func() error { - return w.WorkerOpt.Executor.Run(ctx, id, root, nil, executor.ProcessInfo{ + return w.WorkerOpt.Executor.Run(ctx, id, execMount(root), nil, executor.ProcessInfo{ Meta: executor.Meta{ Args: []string{"sleep", "10"}, Cwd: "/", @@ -166,7 +167,7 @@ func TestWorkerExecFailures(t *testing.T, w *base.Worker) { require.NoError(t, err) snap := NewBusyboxSourceSnapshot(ctx, t, w, sm) - root, err := w.CacheMgr.New(ctx, snap) + root, err := w.CacheMgr.New(ctx, snap, nil) require.NoError(t, err) id := identity.NewID() @@ -175,7 +176,7 @@ func TestWorkerExecFailures(t *testing.T, w *base.Worker) { eg := errgroup.Group{} started := make(chan struct{}) eg.Go(func() error { - return w.WorkerOpt.Executor.Run(ctx, id, root, nil, executor.ProcessInfo{ + return w.WorkerOpt.Executor.Run(ctx, id, execMount(root), nil, executor.ProcessInfo{ Meta: executor.Meta{ Args: []string{"/bin/false"}, Cwd: "/", @@ -204,7 +205,7 @@ func TestWorkerExecFailures(t *testing.T, w *base.Worker) { eg = errgroup.Group{} started = make(chan struct{}) eg.Go(func() error { - return w.WorkerOpt.Executor.Run(ctx, id, root, nil, executor.ProcessInfo{ + return w.WorkerOpt.Executor.Run(ctx, id, execMount(root), nil, executor.ProcessInfo{ Meta: executor.Meta{ Args: []string{"bogus"}, }, @@ -239,3 +240,15 @@ type nopCloser struct { func (n *nopCloser) Close() error { return nil } + +func execMount(m cache.Mountable) executor.Mountable { + return &mountable{m: m} +} + +type mountable struct { + m cache.Mountable +} + +func (m *mountable) Mount(ctx context.Context, readonly bool) (snapshot.Mountable, error) { + return m.m.Mount(ctx, readonly, nil) +} From bdcee17437cc90ed9c5928e3e9fbde7134399727 Mon Sep 17 00:00:00 2001 From: Tonis Tiigi Date: Sun, 1 Nov 2020 23:55:46 -0800 Subject: [PATCH 2/2] executor: change mount to struct Allows readonly passed cleanly. Signed-off-by: Tonis Tiigi --- executor/containerdexecutor/executor.go | 4 ++-- executor/executor.go | 2 +- executor/runcexecutor/executor.go | 7 +++---- frontend/gateway/container.go | 16 ++++++++++++---- solver/llbsolver/bridge.go | 2 +- solver/llbsolver/ops/exec.go | 14 +++++++++++--- worker/runc/runc_test.go | 4 ++-- worker/tests/common.go | 4 ++-- 8 files changed, 34 insertions(+), 19 deletions(-) diff --git a/executor/containerdexecutor/executor.go b/executor/containerdexecutor/executor.go index 1e543480..f4ab6d37 100644 --- a/executor/containerdexecutor/executor.go +++ b/executor/containerdexecutor/executor.go @@ -55,7 +55,7 @@ func New(client *containerd.Client, root, cgroup string, networkProviders map[pb } } -func (w *containerdExecutor) Run(ctx context.Context, id string, root executor.Mountable, mounts []executor.Mount, process executor.ProcessInfo, started chan<- struct{}) (err error) { +func (w *containerdExecutor) Run(ctx context.Context, id string, root executor.Mount, mounts []executor.Mount, process executor.ProcessInfo, started chan<- struct{}) (err error) { if id == "" { id = identity.NewID() } @@ -93,7 +93,7 @@ func (w *containerdExecutor) Run(ctx context.Context, id string, root executor.M defer clean() } - mountable, err := root.Mount(ctx, false) + mountable, err := root.Src.Mount(ctx, false) if err != nil { return err } diff --git a/executor/executor.go b/executor/executor.go index 7b047310..8fbe4a92 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -49,7 +49,7 @@ type Executor interface { // Run will start a container for the given process with rootfs, mounts. // `id` is an optional name for the container so it can be referenced later via Exec. // `started` is an optional channel that will be closed when the container setup completes and has started running. - Run(ctx context.Context, id string, rootfs Mountable, mounts []Mount, process ProcessInfo, started chan<- struct{}) error + Run(ctx context.Context, id string, rootfs Mount, mounts []Mount, process ProcessInfo, started chan<- struct{}) error // Exec will start a process in container matching `id`. An error will be returned // if the container failed to start (via Run) or has exited before Exec is called. Exec(ctx context.Context, id string, process ProcessInfo) error diff --git a/executor/runcexecutor/executor.go b/executor/runcexecutor/executor.go index 7c9a4883..035edfd2 100644 --- a/executor/runcexecutor/executor.go +++ b/executor/runcexecutor/executor.go @@ -18,7 +18,6 @@ import ( "github.com/containerd/continuity/fs" runc "github.com/containerd/go-runc" "github.com/docker/docker/pkg/idtools" - "github.com/moby/buildkit/cache" "github.com/moby/buildkit/executor" "github.com/moby/buildkit/executor/oci" "github.com/moby/buildkit/identity" @@ -130,7 +129,7 @@ func New(opt Opt, networkProviders map[pb.NetMode]network.Provider) (executor.Ex return w, nil } -func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mountable, mounts []executor.Mount, process executor.ProcessInfo, started chan<- struct{}) (err error) { +func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mount, mounts []executor.Mount, process executor.ProcessInfo, started chan<- struct{}) (err error) { meta := process.Meta startedOnce := sync.Once{} @@ -178,7 +177,7 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mountab defer clean() } - mountable, err := root.Mount(ctx, false) + mountable, err := root.Src.Mount(ctx, false) if err != nil { return err } @@ -262,7 +261,7 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mountab defer cleanup() spec.Root.Path = rootFSPath - if _, ok := root.(cache.ImmutableRef); ok { // TODO: pass in with mount, not ref type + if root.Readonly { spec.Root.Readonly = true } diff --git a/frontend/gateway/container.go b/frontend/gateway/container.go index 116a1b28..0aad15ef 100644 --- a/frontend/gateway/container.go +++ b/frontend/gateway/container.go @@ -121,7 +121,7 @@ func NewContainer(ctx context.Context, e executor.Executor, sm *session.Manager, } } - if ctr.rootFS == nil { + if ctr.rootFS.Src == nil { return nil, errors.Errorf("root mount required") } @@ -189,7 +189,7 @@ func NewContainer(ctx context.Context, e executor.Executor, sm *session.Manager, } execMount := executor.Mount{ - Src: mountWithSession(mountable, g), + Src: mountableWithSession(mountable, g), Selector: m.Selector, Dest: m.Dest, Readonly: m.Readonly, @@ -210,7 +210,7 @@ type gatewayContainer struct { id string netMode opspb.NetMode platform opspb.Platform - rootFS executor.Mountable + rootFS executor.Mount mounts []executor.Mount executor executor.Executor started bool @@ -351,7 +351,15 @@ func addDefaultEnvvar(env []string, k, v string) []string { return append(env, k+"="+v) } -func mountWithSession(m cache.Mountable, g session.Group) executor.Mountable { +func mountWithSession(m cache.Mountable, g session.Group) executor.Mount { + _, readonly := m.(cache.ImmutableRef) + return executor.Mount{ + Src: mountableWithSession(m, g), + Readonly: readonly, + } +} + +func mountableWithSession(m cache.Mountable, g session.Group) executor.Mountable { return &mountable{m: m, g: g} } diff --git a/solver/llbsolver/bridge.go b/solver/llbsolver/bridge.go index 26050600..df3e4b74 100644 --- a/solver/llbsolver/bridge.go +++ b/solver/llbsolver/bridge.go @@ -244,7 +244,7 @@ func (rp *resultProxy) Result(ctx context.Context) (res solver.CachedResult, err return nil, err } -func (b *llbBridge) Run(ctx context.Context, id string, root executor.Mountable, mounts []executor.Mount, process executor.ProcessInfo, started chan<- struct{}) (err error) { +func (b *llbBridge) Run(ctx context.Context, id string, root executor.Mount, mounts []executor.Mount, process executor.ProcessInfo, started chan<- struct{}) (err error) { w, err := b.resolveWorker() if err != nil { return err diff --git a/solver/llbsolver/ops/exec.go b/solver/llbsolver/ops/exec.go index e6340bd2..5938df7a 100644 --- a/solver/llbsolver/ops/exec.go +++ b/solver/llbsolver/ops/exec.go @@ -338,7 +338,11 @@ func (e *execOp) Exec(ctx context.Context, g session.Group, inputs []solver.Resu root = active } } else { - mounts = append(mounts, executor.Mount{Src: mountWithSession(mountable, g), Dest: m.Dest, Readonly: m.Readonly, Selector: m.Selector}) + mws := mountWithSession(mountable, g) + mws.Dest = m.Dest + mws.Readonly = m.Readonly + mws.Selector = m.Selector + mounts = append(mounts, mws) } } @@ -443,8 +447,12 @@ func parseExtraHosts(ips []*pb.HostIP) ([]executor.HostIP, error) { return out, nil } -func mountWithSession(m cache.Mountable, g session.Group) executor.Mountable { - return &mountable{m: m, g: g} +func mountWithSession(m cache.Mountable, g session.Group) executor.Mount { + _, readonly := m.(cache.ImmutableRef) + return executor.Mount{ + Src: &mountable{m: m, g: g}, + Readonly: readonly, + } } type mountable struct { diff --git a/worker/runc/runc_test.go b/worker/runc/runc_test.go index 2dccca5b..11174259 100644 --- a/worker/runc/runc_test.go +++ b/worker/runc/runc_test.go @@ -224,8 +224,8 @@ func (n *nopCloser) Close() error { return nil } -func execMount(m cache.Mountable) executor.Mountable { - return &mountable{m: m} +func execMount(m cache.Mountable) executor.Mount { + return executor.Mount{Src: &mountable{m: m}} } type mountable struct { diff --git a/worker/tests/common.go b/worker/tests/common.go index 257e5bf4..6160523d 100644 --- a/worker/tests/common.go +++ b/worker/tests/common.go @@ -241,8 +241,8 @@ func (n *nopCloser) Close() error { return nil } -func execMount(m cache.Mountable) executor.Mountable { - return &mountable{m: m} +func execMount(m cache.Mountable) executor.Mount { + return executor.Mount{Src: &mountable{m: m}} } type mountable struct {