Merge pull request #832 from AkihiroSuda/decouple-sm-from-worker
decouple SessionManager from Workerdocker-19.03
commit
f1dac983fa
|
@ -57,8 +57,7 @@ func init() {
|
|||
}
|
||||
|
||||
type workerInitializerOpt struct {
|
||||
sessionManager *session.Manager
|
||||
config *config.Config
|
||||
config *config.Config
|
||||
}
|
||||
|
||||
type workerInitializer struct {
|
||||
|
@ -494,8 +493,7 @@ func newController(c *cli.Context, cfg *config.Config) (*control.Controller, err
|
|||
return nil, err
|
||||
}
|
||||
wc, err := newWorkerController(c, workerInitializerOpt{
|
||||
sessionManager: sessionManager,
|
||||
config: cfg,
|
||||
config: cfg,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
@ -176,7 +176,6 @@ func containerdWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
opt.SessionManager = common.sessionManager
|
||||
opt.GCPolicy = getGCPolicy(cfg.GCConfig, common.config.Root)
|
||||
opt.ResolveOptionsFunc = resolverFunc(common.config)
|
||||
|
||||
|
|
|
@ -190,7 +190,6 @@ func ociWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([]worker
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
opt.SessionManager = common.sessionManager
|
||||
opt.GCPolicy = getGCPolicy(cfg.GCConfig, common.config.Root)
|
||||
opt.ResolveOptionsFunc = resolverFunc(common.config)
|
||||
|
||||
|
|
|
@ -48,7 +48,7 @@ func NewController(opt Opt) (*Controller, error) {
|
|||
|
||||
gatewayForwarder := controlgateway.NewGatewayForwarder()
|
||||
|
||||
solver, err := llbsolver.New(opt.WorkerController, opt.Frontends, cache, opt.ResolveCacheImporterFuncs, gatewayForwarder)
|
||||
solver, err := llbsolver.New(opt.WorkerController, opt.Frontends, cache, opt.ResolveCacheImporterFuncs, gatewayForwarder, opt.SessionManager)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to create solver")
|
||||
}
|
||||
|
@ -223,7 +223,7 @@ func (c *Controller) Solve(ctx context.Context, req *controlapi.SolveRequest) (*
|
|||
return nil, err
|
||||
}
|
||||
if req.Exporter != "" {
|
||||
exp, err := w.Exporter(req.Exporter)
|
||||
exp, err := w.Exporter(req.Exporter, c.opt.SessionManager)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -15,6 +15,7 @@ import (
|
|||
"github.com/moby/buildkit/frontend"
|
||||
gw "github.com/moby/buildkit/frontend/gateway/client"
|
||||
"github.com/moby/buildkit/identity"
|
||||
"github.com/moby/buildkit/session"
|
||||
"github.com/moby/buildkit/solver"
|
||||
"github.com/moby/buildkit/util/tracing"
|
||||
"github.com/moby/buildkit/worker"
|
||||
|
@ -32,6 +33,7 @@ type llbBridge struct {
|
|||
cms map[string]solver.CacheManager
|
||||
cmsMu sync.Mutex
|
||||
platforms []specs.Platform
|
||||
sm *session.Manager
|
||||
}
|
||||
|
||||
func (b *llbBridge) Solve(ctx context.Context, req frontend.SolveRequest) (res *frontend.Result, err error) {
|
||||
|
@ -156,7 +158,7 @@ func (s *llbBridge) ResolveImageConfig(ctx context.Context, ref string, opt gw.R
|
|||
id += platforms.Format(*platform)
|
||||
}
|
||||
err = inVertexContext(s.builder.Context(ctx), opt.LogName, id, func(ctx context.Context) error {
|
||||
dgst, config, err = w.ResolveImageConfig(ctx, ref, opt)
|
||||
dgst, config, err = w.ResolveImageConfig(ctx, ref, opt, s.sm)
|
||||
return err
|
||||
})
|
||||
return dgst, config, err
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/moby/buildkit/session"
|
||||
"github.com/moby/buildkit/solver"
|
||||
"github.com/moby/buildkit/solver/pb"
|
||||
"github.com/moby/buildkit/source"
|
||||
|
@ -20,14 +21,16 @@ type sourceOp struct {
|
|||
platform *pb.Platform
|
||||
sm *source.Manager
|
||||
src source.SourceInstance
|
||||
sessM *session.Manager
|
||||
w worker.Worker
|
||||
}
|
||||
|
||||
func NewSourceOp(_ solver.Vertex, op *pb.Op_Source, platform *pb.Platform, sm *source.Manager, w worker.Worker) (solver.Op, error) {
|
||||
func NewSourceOp(_ solver.Vertex, op *pb.Op_Source, platform *pb.Platform, sm *source.Manager, sessM *session.Manager, w worker.Worker) (solver.Op, error) {
|
||||
return &sourceOp{
|
||||
op: op,
|
||||
sm: sm,
|
||||
w: w,
|
||||
sessM: sessM,
|
||||
platform: platform,
|
||||
}, nil
|
||||
}
|
||||
|
@ -42,7 +45,7 @@ func (s *sourceOp) instance(ctx context.Context) (source.SourceInstance, error)
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
src, err := s.sm.Resolve(ctx, id)
|
||||
src, err := s.sm.Resolve(ctx, id, s.sessM)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -44,15 +44,17 @@ type Solver struct {
|
|||
resolveCacheImporterFuncs map[string]remotecache.ResolveCacheImporterFunc
|
||||
platforms []specs.Platform
|
||||
gatewayForwarder *controlgateway.GatewayForwarder
|
||||
sm *session.Manager
|
||||
}
|
||||
|
||||
func New(wc *worker.Controller, f map[string]frontend.Frontend, cache solver.CacheManager, resolveCI map[string]remotecache.ResolveCacheImporterFunc, gatewayForwarder *controlgateway.GatewayForwarder) (*Solver, error) {
|
||||
func New(wc *worker.Controller, f map[string]frontend.Frontend, cache solver.CacheManager, resolveCI map[string]remotecache.ResolveCacheImporterFunc, gatewayForwarder *controlgateway.GatewayForwarder, sm *session.Manager) (*Solver, error) {
|
||||
s := &Solver{
|
||||
workerController: wc,
|
||||
resolveWorker: defaultResolver(wc),
|
||||
frontends: f,
|
||||
resolveCacheImporterFuncs: resolveCI,
|
||||
gatewayForwarder: gatewayForwarder,
|
||||
sm: sm,
|
||||
}
|
||||
|
||||
// executing is currently only allowed on default worker
|
||||
|
@ -75,7 +77,7 @@ func (s *Solver) resolver() solver.ResolveOpFunc {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return w.ResolveOp(v, s.Bridge(b))
|
||||
return w.ResolveOp(v, s.Bridge(b), s.sm)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -87,6 +89,7 @@ func (s *Solver) Bridge(b solver.Builder) frontend.FrontendLLBBridge {
|
|||
resolveCacheImporterFuncs: s.resolveCacheImporterFuncs,
|
||||
cms: map[string]solver.CacheManager{},
|
||||
platforms: s.platforms,
|
||||
sm: s.sm,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -30,13 +30,12 @@ import (
|
|||
// code can be used with any implementation
|
||||
|
||||
type SourceOpt struct {
|
||||
SessionManager *session.Manager
|
||||
Snapshotter snapshot.Snapshotter
|
||||
ContentStore content.Store
|
||||
Applier diff.Applier
|
||||
CacheAccessor cache.Accessor
|
||||
ImageStore images.Store // optional
|
||||
ResolverOpt resolver.ResolveOptionsFunc
|
||||
Snapshotter snapshot.Snapshotter
|
||||
ContentStore content.Store
|
||||
Applier diff.Applier
|
||||
CacheAccessor cache.Accessor
|
||||
ImageStore images.Store // optional
|
||||
ResolverOpt resolver.ResolveOptionsFunc
|
||||
}
|
||||
|
||||
type imageSource struct {
|
||||
|
@ -56,7 +55,7 @@ func (is *imageSource) ID() string {
|
|||
return source.DockerImageScheme
|
||||
}
|
||||
|
||||
func (is *imageSource) ResolveImageConfig(ctx context.Context, ref string, opt gw.ResolveImageConfigOpt) (digest.Digest, []byte, error) {
|
||||
func (is *imageSource) ResolveImageConfig(ctx context.Context, ref string, opt gw.ResolveImageConfigOpt, sm *session.Manager) (digest.Digest, []byte, error) {
|
||||
type t struct {
|
||||
dgst digest.Digest
|
||||
dt []byte
|
||||
|
@ -72,7 +71,7 @@ func (is *imageSource) ResolveImageConfig(ctx context.Context, ref string, opt g
|
|||
}
|
||||
|
||||
res, err := is.g.Do(ctx, key, func(ctx context.Context) (interface{}, error) {
|
||||
dgst, dt, err := imageutil.Config(ctx, ref, pull.NewResolver(ctx, is.ResolverOpt, is.SessionManager, is.ImageStore, rm, ref), is.ContentStore, opt.Platform)
|
||||
dgst, dt, err := imageutil.Config(ctx, ref, pull.NewResolver(ctx, is.ResolverOpt, sm, is.ImageStore, rm, ref), is.ContentStore, opt.Platform)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -85,7 +84,7 @@ func (is *imageSource) ResolveImageConfig(ctx context.Context, ref string, opt g
|
|||
return typed.dgst, typed.dt, nil
|
||||
}
|
||||
|
||||
func (is *imageSource) Resolve(ctx context.Context, id source.Identifier) (source.SourceInstance, error) {
|
||||
func (is *imageSource) Resolve(ctx context.Context, id source.Identifier, sm *session.Manager) (source.SourceInstance, error) {
|
||||
imageIdentifier, ok := id.(*source.ImageIdentifier)
|
||||
if !ok {
|
||||
return nil, errors.Errorf("invalid image identifier %v", id)
|
||||
|
@ -101,7 +100,7 @@ func (is *imageSource) Resolve(ctx context.Context, id source.Identifier) (sourc
|
|||
ContentStore: is.ContentStore,
|
||||
Applier: is.Applier,
|
||||
Src: imageIdentifier.Reference,
|
||||
Resolver: pull.NewResolver(ctx, is.ResolverOpt, is.SessionManager, is.ImageStore, imageIdentifier.ResolveMode, imageIdentifier.Reference.String()),
|
||||
Resolver: pull.NewResolver(ctx, is.ResolverOpt, sm, is.ImageStore, imageIdentifier.ResolveMode, imageIdentifier.Reference.String()),
|
||||
Platform: &platform,
|
||||
}
|
||||
p := &puller{
|
||||
|
|
|
@ -16,6 +16,7 @@ import (
|
|||
"github.com/moby/buildkit/cache/metadata"
|
||||
"github.com/moby/buildkit/client"
|
||||
"github.com/moby/buildkit/identity"
|
||||
"github.com/moby/buildkit/session"
|
||||
"github.com/moby/buildkit/snapshot"
|
||||
"github.com/moby/buildkit/source"
|
||||
"github.com/moby/buildkit/util/progress/logs"
|
||||
|
@ -152,7 +153,7 @@ type gitSourceHandler struct {
|
|||
cacheKey string
|
||||
}
|
||||
|
||||
func (gs *gitSource) Resolve(ctx context.Context, id source.Identifier) (source.SourceInstance, error) {
|
||||
func (gs *gitSource) Resolve(ctx context.Context, id source.Identifier, _ *session.Manager) (source.SourceInstance, error) {
|
||||
gitIdentifier, ok := id.(*source.GitIdentifier)
|
||||
if !ok {
|
||||
return nil, errors.Errorf("invalid git identifier %v", id)
|
||||
|
|
|
@ -46,7 +46,7 @@ func testRepeatedFetch(t *testing.T, keepGitDir bool) {
|
|||
|
||||
id := &source.GitIdentifier{Remote: repodir, KeepGitDir: keepGitDir}
|
||||
|
||||
g, err := gs.Resolve(ctx, id)
|
||||
g, err := gs.Resolve(ctx, id, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
key1, done, err := g.CacheKey(ctx, 0)
|
||||
|
@ -83,7 +83,7 @@ func testRepeatedFetch(t *testing.T, keepGitDir bool) {
|
|||
// second fetch returns same dir
|
||||
id = &source.GitIdentifier{Remote: repodir, Ref: "master", KeepGitDir: keepGitDir}
|
||||
|
||||
g, err = gs.Resolve(ctx, id)
|
||||
g, err = gs.Resolve(ctx, id, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
key2, _, err := g.CacheKey(ctx, 0)
|
||||
|
@ -99,7 +99,7 @@ func testRepeatedFetch(t *testing.T, keepGitDir bool) {
|
|||
|
||||
id = &source.GitIdentifier{Remote: repodir, Ref: "feature", KeepGitDir: keepGitDir}
|
||||
|
||||
g, err = gs.Resolve(ctx, id)
|
||||
g, err = gs.Resolve(ctx, id, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
key3, _, err := g.CacheKey(ctx, 0)
|
||||
|
@ -164,7 +164,7 @@ func testFetchBySHA(t *testing.T, keepGitDir bool) {
|
|||
|
||||
id := &source.GitIdentifier{Remote: repodir, Ref: sha, KeepGitDir: keepGitDir}
|
||||
|
||||
g, err := gs.Resolve(ctx, id)
|
||||
g, err := gs.Resolve(ctx, id, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
key1, done, err := g.CacheKey(ctx, 0)
|
||||
|
@ -238,10 +238,10 @@ func testMultipleRepos(t *testing.T, keepGitDir bool) {
|
|||
id := &source.GitIdentifier{Remote: repodir, KeepGitDir: keepGitDir}
|
||||
id2 := &source.GitIdentifier{Remote: repodir2, KeepGitDir: keepGitDir}
|
||||
|
||||
g, err := gs.Resolve(ctx, id)
|
||||
g, err := gs.Resolve(ctx, id, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
g2, err := gs.Resolve(ctx, id2)
|
||||
g2, err := gs.Resolve(ctx, id2, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
key1, _, err := g.CacheKey(ctx, 0)
|
||||
|
|
|
@ -18,6 +18,7 @@ import (
|
|||
"github.com/docker/docker/pkg/locker"
|
||||
"github.com/moby/buildkit/cache"
|
||||
"github.com/moby/buildkit/cache/metadata"
|
||||
"github.com/moby/buildkit/session"
|
||||
"github.com/moby/buildkit/snapshot"
|
||||
"github.com/moby/buildkit/source"
|
||||
"github.com/moby/buildkit/util/tracing"
|
||||
|
@ -66,7 +67,7 @@ type httpSourceHandler struct {
|
|||
cacheKey digest.Digest
|
||||
}
|
||||
|
||||
func (hs *httpSource) Resolve(ctx context.Context, id source.Identifier) (source.SourceInstance, error) {
|
||||
func (hs *httpSource) Resolve(ctx context.Context, id source.Identifier, _ *session.Manager) (source.SourceInstance, error) {
|
||||
httpIdentifier, ok := id.(*source.HttpIdentifier)
|
||||
if !ok {
|
||||
return nil, errors.Errorf("invalid http identifier %v", id)
|
||||
|
|
|
@ -40,7 +40,7 @@ func TestHTTPSource(t *testing.T) {
|
|||
|
||||
id := &source.HttpIdentifier{URL: server.URL + "/foo"}
|
||||
|
||||
h, err := hs.Resolve(ctx, id)
|
||||
h, err := hs.Resolve(ctx, id, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
k, _, err := h.CacheKey(ctx, 0)
|
||||
|
@ -69,7 +69,7 @@ func TestHTTPSource(t *testing.T) {
|
|||
ref = nil
|
||||
|
||||
// repeat, should use the etag
|
||||
h, err = hs.Resolve(ctx, id)
|
||||
h, err = hs.Resolve(ctx, id, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
k, _, err = h.CacheKey(ctx, 0)
|
||||
|
@ -105,7 +105,7 @@ func TestHTTPSource(t *testing.T) {
|
|||
// update etag, downloads again
|
||||
server.SetRoute("/foo", resp2)
|
||||
|
||||
h, err = hs.Resolve(ctx, id)
|
||||
h, err = hs.Resolve(ctx, id, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
k, _, err = h.CacheKey(ctx, 0)
|
||||
|
@ -154,7 +154,7 @@ func TestHTTPDefaultName(t *testing.T) {
|
|||
|
||||
id := &source.HttpIdentifier{URL: server.URL}
|
||||
|
||||
h, err := hs.Resolve(ctx, id)
|
||||
h, err := hs.Resolve(ctx, id, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
k, _, err := h.CacheKey(ctx, 0)
|
||||
|
@ -197,7 +197,7 @@ func TestHTTPInvalidURL(t *testing.T) {
|
|||
|
||||
id := &source.HttpIdentifier{URL: server.URL + "/foo"}
|
||||
|
||||
h, err := hs.Resolve(ctx, id)
|
||||
h, err := hs.Resolve(ctx, id, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, _, err = h.CacheKey(ctx, 0)
|
||||
|
@ -227,7 +227,7 @@ func TestHTTPChecksum(t *testing.T) {
|
|||
|
||||
id := &source.HttpIdentifier{URL: server.URL + "/foo", Checksum: digest.FromBytes([]byte("content-different"))}
|
||||
|
||||
h, err := hs.Resolve(ctx, id)
|
||||
h, err := hs.Resolve(ctx, id, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
k, _, err := h.CacheKey(ctx, 0)
|
||||
|
@ -249,7 +249,7 @@ func TestHTTPChecksum(t *testing.T) {
|
|||
|
||||
id = &source.HttpIdentifier{URL: server.URL + "/foo", Checksum: digest.FromBytes([]byte("content-correct"))}
|
||||
|
||||
h, err = hs.Resolve(ctx, id)
|
||||
h, err = hs.Resolve(ctx, id, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
k, _, err = h.CacheKey(ctx, 0)
|
||||
|
|
|
@ -28,14 +28,12 @@ import (
|
|||
const keySharedKey = "local.sharedKey"
|
||||
|
||||
type Opt struct {
|
||||
SessionManager *session.Manager
|
||||
CacheAccessor cache.Accessor
|
||||
MetadataStore *metadata.Store
|
||||
CacheAccessor cache.Accessor
|
||||
MetadataStore *metadata.Store
|
||||
}
|
||||
|
||||
func NewSource(opt Opt) (source.Source, error) {
|
||||
ls := &localSource{
|
||||
sm: opt.SessionManager,
|
||||
cm: opt.CacheAccessor,
|
||||
md: opt.MetadataStore,
|
||||
}
|
||||
|
@ -43,7 +41,6 @@ func NewSource(opt Opt) (source.Source, error) {
|
|||
}
|
||||
|
||||
type localSource struct {
|
||||
sm *session.Manager
|
||||
cm cache.Accessor
|
||||
md *metadata.Store
|
||||
}
|
||||
|
@ -52,7 +49,7 @@ func (ls *localSource) ID() string {
|
|||
return source.LocalScheme
|
||||
}
|
||||
|
||||
func (ls *localSource) Resolve(ctx context.Context, id source.Identifier) (source.SourceInstance, error) {
|
||||
func (ls *localSource) Resolve(ctx context.Context, id source.Identifier, sm *session.Manager) (source.SourceInstance, error) {
|
||||
localIdentifier, ok := id.(*source.LocalIdentifier)
|
||||
if !ok {
|
||||
return nil, errors.Errorf("invalid local identifier %v", id)
|
||||
|
@ -60,12 +57,14 @@ func (ls *localSource) Resolve(ctx context.Context, id source.Identifier) (sourc
|
|||
|
||||
return &localSourceHandler{
|
||||
src: *localIdentifier,
|
||||
sm: sm,
|
||||
localSource: ls,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type localSourceHandler struct {
|
||||
src source.LocalIdentifier
|
||||
sm *session.Manager
|
||||
*localSource
|
||||
}
|
||||
|
||||
|
|
|
@ -5,12 +5,13 @@ import (
|
|||
"sync"
|
||||
|
||||
"github.com/moby/buildkit/cache"
|
||||
"github.com/moby/buildkit/session"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type Source interface {
|
||||
ID() string
|
||||
Resolve(ctx context.Context, id Identifier) (SourceInstance, error)
|
||||
Resolve(ctx context.Context, id Identifier, sm *session.Manager) (SourceInstance, error)
|
||||
}
|
||||
|
||||
type SourceInstance interface {
|
||||
|
@ -35,7 +36,7 @@ func (sm *Manager) Register(src Source) {
|
|||
sm.mu.Unlock()
|
||||
}
|
||||
|
||||
func (sm *Manager) Resolve(ctx context.Context, id Identifier) (SourceInstance, error) {
|
||||
func (sm *Manager) Resolve(ctx context.Context, id Identifier, sessM *session.Manager) (SourceInstance, error) {
|
||||
sm.mu.Lock()
|
||||
src, ok := sm.sources[id.ID()]
|
||||
sm.mu.Unlock()
|
||||
|
@ -44,5 +45,5 @@ func (sm *Manager) Resolve(ctx context.Context, id Identifier) (SourceInstance,
|
|||
return nil, errors.Errorf("no handler for %s", id.ID())
|
||||
}
|
||||
|
||||
return src.Resolve(ctx, id)
|
||||
return src.Resolve(ctx, id, sessM)
|
||||
}
|
||||
|
|
|
@ -61,7 +61,6 @@ type WorkerOpt struct {
|
|||
Labels map[string]string
|
||||
Platforms []specs.Platform
|
||||
GCPolicy []client.PruneInfo
|
||||
SessionManager *session.Manager
|
||||
MetadataStore *metadata.Store
|
||||
Executor executor.Executor
|
||||
Snapshotter snapshot.Snapshotter
|
||||
|
@ -78,7 +77,7 @@ type Worker struct {
|
|||
WorkerOpt
|
||||
CacheManager cache.Manager
|
||||
SourceManager *source.Manager
|
||||
Exporters map[string]exporter.Exporter
|
||||
imageWriter *imageexporter.ImageWriter
|
||||
ImageSource source.Source
|
||||
}
|
||||
|
||||
|
@ -105,13 +104,12 @@ func NewWorker(opt WorkerOpt) (*Worker, error) {
|
|||
}
|
||||
|
||||
is, err := containerimage.NewSource(containerimage.SourceOpt{
|
||||
Snapshotter: opt.Snapshotter,
|
||||
ContentStore: opt.ContentStore,
|
||||
SessionManager: opt.SessionManager,
|
||||
Applier: opt.Applier,
|
||||
ImageStore: opt.ImageStore,
|
||||
CacheAccessor: cm,
|
||||
ResolverOpt: opt.ResolveOptionsFunc,
|
||||
Snapshotter: opt.Snapshotter,
|
||||
ContentStore: opt.ContentStore,
|
||||
Applier: opt.Applier,
|
||||
ImageStore: opt.ImageStore,
|
||||
CacheAccessor: cm,
|
||||
ResolverOpt: opt.ResolveOptionsFunc,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -143,17 +141,14 @@ func NewWorker(opt WorkerOpt) (*Worker, error) {
|
|||
sm.Register(hs)
|
||||
|
||||
ss, err := local.NewSource(local.Opt{
|
||||
SessionManager: opt.SessionManager,
|
||||
CacheAccessor: cm,
|
||||
MetadataStore: opt.MetadataStore,
|
||||
CacheAccessor: cm,
|
||||
MetadataStore: opt.MetadataStore,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sm.Register(ss)
|
||||
|
||||
exporters := map[string]exporter.Exporter{}
|
||||
|
||||
iw, err := imageexporter.NewImageWriter(imageexporter.WriterOpt{
|
||||
Snapshotter: opt.Snapshotter,
|
||||
ContentStore: opt.ContentStore,
|
||||
|
@ -163,50 +158,11 @@ func NewWorker(opt WorkerOpt) (*Worker, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
imageExporter, err := imageexporter.New(imageexporter.Opt{
|
||||
Images: opt.ImageStore,
|
||||
SessionManager: opt.SessionManager,
|
||||
ImageWriter: iw,
|
||||
ResolverOpt: opt.ResolveOptionsFunc,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
exporters[client.ExporterImage] = imageExporter
|
||||
|
||||
localExporter, err := localexporter.New(localexporter.Opt{
|
||||
SessionManager: opt.SessionManager,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
exporters[client.ExporterLocal] = localExporter
|
||||
|
||||
ociExporter, err := ociexporter.New(ociexporter.Opt{
|
||||
SessionManager: opt.SessionManager,
|
||||
ImageWriter: iw,
|
||||
Variant: ociexporter.VariantOCI,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
exporters[client.ExporterOCI] = ociExporter
|
||||
|
||||
dockerExporter, err := ociexporter.New(ociexporter.Opt{
|
||||
SessionManager: opt.SessionManager,
|
||||
ImageWriter: iw,
|
||||
Variant: ociexporter.VariantDocker,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
exporters[client.ExporterDocker] = dockerExporter
|
||||
|
||||
return &Worker{
|
||||
WorkerOpt: opt,
|
||||
CacheManager: cm,
|
||||
SourceManager: sm,
|
||||
Exporters: exporters,
|
||||
imageWriter: iw,
|
||||
ImageSource: is,
|
||||
}, nil
|
||||
}
|
||||
|
@ -235,13 +191,13 @@ func (w *Worker) LoadRef(id string, hidden bool) (cache.ImmutableRef, error) {
|
|||
return w.CacheManager.Get(context.TODO(), id, opts...)
|
||||
}
|
||||
|
||||
func (w *Worker) ResolveOp(v solver.Vertex, s frontend.FrontendLLBBridge) (solver.Op, error) {
|
||||
func (w *Worker) ResolveOp(v solver.Vertex, s frontend.FrontendLLBBridge, sm *session.Manager) (solver.Op, error) {
|
||||
if baseOp, ok := v.Sys().(*pb.Op); ok {
|
||||
switch op := baseOp.Op.(type) {
|
||||
case *pb.Op_Source:
|
||||
return ops.NewSourceOp(v, op, baseOp.Platform, w.SourceManager, w)
|
||||
return ops.NewSourceOp(v, op, baseOp.Platform, w.SourceManager, sm, w)
|
||||
case *pb.Op_Exec:
|
||||
return ops.NewExecOp(v, op, w.CacheManager, w.SessionManager, w.MetadataStore, w.Executor, w)
|
||||
return ops.NewExecOp(v, op, w.CacheManager, sm, w.MetadataStore, w.Executor, w)
|
||||
case *pb.Op_Build:
|
||||
return ops.NewBuildOp(v, op, s, w)
|
||||
}
|
||||
|
@ -249,17 +205,17 @@ func (w *Worker) ResolveOp(v solver.Vertex, s frontend.FrontendLLBBridge) (solve
|
|||
return nil, errors.Errorf("could not resolve %v", v)
|
||||
}
|
||||
|
||||
func (w *Worker) ResolveImageConfig(ctx context.Context, ref string, opt gw.ResolveImageConfigOpt) (digest.Digest, []byte, error) {
|
||||
func (w *Worker) ResolveImageConfig(ctx context.Context, ref string, opt gw.ResolveImageConfigOpt, sm *session.Manager) (digest.Digest, []byte, error) {
|
||||
// ImageSource is typically source/containerimage
|
||||
resolveImageConfig, ok := w.ImageSource.(resolveImageConfig)
|
||||
if !ok {
|
||||
return "", nil, errors.Errorf("worker %q does not implement ResolveImageConfig", w.ID())
|
||||
}
|
||||
return resolveImageConfig.ResolveImageConfig(ctx, ref, opt)
|
||||
return resolveImageConfig.ResolveImageConfig(ctx, ref, opt, sm)
|
||||
}
|
||||
|
||||
type resolveImageConfig interface {
|
||||
ResolveImageConfig(ctx context.Context, ref string, opt gw.ResolveImageConfigOpt) (digest.Digest, []byte, error)
|
||||
ResolveImageConfig(ctx context.Context, ref string, opt gw.ResolveImageConfigOpt, sm *session.Manager) (digest.Digest, []byte, error)
|
||||
}
|
||||
|
||||
func (w *Worker) Exec(ctx context.Context, meta executor.Meta, rootFS cache.ImmutableRef, stdin io.ReadCloser, stdout, stderr io.WriteCloser) error {
|
||||
|
@ -279,12 +235,34 @@ func (w *Worker) Prune(ctx context.Context, ch chan client.UsageInfo, opt ...cli
|
|||
return w.CacheManager.Prune(ctx, ch, opt...)
|
||||
}
|
||||
|
||||
func (w *Worker) Exporter(name string) (exporter.Exporter, error) {
|
||||
exp, ok := w.Exporters[name]
|
||||
if !ok {
|
||||
func (w *Worker) Exporter(name string, sm *session.Manager) (exporter.Exporter, error) {
|
||||
switch name {
|
||||
case client.ExporterImage:
|
||||
return imageexporter.New(imageexporter.Opt{
|
||||
Images: w.ImageStore,
|
||||
SessionManager: sm,
|
||||
ImageWriter: w.imageWriter,
|
||||
ResolverOpt: w.ResolveOptionsFunc,
|
||||
})
|
||||
case client.ExporterLocal:
|
||||
return localexporter.New(localexporter.Opt{
|
||||
SessionManager: sm,
|
||||
})
|
||||
case client.ExporterOCI:
|
||||
return ociexporter.New(ociexporter.Opt{
|
||||
SessionManager: sm,
|
||||
ImageWriter: w.imageWriter,
|
||||
Variant: ociexporter.VariantOCI,
|
||||
})
|
||||
case client.ExporterDocker:
|
||||
return ociexporter.New(ociexporter.Opt{
|
||||
SessionManager: sm,
|
||||
ImageWriter: w.imageWriter,
|
||||
Variant: ociexporter.VariantDocker,
|
||||
})
|
||||
default:
|
||||
return nil, errors.Errorf("exporter %q could not be found", name)
|
||||
}
|
||||
return exp, nil
|
||||
}
|
||||
|
||||
func (w *Worker) GetRemote(ctx context.Context, ref cache.ImmutableRef, createIfNeeded bool) (*solver.Remote, error) {
|
||||
|
|
|
@ -25,8 +25,6 @@ import (
|
|||
)
|
||||
|
||||
// NewWorkerOpt creates a WorkerOpt.
|
||||
// But it does not set the following fields:
|
||||
// - SessionManager
|
||||
func NewWorkerOpt(root string, address, snapshotterName, ns string, labels map[string]string, opts ...containerd.ClientOpt) (base.WorkerOpt, error) {
|
||||
opts = append(opts, containerd.WithDefaultNamespace(ns))
|
||||
client, err := containerd.New(address, opts...)
|
||||
|
|
|
@ -32,8 +32,6 @@ type SnapshotterFactory struct {
|
|||
}
|
||||
|
||||
// NewWorkerOpt creates a WorkerOpt.
|
||||
// But it does not set the following fields:
|
||||
// - SessionManager
|
||||
func NewWorkerOpt(root string, snFactory SnapshotterFactory, rootless bool, processMode oci.ProcessMode, labels map[string]string) (base.WorkerOpt, error) {
|
||||
var opt base.WorkerOpt
|
||||
name := "runc-" + snFactory.Name
|
||||
|
|
|
@ -42,8 +42,6 @@ func newWorkerOpt(t *testing.T, processMode oci.ProcessMode) (base.WorkerOpt, fu
|
|||
workerOpt, err := NewWorkerOpt(tmpdir, snFactory, rootless, processMode, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
workerOpt.SessionManager, err = session.NewManager()
|
||||
require.NoError(t, err)
|
||||
return workerOpt, cleanup
|
||||
}
|
||||
|
||||
|
@ -63,10 +61,10 @@ func newCtx(s string) context.Context {
|
|||
return namespaces.WithNamespace(context.Background(), s)
|
||||
}
|
||||
|
||||
func newBusyboxSourceSnapshot(ctx context.Context, t *testing.T, w *base.Worker) cache.ImmutableRef {
|
||||
func newBusyboxSourceSnapshot(ctx context.Context, t *testing.T, w *base.Worker, sm *session.Manager) cache.ImmutableRef {
|
||||
img, err := source.NewImageIdentifier("docker.io/library/busybox:latest")
|
||||
require.NoError(t, err)
|
||||
src, err := w.SourceManager.Resolve(ctx, img)
|
||||
src, err := w.SourceManager.Resolve(ctx, img, sm)
|
||||
require.NoError(t, err)
|
||||
snap, err := src.Snapshot(ctx)
|
||||
require.NoError(t, err)
|
||||
|
@ -83,7 +81,9 @@ func TestRuncWorker(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
|
||||
ctx := newCtx("buildkit-test")
|
||||
snap := newBusyboxSourceSnapshot(ctx, t, w)
|
||||
sm, err := session.NewManager()
|
||||
require.NoError(t, err)
|
||||
snap := newBusyboxSourceSnapshot(ctx, t, w, sm)
|
||||
|
||||
mounts, err := snap.Mount(ctx, false)
|
||||
require.NoError(t, err)
|
||||
|
@ -185,7 +185,9 @@ func TestRuncWorkerNoProcessSandbox(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
|
||||
ctx := newCtx("buildkit-test")
|
||||
snap := newBusyboxSourceSnapshot(ctx, t, w)
|
||||
sm, err := session.NewManager()
|
||||
require.NoError(t, err)
|
||||
snap := newBusyboxSourceSnapshot(ctx, t, w, sm)
|
||||
root, err := w.CacheManager.New(ctx, snap)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
"github.com/moby/buildkit/exporter"
|
||||
"github.com/moby/buildkit/frontend"
|
||||
gw "github.com/moby/buildkit/frontend/gateway/client"
|
||||
"github.com/moby/buildkit/session"
|
||||
"github.com/moby/buildkit/solver"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
specs "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
|
@ -23,12 +24,12 @@ type Worker interface {
|
|||
GCPolicy() []client.PruneInfo
|
||||
LoadRef(id string, hidden bool) (cache.ImmutableRef, error)
|
||||
// ResolveOp resolves Vertex.Sys() to Op implementation.
|
||||
ResolveOp(v solver.Vertex, s frontend.FrontendLLBBridge) (solver.Op, error)
|
||||
ResolveImageConfig(ctx context.Context, ref string, opt gw.ResolveImageConfigOpt) (digest.Digest, []byte, error)
|
||||
ResolveOp(v solver.Vertex, s frontend.FrontendLLBBridge, sm *session.Manager) (solver.Op, error)
|
||||
ResolveImageConfig(ctx context.Context, ref string, opt gw.ResolveImageConfigOpt, sm *session.Manager) (digest.Digest, []byte, error)
|
||||
// Exec is similar to executor.Exec but without []mount.Mount
|
||||
Exec(ctx context.Context, meta executor.Meta, rootFS cache.ImmutableRef, stdin io.ReadCloser, stdout, stderr io.WriteCloser) error
|
||||
DiskUsage(ctx context.Context, opt client.DiskUsageInfo) ([]*client.UsageInfo, error)
|
||||
Exporter(name string) (exporter.Exporter, error)
|
||||
Exporter(name string, sm *session.Manager) (exporter.Exporter, error)
|
||||
Prune(ctx context.Context, ch chan client.UsageInfo, opt ...client.PruneInfo) error
|
||||
GetRemote(ctx context.Context, ref cache.ImmutableRef, createIfNeeded bool) (*solver.Remote, error)
|
||||
FromRemote(ctx context.Context, remote *solver.Remote) (cache.ImmutableRef, error)
|
||||
|
|
Loading…
Reference in New Issue