Merge pull request #57 from tonistiigi/solver-separate-llb

solver: remove llb dependency
docker-18.09
Akihiro Suda 2017-07-06 15:09:45 +09:00 committed by GitHub
commit 504818c289
9 changed files with 311 additions and 167 deletions

View File

@ -29,7 +29,7 @@ type Controller struct { // TODO: ControlService
func NewController(opt Opt) (*Controller, error) {
c := &Controller{
opt: opt,
solver: solver.New(solver.Opt{
solver: solver.NewLLBSolver(solver.LLBOpt{
SourceManager: opt.SourceManager,
CacheManager: opt.CacheManager,
Worker: opt.Worker,
@ -62,7 +62,7 @@ func (c *Controller) DiskUsage(ctx context.Context, _ *controlapi.DiskUsageReque
}
func (c *Controller) Solve(ctx context.Context, req *controlapi.SolveRequest) (*controlapi.SolveResponse, error) {
v, err := solver.Load(req.Definition)
v, err := solver.LoadLLB(req.Definition)
if err != nil {
return nil, errors.Wrap(err, "failed to load")
}

View File

@ -1,7 +1,6 @@
package solver
import (
"context"
"io"
"os"
@ -12,9 +11,24 @@ import (
"github.com/moby/buildkit/util/progress"
"github.com/moby/buildkit/worker"
"github.com/pkg/errors"
"golang.org/x/net/context"
)
func runExecOp(ctx context.Context, cm cache.Manager, w worker.Worker, op *pb.Op_Exec, inputs []cache.ImmutableRef) ([]cache.ImmutableRef, error) {
type execOp struct {
op *pb.Op_Exec
cm cache.Manager
w worker.Worker
}
func newExecOp(op *pb.Op_Exec, cm cache.Manager, w worker.Worker) (Op, error) {
return &execOp{
op: op,
cm: cm,
w: w,
}, nil
}
func (e *execOp) Run(ctx context.Context, inputs []Reference) ([]Reference, error) {
mounts := make(map[string]cache.Mountable)
var outputs []cache.MutableRef
@ -30,15 +44,24 @@ func runExecOp(ctx context.Context, cm cache.Manager, w worker.Worker, op *pb.Op
}
}()
for _, m := range op.Exec.Mounts {
for _, m := range e.op.Exec.Mounts {
var mountable cache.Mountable
if int(m.Input) > len(inputs) {
return nil, errors.Errorf("missing input %d", m.Input)
}
ref := inputs[int(m.Input)]
inp := inputs[int(m.Input)]
if sys, ok := inp.(interface {
Sys() Reference
}); ok {
inp = sys.Sys()
}
ref, ok := inp.(cache.ImmutableRef)
if !ok {
return nil, errors.Errorf("invalid reference for exec %T", inputs[int(m.Input)])
}
mountable = ref
if m.Output != -1 {
active, err := cm.New(ctx, ref) // TODO: should be method
active, err := e.cm.New(ctx, ref) // TODO: should be method
if err != nil {
return nil, err
}
@ -49,9 +72,9 @@ func runExecOp(ctx context.Context, cm cache.Manager, w worker.Worker, op *pb.Op
}
meta := worker.Meta{
Args: op.Exec.Meta.Args,
Env: op.Exec.Meta.Env,
Cwd: op.Exec.Meta.Cwd,
Args: e.op.Exec.Meta.Args,
Env: e.op.Exec.Meta.Env,
Cwd: e.op.Exec.Meta.Cwd,
}
stdout := newStreamWriter(ctx, 1)
@ -59,11 +82,11 @@ func runExecOp(ctx context.Context, cm cache.Manager, w worker.Worker, op *pb.Op
stderr := newStreamWriter(ctx, 2)
defer stderr.Close()
if err := w.Exec(ctx, meta, mounts, stdout, stderr); err != nil {
if err := e.w.Exec(ctx, meta, mounts, stdout, stderr); err != nil {
return nil, errors.Wrapf(err, "worker failed running %v", meta.Args)
}
refs := []cache.ImmutableRef{}
refs := []Reference{}
for i, o := range outputs {
ref, err := o.ReleaseAndCommit(ctx)
if err != nil {

View File

@ -27,7 +27,7 @@ func newJobList() *jobList {
return jl
}
func (jl *jobList) new(ctx context.Context, id string, g *opVertex, pr progress.Reader) (*job, error) {
func (jl *jobList) new(ctx context.Context, id string, g *vertex, pr progress.Reader) (*job, error) {
jl.mu.Lock()
defer jl.mu.Unlock()
@ -75,15 +75,16 @@ func (jl *jobList) get(id string) (*job, error) {
type job struct {
mu sync.Mutex
g *opVertex
g *vertex
pr *progress.MultiReader
}
func (j *job) pipe(ctx context.Context, ch chan *client.SolveStatus) error {
pr := j.pr.Reader(ctx)
for v := range walk(j.g) {
vv := v.(*vertex)
ss := &client.SolveStatus{
Vertexes: []*client.Vertex{&v.vtx},
Vertexes: []*client.Vertex{&vv.clientVertex},
}
select {
case <-ctx.Done():
@ -141,23 +142,23 @@ func (j *job) pipe(ctx context.Context, ch chan *client.SolveStatus) error {
}
}
func walk(op *opVertex) chan *opVertex {
func walk(v Vertex) chan Vertex {
cache := make(map[digest.Digest]struct{})
ch := make(chan *opVertex, 32)
ch := make(chan Vertex, 32)
var send func(op *opVertex)
send = func(op *opVertex) {
for _, v := range op.inputs {
send(v)
var send func(v Vertex)
send = func(v Vertex) {
for _, v := range v.Inputs() {
send(v.Vertex)
}
if _, ok := cache[op.dgst]; !ok {
ch <- op
cache[op.dgst] = struct{}{}
if _, ok := cache[v.Digest()]; !ok {
ch <- v
cache[v.Digest()] = struct{}{}
}
}
go func() {
send(op)
send(v)
close(ch)
}()
return ch

View File

@ -1,70 +1,89 @@
package solver
import (
"github.com/moby/buildkit/client"
"strings"
"github.com/moby/buildkit/solver/pb"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
)
func Load(ops [][]byte) (*opVertex, error) {
func LoadLLB(ops [][]byte) (Vertex, error) {
if len(ops) == 0 {
return nil, errors.New("invalid empty definition")
}
m := make(map[digest.Digest]*pb.Op)
allOps := make(map[digest.Digest]*pb.Op)
var lastOp *pb.Op
var dgst digest.Digest
var lastDigest digest.Digest
for i, dt := range ops {
for _, dt := range ops {
var op pb.Op
if err := (&op).Unmarshal(dt); err != nil {
return nil, errors.Wrap(err, "failed to parse op")
return nil, errors.Wrap(err, "failed to parse llb proto op")
}
lastOp = &op
dgst = digest.FromBytes(dt)
if i != len(ops)-1 {
m[dgst] = &op
}
// logrus.Debugf("op %d %s %#v", i, dgst, op)
lastDigest = digest.FromBytes(dt)
allOps[lastDigest] = &op
}
cache := make(map[digest.Digest]*opVertex)
delete(allOps, lastDigest) // avoid loops
cache := make(map[digest.Digest]*vertex)
// TODO: validate the connections
vtx, err := loadReqursive(dgst, lastOp, m, cache)
if err != nil {
return nil, err
return loadLLBVertexRecursive(lastDigest, lastOp, allOps, cache)
}
return vtx, err
func toInternalVertex(v Vertex) *vertex {
cache := make(map[digest.Digest]*vertex)
return loadInternalVertexHelper(v, cache)
}
func loadReqursive(dgst digest.Digest, op *pb.Op, inputs map[digest.Digest]*pb.Op, cache map[digest.Digest]*opVertex) (*opVertex, error) {
func loadInternalVertexHelper(v Vertex, cache map[digest.Digest]*vertex) *vertex {
if v, ok := cache[v.Digest()]; ok {
return v
}
vtx := &vertex{sys: v.Sys(), digest: v.Digest(), name: v.Name()}
for _, in := range v.Inputs() {
vv := loadInternalVertexHelper(in.Vertex, cache)
vtx.inputs = append(vtx.inputs, &input{index: in.Index, vertex: vv})
}
vtx.initClientVertex()
cache[v.Digest()] = vtx
return vtx
}
func loadLLBVertexRecursive(dgst digest.Digest, op *pb.Op, all map[digest.Digest]*pb.Op, cache map[digest.Digest]*vertex) (*vertex, error) {
if v, ok := cache[dgst]; ok {
return v, nil
}
vtx := &opVertex{op: op, dgst: dgst}
inputDigests := make([]digest.Digest, 0, len(op.Inputs))
vtx := &vertex{sys: op.Op, digest: dgst, name: llbOpName(op)}
for _, in := range op.Inputs {
dgst := digest.Digest(in.Digest)
inputDigests = append(inputDigests, dgst)
op, ok := inputs[dgst]
op, ok := all[dgst]
if !ok {
return nil, errors.Errorf("failed to find %s", in)
}
sub, err := loadReqursive(dgst, op, inputs, cache)
sub, err := loadLLBVertexRecursive(dgst, op, all, cache)
if err != nil {
return nil, err
}
vtx.inputs = append(vtx.inputs, sub)
}
vtx.vtx = client.Vertex{
Inputs: inputDigests,
Name: vtx.name(),
Digest: dgst,
vtx.inputs = append(vtx.inputs, &input{index: int(in.Index), vertex: sub})
}
vtx.initClientVertex()
cache[dgst] = vtx
return vtx, nil
}
func llbOpName(op *pb.Op) string {
switch op := op.Op.(type) {
case *pb.Op_Source:
return op.Source.Identifier
case *pb.Op_Exec:
return strings.Join(op.Exec.Meta.Args, " ")
default:
return "unknown"
}
}

View File

@ -3,7 +3,6 @@ package solver
import (
"sync"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/util/flightcontrol"
"github.com/moby/buildkit/util/progress"
digest "github.com/opencontainers/go-digest"
@ -44,7 +43,7 @@ func (c *refCache) probe(j *job, key digest.Digest) bool {
c.mu.Unlock()
return false
}
func (c *refCache) get(key digest.Digest) ([]cache.ImmutableRef, error) {
func (c *refCache) get(key digest.Digest) ([]Reference, error) {
c.mu.Lock()
defer c.mu.Unlock()
v, ok := c.cache[key]
@ -55,13 +54,13 @@ func (c *refCache) get(key digest.Digest) ([]cache.ImmutableRef, error) {
if v.value == nil {
return nil, errors.Errorf("no ref cache value set")
}
refs := make([]cache.ImmutableRef, 0, len(v.value))
refs := make([]Reference, 0, len(v.value))
for _, r := range v.value {
refs = append(refs, r.Clone())
}
return refs, nil
}
func (c *refCache) set(ctx context.Context, key digest.Digest, refs []cache.ImmutableRef) {
func (c *refCache) set(ctx context.Context, key digest.Digest, refs []Reference) {
c.mu.Lock()
sharedRefs := make([]*sharedRef, 0, len(refs))
for _, r := range refs {
@ -110,20 +109,20 @@ func (c *refCache) writeProgressSnapshot(ctx context.Context, key digest.Digest)
type sharedRef struct {
mu sync.Mutex
refs map[*sharedRefInstance]struct{}
main cache.ImmutableRef
cache.ImmutableRef
main Reference
Reference
}
func newSharedRef(main cache.ImmutableRef) *sharedRef {
func newSharedRef(main Reference) *sharedRef {
mr := &sharedRef{
refs: make(map[*sharedRefInstance]struct{}),
ImmutableRef: main,
Reference: main,
}
mr.main = mr.Clone()
return mr
}
func (mr *sharedRef) Clone() cache.ImmutableRef {
func (mr *sharedRef) Clone() Reference {
mr.mu.Lock()
r := &sharedRefInstance{sharedRef: mr}
mr.refs[r] = struct{}{}
@ -135,6 +134,16 @@ func (mr *sharedRef) Release(ctx context.Context) error {
return mr.main.Release(ctx)
}
func (mr *sharedRef) Sys() Reference {
sys := mr.Reference
if s, ok := sys.(interface {
Sys() Reference
}); ok {
return s.Sys()
}
return sys
}
type sharedRefInstance struct {
*sharedRef
}
@ -144,7 +153,7 @@ func (r *sharedRefInstance) Release(ctx context.Context) error {
defer r.sharedRef.mu.Unlock()
delete(r.sharedRef.refs, r)
if len(r.sharedRef.refs) == 0 {
return r.sharedRef.ImmutableRef.Release(ctx)
return r.sharedRef.Reference.Release(ctx)
}
return nil
}

View File

@ -1,49 +1,77 @@
package solver
import (
"strings"
"sync"
"time"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/source"
"github.com/moby/buildkit/util/progress"
"github.com/moby/buildkit/worker"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"golang.org/x/net/context"
"golang.org/x/sync/errgroup"
)
type Opt struct {
type LLBOpt struct {
SourceManager *source.Manager
CacheManager cache.Manager // TODO: this shouldn't be needed before instruction cache
Worker worker.Worker
}
func NewLLBSolver(opt LLBOpt) *Solver {
return New(func(v Vertex) (Op, error) {
switch op := v.Sys().(type) {
case *pb.Op_Source:
return newSourceOp(op, opt.SourceManager)
case *pb.Op_Exec:
return newExecOp(op, opt.CacheManager, opt.Worker)
default:
return nil, errors.Errorf("invalid op type %T", op)
}
})
}
// ResolveOpFunc finds an Op implementation for a vertex
type ResolveOpFunc func(Vertex) (Op, error)
// Reference is a reference to the object passed through the build steps.
type Reference interface {
Release(context.Context) error
}
// Op is an implementation for running a vertex
type Op interface {
// CacheKeys(context.Context, [][]string) ([]string, error)
Run(ctx context.Context, inputs []Reference) (outputs []Reference, err error)
}
// type Cache interface {
// Lookup(context.Context, string) ([]Reference, error)
// }
type Solver struct {
opt Opt
resolve ResolveOpFunc
jobs *jobList
active refCache
}
func New(opt Opt) *Solver {
return &Solver{opt: opt, jobs: newJobList()}
func New(resolve ResolveOpFunc) *Solver {
return &Solver{resolve: resolve, jobs: newJobList()}
}
func (s *Solver) Solve(ctx context.Context, id string, g *opVertex) error {
func (s *Solver) Solve(ctx context.Context, id string, v Vertex) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
pr, ctx, closeProgressWriter := progress.NewContext(ctx)
if len(g.inputs) > 0 { // TODO: detect op_return better
g = g.inputs[0]
if len(v.Inputs()) > 0 { // TODO: detect op_return better
v = v.Inputs()[0].Vertex
}
j, err := s.jobs.new(ctx, id, g, pr)
vv := toInternalVertex(v)
j, err := s.jobs.new(ctx, id, vv, pr)
if err != nil {
return err
}
@ -71,16 +99,16 @@ func (s *Solver) Status(ctx context.Context, id string, statusChan chan *client.
return j.pipe(ctx, statusChan)
}
func (s *Solver) getRefs(ctx context.Context, j *job, g *opVertex) (retRef []cache.ImmutableRef, retErr error) {
func (s *Solver) getRefs(ctx context.Context, j *job, g *vertex) (retRef []Reference, retErr error) {
s.active.probe(j, g.dgst) // this registers the key with the job
s.active.probe(j, g.digest) // this registers the key with the job
// refs contains all outputs for all input vertexes
refs := make([][]*sharedRef, len(g.inputs))
if len(g.inputs) > 0 {
eg, ctx := errgroup.WithContext(ctx)
for i, in := range g.inputs {
func(i int, in *opVertex) {
func(i int, in *vertex) {
eg.Go(func() error {
r, err := s.getRefs(ctx, j, in)
if err != nil {
@ -91,7 +119,7 @@ func (s *Solver) getRefs(ctx context.Context, j *job, g *opVertex) (retRef []cac
}
return nil
})
}(i, in)
}(i, in.vertex)
}
err := eg.Wait()
if err != nil {
@ -105,17 +133,13 @@ func (s *Solver) getRefs(ctx context.Context, j *job, g *opVertex) (retRef []cac
}
// determine the inputs that were needed
inputs := make([]cache.ImmutableRef, 0, len(g.op.Inputs))
for _, inp := range g.op.Inputs {
for i, v := range g.inputs {
if v.dgst == digest.Digest(inp.Digest) {
inputs = append(inputs, refs[i][int(inp.Index)].Clone())
}
}
inputRefs := make([]Reference, 0, len(g.inputs))
for i, inp := range g.inputs {
inputRefs = append(inputRefs, refs[i][inp.index].Clone())
}
defer func() {
for _, r := range inputs {
for _, r := range inputRefs {
go r.Release(context.TODO())
}
}()
@ -127,7 +151,7 @@ func (s *Solver) getRefs(ctx context.Context, j *job, g *opVertex) (retRef []cac
}
}
pw, _, ctx := progress.FromContext(ctx, progress.WithMetadata("vertex", g.dgst))
pw, _, ctx := progress.FromContext(ctx, progress.WithMetadata("vertex", g.Digest()))
defer pw.Close()
g.notifyStarted(ctx)
@ -135,76 +159,26 @@ func (s *Solver) getRefs(ctx context.Context, j *job, g *opVertex) (retRef []cac
g.notifyCompleted(ctx, retErr)
}()
_, err := s.active.Do(ctx, g.dgst.String(), func(doctx context.Context) (interface{}, error) {
if hit := s.active.probe(j, g.dgst); hit {
if err := s.active.writeProgressSnapshot(ctx, g.dgst); err != nil {
_, err := s.active.Do(ctx, g.digest.String(), func(doctx context.Context) (interface{}, error) {
if hit := s.active.probe(j, g.digest); hit {
if err := s.active.writeProgressSnapshot(ctx, g.digest); err != nil {
return nil, err
}
return nil, nil
}
refs, err := s.runVertex(doctx, g, inputs)
op, err := s.resolve(g)
if err != nil {
return nil, err
}
s.active.set(doctx, g.dgst, refs)
refs, err := op.Run(doctx, inputRefs)
if err != nil {
return nil, err
}
s.active.set(doctx, g.digest, refs)
return nil, nil
})
if err != nil {
return nil, err
}
return s.active.get(g.dgst)
}
func (s *Solver) runVertex(ctx context.Context, g *opVertex, inputs []cache.ImmutableRef) ([]cache.ImmutableRef, error) {
switch op := g.op.Op.(type) {
case *pb.Op_Source:
return runSourceOp(ctx, s.opt.SourceManager, op)
case *pb.Op_Exec:
return runExecOp(ctx, s.opt.CacheManager, s.opt.Worker, op, inputs)
default:
return nil, errors.Errorf("invalid op type %T", g.op.Op)
}
}
type opVertex struct {
mu sync.Mutex
op *pb.Op
inputs []*opVertex
err error
dgst digest.Digest
vtx client.Vertex
}
func (g *opVertex) inputRequiresExport(i int) bool {
return true // TODO
}
func (g *opVertex) notifyStarted(ctx context.Context) {
pw, _, _ := progress.FromContext(ctx)
defer pw.Close()
now := time.Now()
g.vtx.Started = &now
pw.Write(g.dgst.String(), g.vtx)
}
func (g *opVertex) notifyCompleted(ctx context.Context, err error) {
pw, _, _ := progress.FromContext(ctx)
defer pw.Close()
now := time.Now()
g.vtx.Completed = &now
if err != nil {
g.vtx.Error = err.Error()
}
pw.Write(g.dgst.String(), g.vtx)
}
func (g *opVertex) name() string {
switch op := g.op.Op.(type) {
case *pb.Op_Source:
return op.Source.Identifier
case *pb.Op_Exec:
return strings.Join(op.Exec.Meta.Args, " ")
default:
return "unknown"
}
return s.active.get(g.digest)
}

View File

@ -1,21 +1,31 @@
package solver
import (
"context"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/source"
"golang.org/x/net/context"
)
func runSourceOp(ctx context.Context, sm *source.Manager, op *pb.Op_Source) ([]cache.ImmutableRef, error) {
id, err := source.FromString(op.Source.Identifier)
type sourceOp struct {
op *pb.Op_Source
sm *source.Manager
}
func newSourceOp(op *pb.Op_Source, sm *source.Manager) (Op, error) {
return &sourceOp{
op: op,
sm: sm,
}, nil
}
func (s *sourceOp) Run(ctx context.Context, _ []Reference) ([]Reference, error) {
id, err := source.FromString(s.op.Source.Identifier)
if err != nil {
return nil, err
}
ref, err := sm.Pull(ctx, id)
ref, err := s.sm.Pull(ctx, id)
if err != nil {
return nil, err
}
return []cache.ImmutableRef{ref}, nil
return []Reference{ref}, nil
}

98
solver/vertex.go Normal file
View File

@ -0,0 +1,98 @@
package solver
import (
"sync"
"time"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/util/progress"
digest "github.com/opencontainers/go-digest"
"golang.org/x/net/context"
)
// Vertex is one node in the build graph
type Vertex interface {
// Digest is a content-addressable vertex identifier
Digest() digest.Digest
// Sys returns an internal value that is used to execute the vertex. Usually
// this is capured by the operation resolver method during solve.
Sys() interface{}
// Array of vertexes current vertex depends on.
Inputs() []Input
Name() string // change this to general metadata
}
// Input is an pointer to a single reference from a vertex by an index.
type Input struct {
Index int
Vertex Vertex
}
type input struct {
index int
vertex *vertex
}
type vertex struct {
mu sync.Mutex
sys interface{}
inputs []*input
err error
digest digest.Digest
clientVertex client.Vertex
name string
}
func (v *vertex) initClientVertex() {
inputDigests := make([]digest.Digest, 0, len(v.inputs))
for _, inp := range v.inputs {
inputDigests = append(inputDigests, inp.vertex.Digest())
}
v.clientVertex = client.Vertex{
Inputs: inputDigests,
Name: v.Name(),
Digest: v.digest,
}
}
func (v *vertex) Digest() digest.Digest {
return v.digest
}
func (v *vertex) Sys() interface{} {
return v.sys
}
func (v *vertex) Inputs() (inputs []Input) {
for _, i := range v.inputs {
inputs = append(inputs, Input{i.index, i.vertex})
}
return
}
func (v *vertex) Name() string {
return v.name
}
func (v *vertex) inputRequiresExport(i int) bool {
return true // TODO
}
func (v *vertex) notifyStarted(ctx context.Context) {
pw, _, _ := progress.FromContext(ctx)
defer pw.Close()
now := time.Now()
v.clientVertex.Started = &now
pw.Write(v.Digest().String(), v.clientVertex)
}
func (v *vertex) notifyCompleted(ctx context.Context, err error) {
pw, _, _ := progress.FromContext(ctx)
defer pw.Close()
now := time.Now()
v.clientVertex.Completed = &now
if err != nil {
v.clientVertex.Error = err.Error()
}
pw.Write(v.Digest().String(), v.clientVertex)
}

View File

@ -13,6 +13,16 @@ type Source interface {
Pull(ctx context.Context, id Identifier) (cache.ImmutableRef, error)
}
// type Source interface {
// ID() string
// Resolve(ctx context.Context, id Identifier) (SourceInstance, error)
// }
//
// type SourceInstance interface {
// GetCacheKey(ctx context.Context) ([]string, error)
// GetSnapshot(ctx context.Context) (cache.ImmutableRef, error)
// }
type Manager struct {
mu sync.Mutex
sources map[string]Source