report imported cache lookup progress
Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>docker-18.09
parent
914ea2110b
commit
65dc07eb28
|
@ -2,7 +2,9 @@ package cacheimport
|
|||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/remotes"
|
||||
|
@ -10,7 +12,10 @@ import (
|
|||
"github.com/containerd/containerd/rootfs"
|
||||
"github.com/moby/buildkit/cache"
|
||||
"github.com/moby/buildkit/cache/blobs"
|
||||
"github.com/moby/buildkit/client"
|
||||
buildkitidentity "github.com/moby/buildkit/identity"
|
||||
"github.com/moby/buildkit/snapshot"
|
||||
"github.com/moby/buildkit/util/progress"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
"github.com/opencontainers/image-spec/identity"
|
||||
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
|
@ -124,6 +129,7 @@ func (ci *CacheImporter) Import(ctx context.Context, ref string) (InstructionCac
|
|||
allBlobs: allBlobs,
|
||||
allDesc: allDesc,
|
||||
fetcher: fetcher,
|
||||
ref: ref,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -134,6 +140,7 @@ type importInfo struct {
|
|||
byContentKey map[digest.Digest][]digest.Digest
|
||||
allDesc map[digest.Digest]ocispec.Descriptor
|
||||
allBlobs map[digest.Digest]configItem
|
||||
ref string
|
||||
}
|
||||
|
||||
func (ii *importInfo) Probe(ctx context.Context, key digest.Digest) (bool, error) {
|
||||
|
@ -159,16 +166,28 @@ func (ii *importInfo) getChain(dgst digest.Digest) ([]blobs.DiffPair, error) {
|
|||
return append(out, blobs.DiffPair{Blobsum: dgst, DiffID: cfg.DiffID}), nil
|
||||
}
|
||||
|
||||
func (ii *importInfo) Lookup(ctx context.Context, key digest.Digest) (interface{}, error) {
|
||||
func (ii *importInfo) Lookup(ctx context.Context, key digest.Digest, msg string) (interface{}, error) {
|
||||
desc, ok := ii.byCacheKey[key]
|
||||
if !ok || desc.Blobsum == "" {
|
||||
return nil, nil
|
||||
}
|
||||
var out interface{}
|
||||
if err := inVertexContext(ctx, fmt.Sprintf("cache from %s for %s", ii.ref, msg), func(ctx context.Context) error {
|
||||
|
||||
ch, err := ii.getChain(desc.Blobsum)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
res, err := ii.fetch(ctx, ch)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
out = res
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return ii.fetch(ctx, ch)
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (ii *importInfo) Set(key digest.Digest, ref interface{}) error {
|
||||
|
@ -269,8 +288,45 @@ func (ii *importInfo) getLayers(ctx context.Context, dpairs []blobs.DiffPair) ([
|
|||
|
||||
type InstructionCache interface {
|
||||
Probe(ctx context.Context, key digest.Digest) (bool, error)
|
||||
Lookup(ctx context.Context, key digest.Digest) (interface{}, error) // TODO: regular ref
|
||||
Lookup(ctx context.Context, key digest.Digest, msg string) (interface{}, error) // TODO: regular ref
|
||||
Set(key digest.Digest, ref interface{}) error
|
||||
SetContentMapping(contentKey, key digest.Digest) error
|
||||
GetContentMapping(dgst digest.Digest) ([]digest.Digest, error)
|
||||
}
|
||||
|
||||
func inVertexContext(ctx context.Context, name string, f func(ctx context.Context) error) error {
|
||||
v := client.Vertex{
|
||||
Digest: digest.FromBytes([]byte(buildkitidentity.NewID())),
|
||||
Name: name,
|
||||
}
|
||||
pw, _, ctx := progress.FromContext(ctx, progress.WithMetadata("vertex", v.Digest))
|
||||
notifyStarted(ctx, &v)
|
||||
defer pw.Close()
|
||||
err := f(ctx)
|
||||
notifyCompleted(ctx, &v, err)
|
||||
return err
|
||||
}
|
||||
|
||||
func notifyStarted(ctx context.Context, v *client.Vertex) {
|
||||
pw, _, _ := progress.FromContext(ctx)
|
||||
defer pw.Close()
|
||||
now := time.Now()
|
||||
v.Started = &now
|
||||
v.Completed = nil
|
||||
pw.Write(v.Digest.String(), *v)
|
||||
}
|
||||
|
||||
func notifyCompleted(ctx context.Context, v *client.Vertex, err error) {
|
||||
pw, _, _ := progress.FromContext(ctx)
|
||||
defer pw.Close()
|
||||
now := time.Now()
|
||||
if v.Started == nil {
|
||||
v.Started = &now
|
||||
}
|
||||
v.Completed = &now
|
||||
v.Cached = false
|
||||
if err != nil {
|
||||
v.Error = err.Error()
|
||||
}
|
||||
pw.Write(v.Digest.String(), *v)
|
||||
}
|
||||
|
|
|
@ -41,7 +41,7 @@ func (ls *LocalStore) Probe(ctx context.Context, key digest.Digest) (bool, error
|
|||
return ls.MetadataStore.Probe(index(key.String()))
|
||||
}
|
||||
|
||||
func (ls *LocalStore) Lookup(ctx context.Context, key digest.Digest) (interface{}, error) {
|
||||
func (ls *LocalStore) Lookup(ctx context.Context, key digest.Digest, msg string) (interface{}, error) {
|
||||
snaps, err := ls.MetadataStore.Search(index(key.String()))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
@ -225,7 +225,7 @@ func getRef(s VertexSolver, ctx context.Context, v *vertex, index Index, cache I
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ref, err := cache.Lookup(ctx, k)
|
||||
ref, err := cache.Lookup(ctx, k, s.(*vertexSolver).v.Name())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -246,7 +246,7 @@ func getRef(s VertexSolver, ctx context.Context, v *vertex, index Index, cache I
|
|||
return nil, err
|
||||
}
|
||||
if r.CacheKey != "" {
|
||||
ref, err := cache.Lookup(ctx, r.CacheKey)
|
||||
ref, err := cache.Lookup(ctx, r.CacheKey, s.(*vertexSolver).v.Name())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -75,7 +75,7 @@ type Op interface {
|
|||
|
||||
type InstructionCache interface {
|
||||
Probe(ctx context.Context, key digest.Digest) (bool, error)
|
||||
Lookup(ctx context.Context, key digest.Digest) (interface{}, error) // TODO: regular ref
|
||||
Lookup(ctx context.Context, key digest.Digest, msg string) (interface{}, error) // TODO: regular ref
|
||||
Set(key digest.Digest, ref interface{}) error
|
||||
SetContentMapping(contentKey, key digest.Digest) error
|
||||
GetContentMapping(dgst digest.Digest) ([]digest.Digest, error)
|
||||
|
@ -518,7 +518,7 @@ func (vs *vertexSolver) run(ctx context.Context, signal func()) (retErr error) {
|
|||
|
||||
// check if current cache key is in cache
|
||||
if len(inp.cacheKeys) > 0 {
|
||||
ref, err := vs.cache.Lookup(ctx2, inp.cacheKeys[len(inp.cacheKeys)-1])
|
||||
ref, err := vs.cache.Lookup(ctx2, inp.cacheKeys[len(inp.cacheKeys)-1], inp.solver.(*vertexSolver).v.Name())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -835,15 +835,15 @@ func (mc *mergedCache) Probe(ctx context.Context, key digest.Digest) (bool, erro
|
|||
return mc.remote.Probe(ctx, key)
|
||||
}
|
||||
|
||||
func (mc *mergedCache) Lookup(ctx context.Context, key digest.Digest) (interface{}, error) {
|
||||
func (mc *mergedCache) Lookup(ctx context.Context, key digest.Digest, msg string) (interface{}, error) {
|
||||
v, err := mc.local.Probe(ctx, key)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if v {
|
||||
return mc.local.Lookup(ctx, key)
|
||||
return mc.local.Lookup(ctx, key, msg)
|
||||
}
|
||||
return mc.remote.Lookup(ctx, key)
|
||||
return mc.remote.Lookup(ctx, key, msg)
|
||||
}
|
||||
func (mc *mergedCache) Set(key digest.Digest, ref interface{}) error {
|
||||
return mc.local.Set(key, ref)
|
||||
|
|
Loading…
Reference in New Issue