Merge pull request #137 from tonistiigi/contentmask

solver: replace contentkey with contentmask

commit 0ee59e2bdf
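
To orient the hunks below: the solver's Op interface drops ContentKeys, where every op computed finished content-based cache keys itself, in favor of ContentMask, where an op only describes what should be checksummed and the solver does the hashing centrally. A sketch of the two signatures as they appear in the solver/solver.go hunk further down:

    // Before: the op receives input cache keys and references and returns
    // complete content-based cache keys.
    ContentKeys(context.Context, [][]digest.Digest, []Reference) ([]digest.Digest, error)

    // After: the op returns a partial ("masked") checksum of its definition plus,
    // per input, the paths whose content the solver should checksum.
    ContentMask(context.Context) (digest.Digest, [][]string, error)
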
@@ -118,6 +118,6 @@ func (b *buildOp) Run(ctx context.Context, inputs []Reference) (outputs []Reference, err error) {
     return []Reference{newref}, err
 }
 
-func (b *buildOp) ContentKeys(context.Context, [][]digest.Digest, []Reference) ([]digest.Digest, error) {
-    return nil, nil
+func (b *buildOp) ContentMask(context.Context) (digest.Digest, [][]string, error) {
+    return "", nil, nil
 }

solver/exec.go (118 changed lines)

@@ -8,29 +8,29 @@ import (
     "strings"
 
     "github.com/moby/buildkit/cache"
-    "github.com/moby/buildkit/cache/contenthash"
     "github.com/moby/buildkit/solver/pb"
     "github.com/moby/buildkit/util/progress/logs"
     "github.com/moby/buildkit/worker"
     digest "github.com/opencontainers/go-digest"
     "github.com/pkg/errors"
     "golang.org/x/net/context"
-    "golang.org/x/sync/errgroup"
 )
 
 const execCacheType = "buildkit.exec.v0"
 
 type execOp struct {
-    op *pb.ExecOp
-    cm cache.Manager
-    w  worker.Worker
+    op        *pb.ExecOp
+    cm        cache.Manager
+    w         worker.Worker
+    numInputs int
 }
 
-func newExecOp(_ Vertex, op *pb.Op_Exec, cm cache.Manager, w worker.Worker) (Op, error) {
+func newExecOp(v Vertex, op *pb.Op_Exec, cm cache.Manager, w worker.Worker) (Op, error) {
     return &execOp{
-        op: op.Exec,
-        cm: cm,
-        w:  w,
+        op:        op.Exec,
+        cm:        cm,
+        w:         w,
+        numInputs: len(v.Inputs()),
     }, nil
 }
 
@@ -130,92 +130,58 @@ func (e *execOp) Run(ctx context.Context, inputs []Reference) ([]Reference, error) {
     return refs, nil
 }
 
-func (e *execOp) ContentKeys(ctx context.Context, inputs [][]digest.Digest, refs []Reference) ([]digest.Digest, error) {
-    if len(refs) == 0 {
-        return nil, nil
-    }
+func (e *execOp) ContentMask(ctx context.Context) (digest.Digest, [][]string, error) {
     // contentKey for exec uses content based checksum for mounts and definition
     // based checksum for root
 
-    skipped := make([]int, 0)
+    skipped := make(map[int]struct{}, 0)
 
     type src struct {
-        index    pb.InputIndex
+        index    int
         selector string
     }
 
-    skip := true
-    srcsMap := make(map[src]struct{}, len(refs))
-    for _, m := range e.op.Mounts {
+    srcsMap := make(map[src]struct{})
+    mountsCopy := make([]*pb.Mount, len(e.op.Mounts))
+    for i, m := range e.op.Mounts {
+        copy := *m
+        mountsCopy[i] = &copy
         if m.Input != pb.Empty {
            if m.Dest != pb.RootMount && m.Readonly { // could also include rw if they don't have a selector, but not sure if helps performance
-                srcsMap[src{m.Input, path.Join("/", m.Selector)}] = struct{}{}
-                skip = false
+                srcsMap[src{int(m.Input), path.Join("/", m.Selector)}] = struct{}{}
+                mountsCopy[i].Selector = ""
             } else {
-                skipped = append(skipped, int(m.Input))
+                skipped[int(m.Input)] = struct{}{}
             }
         }
     }
-    if skip {
-        return nil, nil
+    if len(srcsMap) == 0 {
+        return "", nil, nil
     }
 
-    srcs := make([]src, 0, len(srcsMap))
-    for s := range srcsMap {
-        srcs = append(srcs, s)
+    contentInputs := make([][]string, e.numInputs)
+    for in := range srcsMap {
+        contentInputs[in.index] = append(contentInputs[in.index], in.selector)
     }
+    // TODO: remove nested directories
 
-    sort.Slice(srcs, func(i, j int) bool {
-        if srcs[i].index == srcs[j].index {
-            return srcs[i].selector < srcs[j].selector
-        }
-        return srcs[i].index < srcs[j].index
-    })
-
-    dgsts := make([]digest.Digest, len(srcs))
-    eg, ctx := errgroup.WithContext(ctx)
-    for i, s := range srcs {
-        func(i int, s src, ref Reference) {
-            eg.Go(func() error {
-                ref, ok := toImmutableRef(ref)
-                if !ok {
-                    return errors.Errorf("invalid reference")
-                }
-                dgst, err := contenthash.Checksum(ctx, ref, s.selector)
-                if err != nil {
-                    return err
-                }
-                dgsts[i] = dgst
-                return nil
-            })
-        }(i, s, refs[int(s.index)])
-    }
-    if err := eg.Wait(); err != nil {
-        return nil, err
+    for k := range contentInputs {
+        sort.Strings(contentInputs[k])
     }
 
-    var out []digest.Digest
-    inputKeys := make([]digest.Digest, len(skipped))
-    for _, cacheKeys := range inputs {
-        for i := range inputKeys {
-            inputKeys[i] = cacheKeys[skipped[i]]
-        }
-        dt, err := json.Marshal(struct {
-            Type    string
-            Sources []digest.Digest
-            Inputs  []digest.Digest
-            Exec    *pb.ExecOp
-        }{
-            Type:    execCacheType,
-            Sources: dgsts,
-            Inputs:  inputKeys,
-            Exec:    e.op,
-        })
-        if err != nil {
-            return nil, err
-        }
-        out = append(out, digest.FromBytes(dt))
-    }
+    ecopy := *e.op
+    ecopy.Mounts = mountsCopy
 
-    return out, nil
+    dt, err := json.Marshal(struct {
+        Type string
+        Exec *pb.ExecOp
+    }{
+        Type: execCacheType,
+        Exec: &ecopy,
+    })
+    if err != nil {
+        return "", nil, err
+    }
+
+    return digest.FromBytes(dt), contentInputs, nil
 }
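
The mask that execOp.ContentMask returns above can be read as: hash the exec definition with mount selectors cleared, and tell the solver which selector paths to content-checksum for each input. Below is a standalone sketch of that idea under simplified assumptions; the mount type, the contentMask helper and the sha256 formatting are illustrative stand-ins, not buildkit's API:

    package main

    import (
        "crypto/sha256"
        "encoding/json"
        "fmt"
        "path"
        "sort"
    )

    // mount is a simplified stand-in for pb.Mount.
    type mount struct {
        Input    int // index of the input reference, -1 for no input
        Dest     string
        Readonly bool
        Selector string
    }

    // contentMask hashes the definition with selectors cleared and reports,
    // per input, which paths should be content-checksummed by the caller.
    func contentMask(mounts []mount, numInputs int) (string, [][]string) {
        masked := make([]mount, len(mounts))
        selectors := make([][]string, numInputs)
        for i, m := range mounts {
            masked[i] = m
            if m.Input >= 0 && m.Dest != "/" && m.Readonly {
                // this path is hashed separately, so it must not
                // influence the definition digest
                selectors[m.Input] = append(selectors[m.Input], path.Join("/", m.Selector))
                masked[i].Selector = ""
            }
        }
        for _, s := range selectors {
            sort.Strings(s)
        }
        dt, _ := json.Marshal(masked)
        return fmt.Sprintf("sha256:%x", sha256.Sum256(dt)), selectors
    }

    func main() {
        dgst, sel := contentMask([]mount{
            {Input: 0, Dest: "/", Readonly: false},                    // root stays definition-based
            {Input: 1, Dest: "/src", Selector: "pkg", Readonly: true}, // /src cached by content of /pkg
        }, 2)
        fmt.Println(dgst)
        fmt.Println(sel) // prints [[] [/pkg]]: checksum path /pkg of input 1
    }
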

solver/solver.go (103 changed lines)

@@ -8,6 +8,7 @@ import (
     "time"
 
     "github.com/moby/buildkit/cache"
+    "github.com/moby/buildkit/cache/contenthash"
     "github.com/moby/buildkit/client"
     "github.com/moby/buildkit/exporter"
     "github.com/moby/buildkit/frontend"
@@ -59,8 +60,13 @@ type Reference interface {
 
 // Op is an implementation for running a vertex
 type Op interface {
+    // CacheKey returns a persistent cache key for operation.
     CacheKey(context.Context) (digest.Digest, error)
-    ContentKeys(context.Context, [][]digest.Digest, []Reference) ([]digest.Digest, error)
+    // ContentMask returns a partial cache checksum with content paths to the
+    // inputs. User can combine the content checksum of these paths to get a valid
+    // content based cache key.
+    ContentMask(context.Context) (digest.Digest, [][]string, error)
+    // Run runs an operation and returns the output references.
     Run(ctx context.Context, inputs []Reference) (outputs []Reference, err error)
 }
 
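
The contract spelled out by the ContentMask comment: the final content-based key hashes the mask digest together with one digest per input, where an input with no selectors keeps its ordinary cache key and an input with selectors contributes a checksum of the listed paths. A simplified sketch of that combination follows; the actual implementation added in the last hunk of this file is getInputContentHash/calculateContentHash, which differs in details (for example it skips the extra marshalling when an input has a single selector). The names contentKey and checksum here are illustrative:

    package example

    import (
        "encoding/json"

        digest "github.com/opencontainers/go-digest"
    )

    // contentKey combines an op's mask digest with one digest per input.
    // checksum stands in for contenthash.Checksum over a path of an input's
    // snapshot; inputKeys must have at least len(selectors) entries.
    func contentKey(mask digest.Digest, selectors [][]string, inputKeys []digest.Digest,
        checksum func(input int, selector string) (digest.Digest, error)) (digest.Digest, error) {
        per := make([]digest.Digest, len(selectors))
        for i, sels := range selectors {
            if sels == nil {
                per[i] = inputKeys[i] // definition-based key is enough for this input
                continue
            }
            ds := make([]digest.Digest, 0, len(sels))
            for _, s := range sels {
                d, err := checksum(i, s)
                if err != nil {
                    return "", err
                }
                ds = append(ds, d)
            }
            dt, err := json.Marshal(ds)
            if err != nil {
                return "", err
            }
            per[i] = digest.FromBytes(dt)
        }
        dt, err := json.Marshal(struct {
            Main   digest.Digest
            Inputs []digest.Digest
        }{Main: mask, Inputs: per})
        if err != nil {
            return "", err
        }
        return digest.FromBytes(dt), nil
    }
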
@@ -473,26 +479,31 @@ func (vs *vertexSolver) run(ctx context.Context, signal func()) (retErr error) {
         lastInputKeys[i] = vs.inputs[i].cacheKeys[len(vs.inputs[i].cacheKeys)-1]
     }
 
     // TODO: avoid doing this twice on cancellation+resume
-    contentKeys, err := vs.op.ContentKeys(ctx, [][]digest.Digest{lastInputKeys}, inputRefs)
+    dgst, inp, err := vs.op.ContentMask(ctx)
     if err != nil {
         return err
     }
-
-    var extraKeys []digest.Digest
-    for _, k := range contentKeys {
-        cks, err := vs.cache.GetContentMapping(k)
+    var contentKey digest.Digest
+    if dgst != "" {
+        contentKey, err = calculateContentHash(ctx, inputRefs, dgst, lastInputKeys, inp)
         if err != nil {
             return err
         }
+
+        var extraKeys []digest.Digest
+        cks, err := vs.cache.GetContentMapping(contentKey)
+        if err != nil {
+            return err
+        }
         extraKeys = append(extraKeys, cks...)
-    }
-    if len(extraKeys) > 0 {
-        vs.mu.Lock()
-        vs.results = append(vs.results, extraKeys...)
-        signal()
-        waitRun = vs.signal.Reset()
-        vs.mu.Unlock()
+        if len(extraKeys) > 0 {
+            vs.mu.Lock()
+            vs.results = append(vs.results, extraKeys...)
+            signal()
+            waitRun = vs.signal.Reset()
+            vs.mu.Unlock()
+        }
     }
 
     select {
@@ -534,17 +545,73 @@ func (vs *vertexSolver) run(ctx context.Context, signal func()) (retErr error) {
             logrus.Errorf("failed to save cache for %s: %v", cacheKey, err)
         }
     }
-    if len(contentKeys) > 0 {
-        for _, ck := range contentKeys {
-            if err := vs.cache.SetContentMapping(ck, cacheKey); err != nil {
-                logrus.Errorf("failed to save content mapping: %v", err)
-            }
+    if contentKey != "" {
+        if err := vs.cache.SetContentMapping(contentKey, cacheKey); err != nil {
+            logrus.Errorf("failed to save content mapping: %v", err)
         }
     }
 
     return nil
 }
 
+func getInputContentHash(ctx context.Context, ref cache.ImmutableRef, selectors []string) (digest.Digest, error) {
+    out := make([]digest.Digest, 0, len(selectors))
+    for _, s := range selectors {
+        dgst, err := contenthash.Checksum(ctx, ref, s)
+        if err != nil {
+            return "", err
+        }
+        out = append(out, dgst)
+    }
+    if len(out) == 1 {
+        return out[0], nil
+    }
+    dt, err := json.Marshal(out)
+    if err != nil {
+        return "", err
+    }
+    return digest.FromBytes(dt), nil
+}
+
+func calculateContentHash(ctx context.Context, refs []Reference, mainDigest digest.Digest, inputs []digest.Digest, contentMap [][]string) (digest.Digest, error) {
+    dgsts := make([]digest.Digest, len(contentMap))
+    eg, ctx := errgroup.WithContext(ctx)
+    for i, sel := range contentMap {
+        if sel == nil {
+            dgsts[i] = inputs[i]
+            continue
+        }
+        func(i int) {
+            eg.Go(func() error {
+                ref, ok := toImmutableRef(refs[i])
+                if !ok {
+                    return errors.Errorf("invalid reference for exporting: %T", ref)
+                }
+                dgst, err := getInputContentHash(ctx, ref, contentMap[i])
+                if err != nil {
+                    return err
+                }
+                dgsts[i] = dgst
+                return nil
+            })
+        }(i)
+    }
+    if err := eg.Wait(); err != nil {
+        return "", err
+    }
+    dt, err := json.Marshal(struct {
+        Main   digest.Digest
+        Inputs []digest.Digest
+    }{
+        Main:   mainDigest,
+        Inputs: dgsts,
+    })
+    if err != nil {
+        return "", err
+    }
+    return digest.FromBytes(dt), nil
+}
+
 type VertexEvaluator interface {
     Next(context.Context) (*VertexResult, error)
     Cancel() error

@@ -97,6 +97,6 @@ func (s *sourceOp) Run(ctx context.Context, _ []Reference) ([]Reference, error) {
     return []Reference{ref}, nil
 }
 
-func (s *sourceOp) ContentKeys(context.Context, [][]digest.Digest, []Reference) ([]digest.Digest, error) {
-    return nil, nil
+func (s *sourceOp) ContentMask(context.Context) (digest.Digest, [][]string, error) {
+    return "", nil, nil
 }