Merge pull request #1551 from tonistiigi/session-group

session: track sessions with a group construct
v0.8
Edgar Lee 2020-07-08 16:37:07 -07:00 committed by GitHub
commit 488130002a
41 changed files with 550 additions and 400 deletions
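
At a high level, this change removes the session ID that used to ride along in context.Context (session.NewContext / session.FromContext) and instead threads explicit session information (a session.Group, or a plain session ID where a single session suffices) through the solver, frontends, sources, exporters, and cache import/export paths. A Group yields candidate session IDs, and the new Manager.Any helper tries them in order until a callback succeeds. The sketch below is not part of the diff; names outside the session package are illustrative.

package example

import (
	"context"

	"github.com/moby/buildkit/session"
)

// anyCaller is an illustrative helper (not from the diff): build a Group
// from one or more session IDs (the controller now passes
// session.NewGroup(req.Session)) and let Manager.Any pick a live session
// and hand back its Caller.
func anyCaller(ctx context.Context, sm *session.Manager, ids ...string) (session.Caller, error) {
	var caller session.Caller
	err := sm.Any(ctx, session.NewGroup(ids...), func(ctx context.Context, id string, c session.Caller) error {
		caller = c // c.Conn() reaches the client-side services (auth, secrets, filesync, ...)
		return nil
	})
	return caller, err
}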


@ -10,6 +10,7 @@ import (
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/images"
v1 "github.com/moby/buildkit/cache/remotecache/v1"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/util/contentutil"
"github.com/moby/buildkit/util/progress"
@ -19,7 +20,7 @@ import (
"github.com/pkg/errors"
)
type ResolveCacheExporterFunc func(ctx context.Context, attrs map[string]string) (Exporter, error)
type ResolveCacheExporterFunc func(ctx context.Context, g session.Group, attrs map[string]string) (Exporter, error)
func oneOffProgress(ctx context.Context, id string) func(err error) error {
pw, _, _ := progress.FromContext(ctx)


@ -10,6 +10,7 @@ import (
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/images"
v1 "github.com/moby/buildkit/cache/remotecache/v1"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/util/imageutil"
"github.com/moby/buildkit/worker"
@ -21,7 +22,7 @@ import (
)
// ResolveCacheImporterFunc returns importer and descriptor.
type ResolveCacheImporterFunc func(ctx context.Context, attrs map[string]string) (Importer, ocispec.Descriptor, error)
type ResolveCacheImporterFunc func(ctx context.Context, g session.Group, attrs map[string]string) (Importer, ocispec.Descriptor, error)
type Importer interface {
Resolve(ctx context.Context, desc ocispec.Descriptor, id string, w worker.Worker) (solver.CacheManager, error)


@ -6,13 +6,14 @@ import (
"github.com/moby/buildkit/cache/remotecache"
v1 "github.com/moby/buildkit/cache/remotecache/v1"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/solver"
digest "github.com/opencontainers/go-digest"
"github.com/sirupsen/logrus"
)
func ResolveCacheExporterFunc() remotecache.ResolveCacheExporterFunc {
return func(ctx context.Context, _ map[string]string) (remotecache.Exporter, error) {
return func(ctx context.Context, _ session.Group, _ map[string]string) (remotecache.Exporter, error) {
return NewExporter(), nil
}
}


@ -22,13 +22,13 @@ const (
// ResolveCacheExporterFunc for "local" cache exporter.
func ResolveCacheExporterFunc(sm *session.Manager) remotecache.ResolveCacheExporterFunc {
return func(ctx context.Context, attrs map[string]string) (remotecache.Exporter, error) {
return func(ctx context.Context, g session.Group, attrs map[string]string) (remotecache.Exporter, error) {
store := attrs[attrDest]
if store == "" {
return nil, errors.New("local cache exporter requires dest")
}
csID := contentStoreIDPrefix + store
cs, err := getContentStore(ctx, sm, csID)
cs, err := getContentStore(ctx, sm, g, csID)
if err != nil {
return nil, err
}
@ -38,7 +38,7 @@ func ResolveCacheExporterFunc(sm *session.Manager) remotecache.ResolveCacheExpor
// ResolveCacheImporterFunc for "local" cache importer.
func ResolveCacheImporterFunc(sm *session.Manager) remotecache.ResolveCacheImporterFunc {
return func(ctx context.Context, attrs map[string]string) (remotecache.Importer, specs.Descriptor, error) {
return func(ctx context.Context, g session.Group, attrs map[string]string) (remotecache.Importer, specs.Descriptor, error) {
dgstStr := attrs[attrDigest]
if dgstStr == "" {
return nil, specs.Descriptor{}, errors.New("local cache importer requires explicit digest")
@ -49,7 +49,7 @@ func ResolveCacheImporterFunc(sm *session.Manager) remotecache.ResolveCacheImpor
return nil, specs.Descriptor{}, errors.New("local cache importer requires src")
}
csID := contentStoreIDPrefix + store
cs, err := getContentStore(ctx, sm, csID)
cs, err := getContentStore(ctx, sm, g, csID)
if err != nil {
return nil, specs.Descriptor{}, err
}
@ -67,8 +67,9 @@ func ResolveCacheImporterFunc(sm *session.Manager) remotecache.ResolveCacheImpor
}
}
func getContentStore(ctx context.Context, sm *session.Manager, storeID string) (content.Store, error) {
sessionID := session.FromContext(ctx)
func getContentStore(ctx context.Context, sm *session.Manager, g session.Group, storeID string) (content.Store, error) {
// TODO: to ensure correct session is detected, new api for finding if storeID is supported is needed
sessionID := g.SessionIterator().NextSession()
if sessionID == "" {
return nil, errors.New("local cache exporter/importer requires session")
}


@ -32,12 +32,12 @@ const (
)
func ResolveCacheExporterFunc(sm *session.Manager, hosts docker.RegistryHosts) remotecache.ResolveCacheExporterFunc {
return func(ctx context.Context, attrs map[string]string) (remotecache.Exporter, error) {
return func(ctx context.Context, g session.Group, attrs map[string]string) (remotecache.Exporter, error) {
ref, err := canonicalizeRef(attrs[attrRef])
if err != nil {
return nil, err
}
remote := resolver.New(ctx, hosts, sm)
remote := resolver.New(hosts, resolver.NewSessionAuthenticator(sm, g))
pusher, err := remote.Pusher(ctx, ref)
if err != nil {
return nil, err
@ -47,12 +47,12 @@ func ResolveCacheExporterFunc(sm *session.Manager, hosts docker.RegistryHosts) r
}
func ResolveCacheImporterFunc(sm *session.Manager, cs content.Store, hosts docker.RegistryHosts) remotecache.ResolveCacheImporterFunc {
return func(ctx context.Context, attrs map[string]string) (remotecache.Importer, specs.Descriptor, error) {
return func(ctx context.Context, g session.Group, attrs map[string]string) (remotecache.Importer, specs.Descriptor, error) {
ref, err := canonicalizeRef(attrs[attrRef])
if err != nil {
return nil, specs.Descriptor{}, err
}
remote := resolver.New(ctx, hosts, sm)
remote := resolver.New(hosts, resolver.NewSessionAuthenticator(sm, g))
xref, desc, err := remote.Resolve(ctx, ref)
if err != nil {
return nil, specs.Descriptor{}, err


@ -220,7 +220,6 @@ func (c *Controller) Solve(ctx context.Context, req *controlapi.SolveRequest) (*
if err := translateLegacySolveRequest(req); err != nil {
return nil, err
}
ctx = session.NewContext(ctx, req.Session)
defer func() {
time.AfterFunc(time.Second, c.throttledGC)
@ -260,7 +259,7 @@ func (c *Controller) Solve(ctx context.Context, req *controlapi.SolveRequest) (*
if !ok {
return nil, errors.Errorf("unknown cache exporter: %q", e.Type)
}
cacheExporter, err = cacheExporterFunc(ctx, e.Attrs)
cacheExporter, err = cacheExporterFunc(ctx, session.NewGroup(req.Session), e.Attrs)
if err != nil {
return nil, err
}
@ -273,7 +272,7 @@ func (c *Controller) Solve(ctx context.Context, req *controlapi.SolveRequest) (*
})
}
resp, err := c.solver.Solve(ctx, req.Ref, frontend.SolveRequest{
resp, err := c.solver.Solve(ctx, req.Ref, req.Session, frontend.SolveRequest{
Frontend: req.Frontend,
Definition: req.Definition,
FrontendOpt: req.FrontendAttrs,


@ -168,7 +168,7 @@ func (e *imageExporterInstance) Name() string {
return "exporting to image"
}
func (e *imageExporterInstance) Export(ctx context.Context, src exporter.Source) (map[string]string, error) {
func (e *imageExporterInstance) Export(ctx context.Context, src exporter.Source, sessionID string) (map[string]string, error) {
if src.Metadata == nil {
src.Metadata = make(map[string][]byte)
}
@ -237,7 +237,7 @@ func (e *imageExporterInstance) Export(ctx context.Context, src exporter.Source)
}
}
if e.push {
if err := push.Push(ctx, e.opt.SessionManager, e.opt.ImageWriter.ContentStore(), desc.Digest, targetName, e.insecure, e.opt.RegistryHosts, e.pushByDigest); err != nil {
if err := push.Push(ctx, e.opt.SessionManager, sessionID, e.opt.ImageWriter.ContentStore(), desc.Digest, targetName, e.insecure, e.opt.RegistryHosts, e.pushByDigest); err != nil {
return nil, err
}
}


@ -12,7 +12,7 @@ type Exporter interface {
type ExporterInstance interface {
Name() string
Export(context.Context, Source) (map[string]string, error)
Export(ctx context.Context, src Source, sessionID string) (map[string]string, error)
}
type Source struct {


@ -14,7 +14,6 @@ import (
"github.com/moby/buildkit/session/filesync"
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/util/progress"
"github.com/pkg/errors"
"github.com/tonistiigi/fsutil"
fstypes "github.com/tonistiigi/fsutil/types"
"golang.org/x/sync/errgroup"
@ -36,33 +35,27 @@ func New(opt Opt) (exporter.Exporter, error) {
}
func (e *localExporter) Resolve(ctx context.Context, opt map[string]string) (exporter.ExporterInstance, error) {
id := session.FromContext(ctx)
if id == "" {
return nil, errors.New("could not access local files without session")
}
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
caller, err := e.opt.SessionManager.Get(timeoutCtx, id)
if err != nil {
return nil, err
}
li := &localExporterInstance{localExporter: e, caller: caller}
return li, nil
return &localExporterInstance{localExporter: e}, nil
}
type localExporterInstance struct {
*localExporter
caller session.Caller
}
func (e *localExporterInstance) Name() string {
return "exporting to client"
}
func (e *localExporterInstance) Export(ctx context.Context, inp exporter.Source) (map[string]string, error) {
func (e *localExporterInstance) Export(ctx context.Context, inp exporter.Source, sessionID string) (map[string]string, error) {
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
caller, err := e.opt.SessionManager.Get(timeoutCtx, sessionID)
if err != nil {
return nil, err
}
isMap := len(inp.Refs) > 0
export := func(ctx context.Context, k string, ref cache.ImmutableRef) func() error {
@ -125,7 +118,7 @@ func (e *localExporterInstance) Export(ctx context.Context, inp exporter.Source)
}
progress := newProgressHandler(ctx, lbl)
if err := filesync.CopyToCaller(ctx, fs, e.caller, progress); err != nil {
if err := filesync.CopyToCaller(ctx, fs, caller, progress); err != nil {
return err
}
return nil


@ -49,23 +49,9 @@ func New(opt Opt) (exporter.Exporter, error) {
}
func (e *imageExporter) Resolve(ctx context.Context, opt map[string]string) (exporter.ExporterInstance, error) {
id := session.FromContext(ctx)
if id == "" {
return nil, errors.New("could not access local files without session")
}
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
caller, err := e.opt.SessionManager.Get(timeoutCtx, id)
if err != nil {
return nil, err
}
var ot *bool
i := &imageExporterInstance{
imageExporter: e,
caller: caller,
layerCompression: blobs.DefaultCompression,
}
for k, v := range opt {
@ -110,7 +96,6 @@ func (e *imageExporter) Resolve(ctx context.Context, opt map[string]string) (exp
type imageExporterInstance struct {
*imageExporter
meta map[string][]byte
caller session.Caller
name string
ociTypes bool
layerCompression blobs.CompressionType
@ -120,7 +105,7 @@ func (e *imageExporterInstance) Name() string {
return "exporting to oci image format"
}
func (e *imageExporterInstance) Export(ctx context.Context, src exporter.Source) (map[string]string, error) {
func (e *imageExporterInstance) Export(ctx context.Context, src exporter.Source, sessionID string) (map[string]string, error) {
if e.opt.Variant == VariantDocker && len(src.Refs) > 0 {
return nil, errors.Errorf("docker exporter does not currently support exporting manifest lists")
}
@ -175,7 +160,15 @@ func (e *imageExporterInstance) Export(ctx context.Context, src exporter.Source)
return nil, errors.Errorf("invalid variant %q", e.opt.Variant)
}
w, err := filesync.CopyFileWriter(ctx, resp, e.caller)
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
caller, err := e.opt.SessionManager.Get(timeoutCtx, sessionID)
if err != nil {
return nil, err
}
w, err := filesync.CopyFileWriter(ctx, resp, caller)
if err != nil {
return nil, err
}


@ -14,7 +14,6 @@ import (
"github.com/moby/buildkit/session/filesync"
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/util/progress"
"github.com/pkg/errors"
"github.com/tonistiigi/fsutil"
fstypes "github.com/tonistiigi/fsutil/types"
)
@ -34,33 +33,19 @@ func New(opt Opt) (exporter.Exporter, error) {
}
func (e *localExporter) Resolve(ctx context.Context, opt map[string]string) (exporter.ExporterInstance, error) {
id := session.FromContext(ctx)
if id == "" {
return nil, errors.New("could not access local files without session")
}
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
caller, err := e.opt.SessionManager.Get(timeoutCtx, id)
if err != nil {
return nil, err
}
li := &localExporterInstance{localExporter: e, caller: caller}
li := &localExporterInstance{localExporter: e}
return li, nil
}
type localExporterInstance struct {
*localExporter
caller session.Caller
}
func (e *localExporterInstance) Name() string {
return "exporting to client"
}
func (e *localExporterInstance) Export(ctx context.Context, inp exporter.Source) (map[string]string, error) {
func (e *localExporterInstance) Export(ctx context.Context, inp exporter.Source, sessionID string) (map[string]string, error) {
var defers []func()
defer func() {
@ -147,7 +132,15 @@ func (e *localExporterInstance) Export(ctx context.Context, inp exporter.Source)
fs = d.FS
}
w, err := filesync.CopyFileWriter(ctx, nil, e.caller)
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
caller, err := e.opt.SessionManager.Get(timeoutCtx, sessionID)
if err != nil {
return nil, err
}
w, err := filesync.CopyFileWriter(ctx, nil, caller)
if err != nil {
return nil, err
}


@ -14,11 +14,11 @@ import (
)
type Frontend interface {
Solve(ctx context.Context, llb FrontendLLBBridge, opt map[string]string, inputs map[string]*pb.Definition) (*Result, error)
Solve(ctx context.Context, llb FrontendLLBBridge, opt map[string]string, inputs map[string]*pb.Definition, sid string) (*Result, error)
}
type FrontendLLBBridge interface {
Solve(ctx context.Context, req SolveRequest) (*Result, error)
Solve(ctx context.Context, req SolveRequest, sid string) (*Result, error)
ResolveImageConfig(ctx context.Context, ref string, opt llb.ResolveImageConfigOpt) (digest.Digest, []byte, error)
Exec(ctx context.Context, meta executor.Meta, rootfs cache.ImmutableRef, stdin io.ReadCloser, stdout, stderr io.WriteCloser) error
}


@ -11,7 +11,6 @@ import (
"github.com/moby/buildkit/frontend"
"github.com/moby/buildkit/frontend/gateway/client"
gwpb "github.com/moby/buildkit/frontend/gateway/pb"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/solver"
opspb "github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/util/apicaps"
@ -20,12 +19,12 @@ import (
fstypes "github.com/tonistiigi/fsutil/types"
)
func llbBridgeToGatewayClient(ctx context.Context, llbBridge frontend.FrontendLLBBridge, opts map[string]string, inputs map[string]*opspb.Definition, workerInfos []clienttypes.WorkerInfo) (*bridgeClient, error) {
func llbBridgeToGatewayClient(ctx context.Context, llbBridge frontend.FrontendLLBBridge, opts map[string]string, inputs map[string]*opspb.Definition, workerInfos []clienttypes.WorkerInfo, sid string) (*bridgeClient, error) {
return &bridgeClient{
opts: opts,
inputs: inputs,
FrontendLLBBridge: llbBridge,
sid: session.FromContext(ctx),
sid: sid,
workerInfos: workerInfos,
final: map[*ref]struct{}{},
}, nil
@ -50,7 +49,7 @@ func (c *bridgeClient) Solve(ctx context.Context, req client.SolveRequest) (*cli
FrontendOpt: req.FrontendOpt,
FrontendInputs: req.FrontendInputs,
CacheImports: req.CacheImports,
})
}, c.sid)
if err != nil {
return nil, err
}


@ -20,8 +20,8 @@ type GatewayForwarder struct {
f client.BuildFunc
}
func (gf *GatewayForwarder) Solve(ctx context.Context, llbBridge frontend.FrontendLLBBridge, opts map[string]string, inputs map[string]*pb.Definition) (retRes *frontend.Result, retErr error) {
c, err := llbBridgeToGatewayClient(ctx, llbBridge, opts, inputs, gf.workers.WorkerInfos())
func (gf *GatewayForwarder) Solve(ctx context.Context, llbBridge frontend.FrontendLLBBridge, opts map[string]string, inputs map[string]*pb.Definition, sid string) (retRes *frontend.Result, retErr error) {
c, err := llbBridgeToGatewayClient(ctx, llbBridge, opts, inputs, gf.workers.WorkerInfos(), sid)
if err != nil {
return nil, err
}


@ -25,7 +25,6 @@ import (
"github.com/moby/buildkit/frontend"
pb "github.com/moby/buildkit/frontend/gateway/pb"
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/solver"
opspb "github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/util/apicaps"
@ -68,26 +67,24 @@ func filterPrefix(opts map[string]string, pfx string) map[string]string {
return m
}
func (gf *gatewayFrontend) Solve(ctx context.Context, llbBridge frontend.FrontendLLBBridge, opts map[string]string, inputs map[string]*opspb.Definition) (*frontend.Result, error) {
func (gf *gatewayFrontend) Solve(ctx context.Context, llbBridge frontend.FrontendLLBBridge, opts map[string]string, inputs map[string]*opspb.Definition, sid string) (*frontend.Result, error) {
source, ok := opts[keySource]
if !ok {
return nil, errors.Errorf("no source specified for gateway")
}
sid := session.FromContext(ctx)
_, isDevel := opts[keyDevel]
var img specs.Image
var rootFS cache.ImmutableRef
var readonly bool // TODO: try to switch to read-only by default.
if isDevel {
devRes, err := llbBridge.Solve(session.NewContext(ctx, "gateway:"+sid),
devRes, err := llbBridge.Solve(ctx,
frontend.SolveRequest{
Frontend: source,
FrontendOpt: filterPrefix(opts, "gateway-"),
FrontendInputs: inputs,
})
}, "gateway:"+sid)
if err != nil {
return nil, err
}
@ -145,7 +142,7 @@ func (gf *gatewayFrontend) Solve(ctx context.Context, llbBridge frontend.Fronten
res, err := llbBridge.Solve(ctx, frontend.SolveRequest{
Definition: def.ToPB(),
})
}, sid)
if err != nil {
return nil, err
}
@ -169,7 +166,7 @@ func (gf *gatewayFrontend) Solve(ctx context.Context, llbBridge frontend.Fronten
rootFS = workerRef.ImmutableRef
}
lbf, ctx, err := newLLBBridgeForwarder(ctx, llbBridge, gf.workers, inputs)
lbf, ctx, err := newLLBBridgeForwarder(ctx, llbBridge, gf.workers, inputs, sid)
defer lbf.conn.Close()
if err != nil {
return nil, err
@ -296,7 +293,7 @@ func (lbf *llbBridgeForwarder) Result() (*frontend.Result, error) {
return lbf.result, nil
}
func NewBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridge, workers frontend.WorkerInfos, inputs map[string]*opspb.Definition) *llbBridgeForwarder {
func NewBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridge, workers frontend.WorkerInfos, inputs map[string]*opspb.Definition, sid string) *llbBridgeForwarder {
lbf := &llbBridgeForwarder{
callCtx: ctx,
llbBridge: llbBridge,
@ -305,13 +302,14 @@ func NewBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridg
pipe: newPipe(),
workers: workers,
inputs: inputs,
sid: sid,
}
return lbf
}
func newLLBBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridge, workers frontend.WorkerInfos, inputs map[string]*opspb.Definition) (*llbBridgeForwarder, context.Context, error) {
func newLLBBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridge, workers frontend.WorkerInfos, inputs map[string]*opspb.Definition, sid string) (*llbBridgeForwarder, context.Context, error) {
ctx, cancel := context.WithCancel(ctx)
lbf := NewBridgeForwarder(ctx, llbBridge, workers, inputs)
lbf := NewBridgeForwarder(ctx, llbBridge, workers, inputs, sid)
server := grpc.NewServer(grpc.UnaryInterceptor(grpcerrors.UnaryServerInterceptor), grpc.StreamInterceptor(grpcerrors.StreamServerInterceptor))
grpc_health_v1.RegisterHealthServer(server, health.NewServer())
pb.RegisterLLBBridgeServer(server, lbf)
@ -403,6 +401,7 @@ type llbBridgeForwarder struct {
workers frontend.WorkerInfos
inputs map[string]*opspb.Definition
isErrServerClosed bool
sid string
*pipe
}
@ -465,7 +464,7 @@ func (lbf *llbBridgeForwarder) Solve(ctx context.Context, req *pb.SolveRequest)
FrontendOpt: req.FrontendOpt,
FrontendInputs: req.FrontendInputs,
CacheImports: cacheImports,
})
}, lbf.sid)
if err != nil {
return nil, err
}


@ -8,19 +8,28 @@ import (
"google.golang.org/grpc/codes"
)
func CredentialsFunc(ctx context.Context, c session.Caller) func(string) (string, string, error) {
func CredentialsFunc(sm *session.Manager, g session.Group) func(string) (string, string, error) {
return func(host string) (string, string, error) {
client := NewAuthClient(c.Conn())
var user, secret string
err := sm.Any(context.TODO(), g, func(ctx context.Context, _ string, c session.Caller) error {
client := NewAuthClient(c.Conn())
resp, err := client.Credentials(ctx, &CredentialsRequest{
Host: host,
resp, err := client.Credentials(ctx, &CredentialsRequest{
Host: host,
})
if err != nil {
if grpcerrors.Code(err) == codes.Unimplemented {
return nil
}
return err
}
user = resp.Username
secret = resp.Secret
return nil
})
if err != nil {
if grpcerrors.Code(err) == codes.Unimplemented {
return "", "", nil
}
return "", "", err
}
return resp.Username, resp.Secret, nil
return user, secret, nil
}
}
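
For illustration only (not in the diff): a credentials lookup no longer needs a pre-resolved Caller. Callers hand over the session manager and a group; the helper falls back across the group's sessions and tolerates clients that do not implement the auth service. The helper below and its import path are assumptions based on this file's package.

package example

import (
	"github.com/moby/buildkit/session"
	"github.com/moby/buildkit/session/auth"
)

// lookupRegistryCreds is a hypothetical helper showing the new call shape:
// CredentialsFunc takes the manager plus a session group instead of a
// single Caller, and yields empty credentials when the client exposes no
// auth service.
func lookupRegistryCreds(sm *session.Manager, sid, host string) (username, secret string, err error) {
	creds := auth.CredentialsFunc(sm, session.NewGroup(sid))
	return creds(host)
}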


@ -1,22 +0,0 @@
package session
import "context"
type contextKeyT string
var contextKey = contextKeyT("buildkit/session-id")
func NewContext(ctx context.Context, id string) context.Context {
if id != "" {
return context.WithValue(ctx, contextKey, id)
}
return ctx
}
func FromContext(ctx context.Context) string {
v := ctx.Value(contextKey)
if v == nil {
return ""
}
return v.(string)
}

session/group.go (new file, 88 lines)

@ -0,0 +1,88 @@
package session
import (
"context"
"time"
"github.com/pkg/errors"
)
type Group interface {
SessionIterator() Iterator
}
type Iterator interface {
NextSession() string
}
func NewGroup(ids ...string) Group {
return &group{ids: ids}
}
type group struct {
ids []string
}
func (g *group) SessionIterator() Iterator {
return &group{ids: g.ids}
}
func (g *group) NextSession() string {
if len(g.ids) == 0 {
return ""
}
v := g.ids[0]
g.ids = g.ids[1:]
return v
}
func AllSessionIDs(g Group) (out []string) {
if g == nil {
return nil
}
it := g.SessionIterator()
if it == nil {
return nil
}
for {
v := it.NextSession()
if v == "" {
return
}
out = append(out, v)
}
}
func (sm *Manager) Any(ctx context.Context, g Group, f func(context.Context, string, Caller) error) error {
if g == nil {
return nil
}
iter := g.SessionIterator()
if iter == nil {
return nil
}
var lastErr error
for {
id := iter.NextSession()
if id == "" {
if lastErr != nil {
return lastErr
}
return errors.Errorf("no active sessions")
}
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
c, err := sm.Get(timeoutCtx, id)
if err != nil {
lastErr = err
continue
}
if err := f(ctx, id, c); err != nil {
lastErr = err
continue
}
return nil
}
}
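
For illustration (assumed usage, not part of the diff): the group implementation above keeps the given ID order, SessionIterator returns an independent iterator each time, and Manager.Any walks those IDs until a callback succeeds, returning the last error or "no active sessions" when none do.

package example

import (
	"fmt"

	"github.com/moby/buildkit/session"
)

// demoGroup illustrates the iteration contract defined above: NewGroup
// keeps the given order, each SessionIterator call hands out a fresh
// iterator, and draining an iterator does not consume the group.
func demoGroup() {
	g := session.NewGroup("sid-a", "sid-b") // placeholder IDs
	fmt.Println(session.AllSessionIDs(g))   // [sid-a sid-b]

	it := g.SessionIterator()
	fmt.Println(it.NextSession()) // sid-a
	fmt.Println(it.NextSession()) // sid-b
	fmt.Println(it.NextSession()) // "" (exhausted)

	fmt.Println(session.AllSessionIDs(g)) // still [sid-a sid-b]
}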


@ -23,7 +23,7 @@ type ResolveOpFunc func(Vertex, Builder) (Op, error)
type Builder interface {
Build(ctx context.Context, e Edge) (CachedResult, error)
Context(ctx context.Context) context.Context
InContext(ctx context.Context, f func(ctx context.Context, g session.Group) error) error
EachValue(ctx context.Context, key string, fn func(interface{}) error) error
}
@ -67,32 +67,69 @@ type state struct {
solver *Solver
}
func (s *state) getSessionID() string {
// TODO: connect with sessionmanager to avoid getting dropped sessions
s.mu.Lock()
for j := range s.jobs {
if j.SessionID != "" {
s.mu.Unlock()
return j.SessionID
}
}
parents := map[digest.Digest]struct{}{}
for p := range s.parents {
parents[p] = struct{}{}
}
s.mu.Unlock()
func (s *state) SessionIterator() session.Iterator {
return s.sessionIterator()
}
for p := range parents {
s.solver.mu.Lock()
pst, ok := s.solver.actives[p]
s.solver.mu.Unlock()
if ok {
if sessionID := pst.getSessionID(); sessionID != "" {
return sessionID
func (s *state) sessionIterator() *sessionGroup {
return &sessionGroup{state: s, visited: map[string]struct{}{}}
}
type sessionGroup struct {
*state
visited map[string]struct{}
parents []session.Iterator
mode int
}
func (g *sessionGroup) NextSession() string {
if g.mode == 0 {
g.mu.Lock()
for j := range g.jobs {
if j.SessionID != "" {
if _, ok := g.visited[j.SessionID]; ok {
continue
}
g.visited[j.SessionID] = struct{}{}
g.mu.Unlock()
return j.SessionID
}
}
g.mu.Unlock()
g.mode = 1
}
if g.mode == 1 {
parents := map[digest.Digest]struct{}{}
g.mu.Lock()
for p := range g.state.parents {
parents[p] = struct{}{}
}
g.mu.Unlock()
for p := range parents {
g.solver.mu.Lock()
pst, ok := g.solver.actives[p]
g.solver.mu.Unlock()
if ok {
gg := pst.sessionIterator()
gg.visited = g.visited
g.parents = append(g.parents, gg)
}
}
g.mode = 2
}
for {
if len(g.parents) == 0 {
return ""
}
p := g.parents[0]
id := p.NextSession()
if id != "" {
return id
}
g.parents = g.parents[1:]
}
return ""
}
func (s *state) builder() *subBuilder {
@ -172,9 +209,8 @@ func (sb *subBuilder) Build(ctx context.Context, e Edge) (CachedResult, error) {
return res, nil
}
func (sb *subBuilder) Context(ctx context.Context) context.Context {
ctx = session.NewContext(ctx, sb.state.getSessionID())
return opentracing.ContextWithSpan(progress.WithProgress(ctx, sb.mpw), sb.mspan)
func (sb *subBuilder) InContext(ctx context.Context, f func(context.Context, session.Group) error) error {
return f(opentracing.ContextWithSpan(progress.WithProgress(ctx, sb.mpw), sb.mspan), sb.state)
}
func (sb *subBuilder) EachValue(ctx context.Context, key string, fn func(interface{}) error) error {
@ -480,9 +516,8 @@ func (j *Job) Discard() error {
return nil
}
func (j *Job) Context(ctx context.Context) context.Context {
ctx = session.NewContext(ctx, j.SessionID)
return progress.WithProgress(ctx, j.pw)
func (j *Job) InContext(ctx context.Context, f func(context.Context, session.Group) error) error {
return f(progress.WithProgress(ctx, j.pw), session.NewGroup(j.SessionID))
}
func (j *Job) SetValue(key string, v interface{}) {
@ -631,7 +666,6 @@ func (s *sharedOp) CacheMap(ctx context.Context, index int) (resp *cacheMapResp,
return nil, s.cacheErr
}
ctx = opentracing.ContextWithSpan(progress.WithProgress(ctx, s.st.mpw), s.st.mspan)
ctx = session.NewContext(ctx, s.st.getSessionID())
if len(s.st.vtx.Inputs()) == 0 {
// no cache hit. start evaluating the node
span, ctx := tracing.StartSpan(ctx, "cache request: "+s.st.vtx.Name())
@ -641,7 +675,7 @@ func (s *sharedOp) CacheMap(ctx context.Context, index int) (resp *cacheMapResp,
notifyCompleted(ctx, &s.st.clientVertex, retErr, false)
}()
}
res, done, err := op.CacheMap(ctx, len(s.cacheRes))
res, done, err := op.CacheMap(ctx, s.st, len(s.cacheRes))
complete := true
if err != nil {
select {
@ -687,7 +721,6 @@ func (s *sharedOp) Exec(ctx context.Context, inputs []Result) (outputs []Result,
}
ctx = opentracing.ContextWithSpan(progress.WithProgress(ctx, s.st.mpw), s.st.mspan)
ctx = session.NewContext(ctx, s.st.getSessionID())
// no cache hit. start evaluating the node
span, ctx := tracing.StartSpan(ctx, s.st.vtx.Name())
@ -697,7 +730,7 @@ func (s *sharedOp) Exec(ctx context.Context, inputs []Result) (outputs []Result,
notifyCompleted(ctx, &s.st.clientVertex, retErr, false)
}()
res, err := op.Exec(ctx, inputs)
res, err := op.Exec(ctx, s.st, inputs)
complete := true
if err != nil {
select {
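
Worth noting: state now satisfies session.Group directly; its iterator yields session IDs from attached jobs first, then from parent states, deduplicating with the visited set. Code that used to wrap work in Builder.Context(ctx) now goes through Builder.InContext, which supplies that group along with the progress-enabled context. A hedged sketch of the call shape (helper name is illustrative):

package example

import (
	"context"

	"github.com/moby/buildkit/session"
	"github.com/moby/buildkit/solver"
)

// withBuildSessions is a hypothetical helper mirroring how the llbsolver's
// inBuilderContext (later in this diff) is built on Builder.InContext: the
// callback receives the session.Group tracked by the job or state.
func withBuildSessions(ctx context.Context, b solver.Builder, work func(context.Context, session.Group) error) error {
	return b.InContext(ctx, func(ctx context.Context, g session.Group) error {
		return work(ctx, g)
	})
}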


@ -60,12 +60,12 @@ func (b *llbBridge) loadResult(ctx context.Context, def *pb.Definition, cacheImp
func(cmID string, im gw.CacheOptionsEntry) {
cm = newLazyCacheManager(cmID, func() (solver.CacheManager, error) {
var cmNew solver.CacheManager
if err := inVertexContext(b.builder.Context(context.TODO()), "importing cache manifest from "+cmID, "", func(ctx context.Context) error {
if err := inBuilderContext(context.TODO(), b.builder, "importing cache manifest from "+cmID, "", func(ctx context.Context, g session.Group) error {
resolveCI, ok := b.resolveCacheImporterFuncs[im.Type]
if !ok {
return errors.Errorf("unknown cache importer: %s", im.Type)
}
ci, desc, err := resolveCI(ctx, im.Attrs)
ci, desc, err := resolveCI(ctx, g, im.Attrs)
if err != nil {
return err
}
@ -120,7 +120,7 @@ func (b *llbBridge) loadResult(ctx context.Context, def *pb.Definition, cacheImp
return res, err
}
func (b *llbBridge) Solve(ctx context.Context, req frontend.SolveRequest) (res *frontend.Result, err error) {
func (b *llbBridge) Solve(ctx context.Context, req frontend.SolveRequest, sid string) (res *frontend.Result, err error) {
if req.Definition != nil && req.Definition.Def != nil && req.Frontend != "" {
return nil, errors.New("cannot solve with both Definition and Frontend specified")
}
@ -132,7 +132,7 @@ func (b *llbBridge) Solve(ctx context.Context, req frontend.SolveRequest) (res *
if !ok {
return nil, errors.Errorf("invalid frontend: %s", req.Frontend)
}
res, err = f.Solve(ctx, b, req.FrontendOpt, req.FrontendInputs)
res, err = f.Solve(ctx, b, req.FrontendOpt, req.FrontendInputs, sid)
if err != nil {
return nil, errors.Wrapf(err, "failed to solve with frontend %s", req.Frontend)
}
@ -271,8 +271,8 @@ func (s *llbBridge) ResolveImageConfig(ctx context.Context, ref string, opt llb.
} else {
id += platforms.Format(*platform)
}
err = inVertexContext(s.builder.Context(ctx), opt.LogName, id, func(ctx context.Context) error {
dgst, config, err = w.ResolveImageConfig(ctx, ref, opt, s.sm)
err = inBuilderContext(ctx, s.builder, opt.LogName, id, func(ctx context.Context, g session.Group) error {
dgst, config, err = w.ResolveImageConfig(ctx, ref, opt, s.sm, g)
return err
})
return dgst, config, err


@ -8,6 +8,7 @@ import (
"github.com/containerd/continuity/fs"
"github.com/moby/buildkit/client/llb"
"github.com/moby/buildkit/frontend"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/solver/llbsolver"
@ -36,7 +37,7 @@ func NewBuildOp(v solver.Vertex, op *pb.Op_Build, b frontend.FrontendLLBBridge,
}, nil
}
func (b *buildOp) CacheMap(ctx context.Context, index int) (*solver.CacheMap, bool, error) {
func (b *buildOp) CacheMap(ctx context.Context, g session.Group, index int) (*solver.CacheMap, bool, error) {
dt, err := json.Marshal(struct {
Type string
Exec *pb.BuildOp
@ -57,7 +58,7 @@ func (b *buildOp) CacheMap(ctx context.Context, index int) (*solver.CacheMap, bo
}, true, nil
}
func (b *buildOp) Exec(ctx context.Context, inputs []solver.Result) (outputs []solver.Result, retErr error) {
func (b *buildOp) Exec(ctx context.Context, g session.Group, inputs []solver.Result) (outputs []solver.Result, retErr error) {
if b.op.Builder != pb.LLBBuilder {
return nil, errors.Errorf("only LLB builder is currently allowed")
}
@ -123,7 +124,7 @@ func (b *buildOp) Exec(ctx context.Context, inputs []solver.Result) (outputs []s
newRes, err := b.b.Solve(ctx, frontend.SolveRequest{
Definition: def.ToPB(),
})
}, g.SessionIterator().NextSession())
if err != nil {
return nil, err
}


@ -94,7 +94,7 @@ func cloneExecOp(old *pb.ExecOp) pb.ExecOp {
return n
}
func (e *execOp) CacheMap(ctx context.Context, index int) (*solver.CacheMap, bool, error) {
func (e *execOp) CacheMap(ctx context.Context, g session.Group, index int) (*solver.CacheMap, bool, error) {
op := cloneExecOp(e.op)
for i := range op.Meta.ExtraHosts {
h := op.Meta.ExtraHosts[i]
@ -331,30 +331,26 @@ func (g *cacheRefGetter) getRefCacheDirNoCache(ctx context.Context, key string,
return mRef, nil
}
func (e *execOp) getSSHMountable(ctx context.Context, m *pb.Mount) (cache.Mountable, error) {
sessionID := session.FromContext(ctx)
if sessionID == "" {
return nil, errors.New("could not access local files without session")
}
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
caller, err := e.sm.Get(timeoutCtx, sessionID)
func (e *execOp) getSSHMountable(ctx context.Context, m *pb.Mount, g session.Group) (cache.Mountable, error) {
var caller session.Caller
err := e.sm.Any(ctx, g, func(ctx context.Context, _ string, c session.Caller) error {
if err := sshforward.CheckSSHID(ctx, c, m.SSHOpt.ID); err != nil {
if m.SSHOpt.Optional {
return nil
}
if grpcerrors.Code(err) == codes.Unimplemented {
return errors.Errorf("no SSH key %q forwarded from the client", m.SSHOpt.ID)
}
return err
}
caller = c
return nil
})
if err != nil {
return nil, err
}
if err := sshforward.CheckSSHID(ctx, caller, m.SSHOpt.ID); err != nil {
if m.SSHOpt.Optional {
return nil, nil
}
if grpcerrors.Code(err) == codes.Unimplemented {
return nil, errors.Errorf("no SSH key %q forwarded from the client", m.SSHOpt.ID)
}
return nil, err
}
// because ssh socket remains active, to actually handle session disconnecting ssh error
// should restart the whole exec with new session
return &sshMount{mount: m, caller: caller, idmap: e.cm.IdentityMapping()}, nil
}
@ -421,7 +417,7 @@ func (sm *sshMountInstance) IdentityMapping() *idtools.IdentityMapping {
return sm.idmap
}
func (e *execOp) getSecretMountable(ctx context.Context, m *pb.Mount) (cache.Mountable, error) {
func (e *execOp) getSecretMountable(ctx context.Context, m *pb.Mount, g session.Group) (cache.Mountable, error) {
if m.SecretOpt == nil {
return nil, errors.Errorf("invalid sercet mount options")
}
@ -431,28 +427,21 @@ func (e *execOp) getSecretMountable(ctx context.Context, m *pb.Mount) (cache.Mou
if id == "" {
return nil, errors.Errorf("secret ID missing from mount options")
}
sessionID := session.FromContext(ctx)
if sessionID == "" {
return nil, errors.New("could not access local files without session")
}
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
caller, err := e.sm.Get(timeoutCtx, sessionID)
if err != nil {
return nil, err
}
dt, err := secrets.GetSecret(ctx, caller, id)
if err != nil {
if errors.Is(err, secrets.ErrNotFound) && m.SecretOpt.Optional {
return nil, nil
var dt []byte
var err error
err = e.sm.Any(ctx, g, func(ctx context.Context, _ string, caller session.Caller) error {
dt, err = secrets.GetSecret(ctx, caller, id)
if err != nil {
if errors.Is(err, secrets.ErrNotFound) && m.SecretOpt.Optional {
return nil
}
return err
}
return nil
})
if err != nil || dt == nil {
return nil, err
}
return &secretMount{mount: m, data: dt, idmap: e.cm.IdentityMapping()}, nil
}
@ -562,7 +551,7 @@ func addDefaultEnvvar(env []string, k, v string) []string {
return append(env, k+"="+v)
}
func (e *execOp) Exec(ctx context.Context, inputs []solver.Result) ([]solver.Result, error) {
func (e *execOp) Exec(ctx context.Context, g session.Group, inputs []solver.Result) ([]solver.Result, error) {
var mounts []executor.Mount
var root cache.Mountable
var readonlyRootFS bool
@ -651,7 +640,7 @@ func (e *execOp) Exec(ctx context.Context, inputs []solver.Result) ([]solver.Res
mountable = newTmpfs(e.cm.IdentityMapping())
case pb.MountType_SECRET:
secretMount, err := e.getSecretMountable(ctx, m)
secretMount, err := e.getSecretMountable(ctx, m, g)
if err != nil {
return nil, err
}
@ -661,7 +650,7 @@ func (e *execOp) Exec(ctx context.Context, inputs []solver.Result) ([]solver.Res
mountable = secretMount
case pb.MountType_SSH:
sshMount, err := e.getSSHMountable(ctx, m)
sshMount, err := e.getSSHMountable(ctx, m, g)
if err != nil {
return nil, err
}


@ -12,6 +12,7 @@ import (
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/cache/metadata"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/solver/llbsolver"
"github.com/moby/buildkit/solver/llbsolver/file"
@ -47,7 +48,7 @@ func NewFileOp(v solver.Vertex, op *pb.Op_File, cm cache.Manager, md *metadata.S
}, nil
}
func (f *fileOp) CacheMap(ctx context.Context, index int) (*solver.CacheMap, bool, error) {
func (f *fileOp) CacheMap(ctx context.Context, g session.Group, index int) (*solver.CacheMap, bool, error) {
selectors := map[int]map[llbsolver.Selector]struct{}{}
invalidSelectors := map[int]struct{}{}
@ -141,7 +142,7 @@ func (f *fileOp) CacheMap(ctx context.Context, index int) (*solver.CacheMap, boo
return cm, true, nil
}
func (f *fileOp) Exec(ctx context.Context, inputs []solver.Result) ([]solver.Result, error) {
func (f *fileOp) Exec(ctx context.Context, g session.Group, inputs []solver.Result) ([]solver.Result, error) {
inpRefs := make([]fileoptypes.Ref, 0, len(inputs))
for _, inp := range inputs {
workerRef, ok := inp.Sys().(*worker.WorkerRef)


@ -57,12 +57,12 @@ func (s *sourceOp) instance(ctx context.Context) (source.SourceInstance, error)
return s.src, nil
}
func (s *sourceOp) CacheMap(ctx context.Context, index int) (*solver.CacheMap, bool, error) {
func (s *sourceOp) CacheMap(ctx context.Context, g session.Group, index int) (*solver.CacheMap, bool, error) {
src, err := s.instance(ctx)
if err != nil {
return nil, false, err
}
k, done, err := src.CacheKey(ctx, index)
k, done, err := src.CacheKey(ctx, g, index)
if err != nil {
return nil, false, err
}
@ -79,12 +79,12 @@ func (s *sourceOp) CacheMap(ctx context.Context, index int) (*solver.CacheMap, b
}, done, nil
}
func (s *sourceOp) Exec(ctx context.Context, _ []solver.Result) (outputs []solver.Result, err error) {
func (s *sourceOp) Exec(ctx context.Context, g session.Group, _ []solver.Result) (outputs []solver.Result, err error) {
src, err := s.instance(ctx)
if err != nil {
return nil, err
}
ref, err := src.Snapshot(ctx)
ref, err := src.Snapshot(ctx, g)
if err != nil {
return nil, err
}


@ -88,7 +88,7 @@ func (s *Solver) Bridge(b solver.Builder) frontend.FrontendLLBBridge {
}
}
func (s *Solver) Solve(ctx context.Context, id string, req frontend.SolveRequest, exp ExporterRequest, ent []entitlements.Entitlement) (*client.SolveResponse, error) {
func (s *Solver) Solve(ctx context.Context, id string, sessionID string, req frontend.SolveRequest, exp ExporterRequest, ent []entitlements.Entitlement) (*client.SolveResponse, error) {
j, err := s.solver.NewJob(id)
if err != nil {
return nil, err
@ -102,11 +102,11 @@ func (s *Solver) Solve(ctx context.Context, id string, req frontend.SolveRequest
}
j.SetValue(keyEntitlements, set)
j.SessionID = session.FromContext(ctx)
j.SessionID = sessionID
var res *frontend.Result
if s.gatewayForwarder != nil && req.Definition == nil && req.Frontend == "" {
fwd := gateway.NewBridgeForwarder(ctx, s.Bridge(j), s.workerController, req.FrontendInputs)
fwd := gateway.NewBridgeForwarder(ctx, s.Bridge(j), s.workerController, req.FrontendInputs, sessionID)
defer fwd.Discard()
if err := s.gatewayForwarder.RegisterBuild(ctx, id, fwd); err != nil {
return nil, err
@ -124,7 +124,7 @@ func (s *Solver) Solve(ctx context.Context, id string, req frontend.SolveRequest
return nil, err
}
} else {
res, err = s.Bridge(j).Solve(ctx, req)
res, err = s.Bridge(j).Solve(ctx, req, sessionID)
if err != nil {
return nil, err
}
@ -204,8 +204,8 @@ func (s *Solver) Solve(ctx context.Context, id string, req frontend.SolveRequest
inp.Refs = m
}
if err := inVertexContext(j.Context(ctx), e.Name(), "", func(ctx context.Context) error {
exporterResponse, err = e.Export(ctx, inp)
if err := inBuilderContext(ctx, j, e.Name(), "", func(ctx context.Context, _ session.Group) error {
exporterResponse, err = e.Export(ctx, inp, j.SessionID)
return err
}); err != nil {
return nil, err
@ -214,7 +214,7 @@ func (s *Solver) Solve(ctx context.Context, id string, req frontend.SolveRequest
var cacheExporterResponse map[string]string
if e := exp.CacheExporter; e != nil {
if err := inVertexContext(j.Context(ctx), "exporting cache", "", func(ctx context.Context) error {
if err := inBuilderContext(ctx, j, "exporting cache", "", func(ctx context.Context, _ session.Group) error {
prepareDone := oneOffProgress(ctx, "preparing build cache for export")
if err := res.EachRef(func(res solver.ResultProxy) error {
r, err := res.Result(ctx)
@ -335,7 +335,7 @@ func oneOffProgress(ctx context.Context, id string) func(err error) error {
}
}
func inVertexContext(ctx context.Context, name, id string, f func(ctx context.Context) error) error {
func inBuilderContext(ctx context.Context, b solver.Builder, name, id string, f func(ctx context.Context, g session.Group) error) error {
if id == "" {
id = name
}
@ -343,12 +343,14 @@ func inVertexContext(ctx context.Context, name, id string, f func(ctx context.Co
Digest: digest.FromBytes([]byte(id)),
Name: name,
}
pw, _, ctx := progress.FromContext(ctx, progress.WithMetadata("vertex", v.Digest))
notifyStarted(ctx, &v, false)
defer pw.Close()
err := f(ctx)
notifyCompleted(ctx, &v, err, false)
return err
return b.InContext(ctx, func(ctx context.Context, g session.Group) error {
pw, _, ctx := progress.FromContext(ctx, progress.WithMetadata("vertex", v.Digest))
notifyStarted(ctx, &v, false)
defer pw.Close()
err := f(ctx, g)
notifyCompleted(ctx, &v, err, false)
return err
})
}
func notifyStarted(ctx context.Context, v *client.Vertex, cached bool) {


@ -12,6 +12,7 @@ import (
"time"
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/session"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
@ -3466,7 +3467,7 @@ func (v *vertex) cacheMap(ctx context.Context) error {
return nil
}
func (v *vertex) CacheMap(ctx context.Context, index int) (*CacheMap, bool, error) {
func (v *vertex) CacheMap(ctx context.Context, g session.Group, index int) (*CacheMap, bool, error) {
if index == 0 {
if err := v.cacheMap(ctx); err != nil {
return nil, false, err
@ -3503,7 +3504,7 @@ func (v *vertex) exec(ctx context.Context, inputs []Result) error {
return nil
}
func (v *vertex) Exec(ctx context.Context, inputs []Result) (outputs []Result, err error) {
func (v *vertex) Exec(ctx context.Context, g session.Group, inputs []Result) (outputs []Result, err error) {
if err := v.exec(ctx, inputs); err != nil {
return nil, err
}
@ -3547,7 +3548,7 @@ func (v *vertexConst) Sys() interface{} {
return v
}
func (v *vertexConst) Exec(ctx context.Context, inputs []Result) (outputs []Result, err error) {
func (v *vertexConst) Exec(ctx context.Context, g session.Group, inputs []Result) (outputs []Result, err error) {
if err := v.exec(ctx, inputs); err != nil {
return nil, err
}
@ -3574,7 +3575,7 @@ func (v *vertexSum) Sys() interface{} {
return v
}
func (v *vertexSum) Exec(ctx context.Context, inputs []Result) (outputs []Result, err error) {
func (v *vertexSum) Exec(ctx context.Context, g session.Group, inputs []Result) (outputs []Result, err error) {
if err := v.exec(ctx, inputs); err != nil {
return nil, err
}
@ -3609,7 +3610,7 @@ func (v *vertexSubBuild) Sys() interface{} {
return v
}
func (v *vertexSubBuild) Exec(ctx context.Context, inputs []Result) (outputs []Result, err error) {
func (v *vertexSubBuild) Exec(ctx context.Context, g session.Group, inputs []Result) (outputs []Result, err error) {
if err := v.exec(ctx, inputs); err != nil {
return nil, err
}


@ -5,6 +5,7 @@ import (
"time"
"github.com/containerd/containerd/content"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/solver/pb"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
@ -138,10 +139,10 @@ type CacheLink struct {
type Op interface {
// CacheMap returns structure describing how the operation is cached.
// Currently only roots are allowed to return multiple cache maps per op.
CacheMap(context.Context, int) (*CacheMap, bool, error)
CacheMap(context.Context, session.Group, int) (*CacheMap, bool, error)
// Exec runs an operation given results from previous operations.
Exec(ctx context.Context, inputs []Result) (outputs []Result, err error)
Exec(ctx context.Context, g session.Group, inputs []Result) (outputs []Result, err error)
}
type ResultBasedCacheFunc func(context.Context, Result) (digest.Digest, error)
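
With the interface above, ops no longer fish a session ID out of the context; the solver hands them the session.Group of the builds currently waiting on the vertex. A minimal sketch against this interface (illustrative only, not from the diff):

package example

import (
	"context"

	"github.com/moby/buildkit/session"
	"github.com/moby/buildkit/solver"
)

// noopOp sketches an op written against the updated interface above: both
// CacheMap and Exec receive the session.Group, which can be passed on to
// session.Manager.Any, sources, or cache exporters whenever client access
// is needed.
type noopOp struct{}

func (o *noopOp) CacheMap(ctx context.Context, g session.Group, index int) (*solver.CacheMap, bool, error) {
	return &solver.CacheMap{}, true, nil // trivially complete, no dependencies
}

func (o *noopOp) Exec(ctx context.Context, g session.Group, inputs []solver.Result) ([]solver.Result, error) {
	return nil, nil // produces no outputs
}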


@ -23,6 +23,7 @@ import (
"github.com/moby/buildkit/util/leaseutil"
"github.com/moby/buildkit/util/progress"
"github.com/moby/buildkit/util/pull"
"github.com/moby/buildkit/util/resolver"
"github.com/moby/buildkit/util/winlayers"
digest "github.com/opencontainers/go-digest"
"github.com/opencontainers/image-spec/identity"
@ -43,24 +44,26 @@ type SourceOpt struct {
LeaseManager leases.Manager
}
type imageSource struct {
type Source struct {
SourceOpt
g flightcontrol.Group
}
func NewSource(opt SourceOpt) (source.Source, error) {
is := &imageSource{
var _ source.Source = &Source{}
func NewSource(opt SourceOpt) (*Source, error) {
is := &Source{
SourceOpt: opt,
}
return is, nil
}
func (is *imageSource) ID() string {
func (is *Source) ID() string {
return source.DockerImageScheme
}
func (is *imageSource) ResolveImageConfig(ctx context.Context, ref string, opt llb.ResolveImageConfigOpt, sm *session.Manager) (digest.Digest, []byte, error) {
func (is *Source) ResolveImageConfig(ctx context.Context, ref string, opt llb.ResolveImageConfigOpt, sm *session.Manager, g session.Group) (digest.Digest, []byte, error) {
type t struct {
dgst digest.Digest
dt []byte
@ -76,7 +79,13 @@ func (is *imageSource) ResolveImageConfig(ctx context.Context, ref string, opt l
}
res, err := is.g.Do(ctx, key, func(ctx context.Context) (interface{}, error) {
dgst, dt, err := imageutil.Config(ctx, ref, pull.NewResolver(ctx, is.RegistryHosts, sm, is.ImageStore, rm, ref), is.ContentStore, is.LeaseManager, opt.Platform)
dgst, dt, err := imageutil.Config(ctx, ref, pull.NewResolver(g, pull.ResolverOpt{
Hosts: is.RegistryHosts,
Auth: resolver.NewSessionAuthenticator(sm, g),
ImageStore: is.ImageStore,
Mode: rm,
Ref: ref,
}), is.ContentStore, is.LeaseManager, opt.Platform)
if err != nil {
return nil, err
}
@ -89,7 +98,7 @@ func (is *imageSource) ResolveImageConfig(ctx context.Context, ref string, opt l
return typed.dgst, typed.dt, nil
}
func (is *imageSource) Resolve(ctx context.Context, id source.Identifier, sm *session.Manager) (source.SourceInstance, error) {
func (is *Source) Resolve(ctx context.Context, id source.Identifier, sm *session.Manager) (source.SourceInstance, error) {
imageIdentifier, ok := id.(*source.ImageIdentifier)
if !ok {
return nil, errors.Errorf("invalid image identifier %v", id)
@ -105,7 +114,6 @@ func (is *imageSource) Resolve(ctx context.Context, id source.Identifier, sm *se
ContentStore: is.ContentStore,
Applier: is.Applier,
Src: imageIdentifier.Reference,
Resolver: pull.NewResolver(ctx, is.RegistryHosts, sm, is.ImageStore, imageIdentifier.ResolveMode, imageIdentifier.Reference.String()),
Platform: &platform,
}
p := &puller{
@ -114,6 +122,13 @@ func (is *imageSource) Resolve(ctx context.Context, id source.Identifier, sm *se
Platform: platform,
id: imageIdentifier,
LeaseManager: is.LeaseManager,
ResolverOpt: pull.ResolverOpt{
Hosts: is.RegistryHosts,
Auth: resolver.NewSessionAuthenticator(sm, nil),
ImageStore: is.ImageStore,
Mode: imageIdentifier.ResolveMode,
Ref: imageIdentifier.Reference.String(),
},
}
return p, nil
}
@ -122,6 +137,7 @@ type puller struct {
CacheAccessor cache.Accessor
LeaseManager leases.Manager
Platform specs.Platform
ResolverOpt pull.ResolverOpt
id *source.ImageIdentifier
*pull.Puller
}
@ -144,7 +160,12 @@ func mainManifestKey(ctx context.Context, desc specs.Descriptor, platform specs.
return digest.FromBytes(dt), nil
}
func (p *puller) CacheKey(ctx context.Context, index int) (string, bool, error) {
func (p *puller) CacheKey(ctx context.Context, g session.Group, index int) (string, bool, error) {
if p.Puller.Resolver == nil {
p.Puller.Resolver = pull.NewResolver(g, p.ResolverOpt)
} else {
p.ResolverOpt.Auth.AddSession(g)
}
_, desc, err := p.Puller.Resolve(ctx)
if err != nil {
return "", false, err
@ -180,7 +201,13 @@ func (p *puller) CacheKey(ctx context.Context, index int) (string, bool, error)
return k, true, nil
}
func (p *puller) Snapshot(ctx context.Context) (ir cache.ImmutableRef, err error) {
func (p *puller) Snapshot(ctx context.Context, g session.Group) (ir cache.ImmutableRef, err error) {
if p.Puller.Resolver == nil {
p.Puller.Resolver = pull.NewResolver(g, p.ResolverOpt)
} else {
p.ResolverOpt.Auth.AddSession(g)
}
layerNeedsTypeWindows := false
if platform := p.Puller.Platform; platform != nil {
if platform.OS == "windows" && runtime.GOOS != "windows" {


@ -12,7 +12,6 @@ import (
"path/filepath"
"regexp"
"strings"
"time"
"github.com/docker/docker/pkg/locker"
"github.com/moby/buildkit/cache"
@ -205,7 +204,7 @@ func (gs *gitSourceHandler) authSecretNames() (sec []authSecret, _ error) {
return sec, nil
}
func (gs *gitSourceHandler) getAuthToken(ctx context.Context) error {
func (gs *gitSourceHandler) getAuthToken(ctx context.Context, g session.Group) error {
if gs.auth != nil {
return nil
}
@ -213,38 +212,26 @@ func (gs *gitSourceHandler) getAuthToken(ctx context.Context) error {
if err != nil {
return err
}
id := session.FromContext(ctx)
if id == "" {
return errors.New("could not access auth tokens without session")
}
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
caller, err := gs.sm.Get(timeoutCtx, id)
if err != nil {
return err
}
for _, s := range sec {
dt, err := secrets.GetSecret(ctx, caller, s.name)
if err != nil {
if errors.Is(err, secrets.ErrNotFound) {
continue
return gs.sm.Any(ctx, g, func(ctx context.Context, _ string, caller session.Caller) error {
for _, s := range sec {
dt, err := secrets.GetSecret(ctx, caller, s.name)
if err != nil {
if errors.Is(err, secrets.ErrNotFound) {
continue
}
return err
}
return err
if s.token {
dt = []byte("basic " + base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("x-access-token:%s", dt))))
}
gs.auth = []string{"-c", "http.extraheader=Authorization: " + string(dt)}
break
}
if s.token {
dt = []byte("basic " + base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("x-access-token:%s", dt))))
}
gs.auth = []string{"-c", "http.extraheader=Authorization: " + string(dt)}
break
}
return nil
return nil
})
}
func (gs *gitSourceHandler) CacheKey(ctx context.Context, index int) (string, bool, error) {
func (gs *gitSourceHandler) CacheKey(ctx context.Context, g session.Group, index int) (string, bool, error) {
remote := gs.src.Remote
ref := gs.src.Ref
if ref == "" {
@ -259,7 +246,7 @@ func (gs *gitSourceHandler) CacheKey(ctx context.Context, index int) (string, bo
return ref, true, nil
}
gs.getAuthToken(ctx)
gs.getAuthToken(ctx, g)
gitDir, unmountGitDir, err := gs.mountRemote(ctx, remote, gs.auth)
if err != nil {
@ -288,7 +275,7 @@ func (gs *gitSourceHandler) CacheKey(ctx context.Context, index int) (string, bo
return sha, true, nil
}
func (gs *gitSourceHandler) Snapshot(ctx context.Context) (out cache.ImmutableRef, retErr error) {
func (gs *gitSourceHandler) Snapshot(ctx context.Context, g session.Group) (out cache.ImmutableRef, retErr error) {
ref := gs.src.Ref
if ref == "" {
ref = "master"
@ -297,13 +284,13 @@ func (gs *gitSourceHandler) Snapshot(ctx context.Context) (out cache.ImmutableRe
cacheKey := gs.cacheKey
if cacheKey == "" {
var err error
cacheKey, _, err = gs.CacheKey(ctx, 0)
cacheKey, _, err = gs.CacheKey(ctx, g, 0)
if err != nil {
return nil, err
}
}
gs.getAuthToken(ctx)
gs.getAuthToken(ctx, g)
snapshotKey := "git-snapshot::" + cacheKey + ":" + gs.src.Subdir
gs.locker.Lock(snapshotKey)


@ -55,7 +55,7 @@ func testRepeatedFetch(t *testing.T, keepGitDir bool) {
g, err := gs.Resolve(ctx, id, nil)
require.NoError(t, err)
key1, done, err := g.CacheKey(ctx, 0)
key1, done, err := g.CacheKey(ctx, nil, 0)
require.NoError(t, err)
require.True(t, done)
@ -66,7 +66,7 @@ func testRepeatedFetch(t *testing.T, keepGitDir bool) {
require.Equal(t, expLen, len(key1))
ref1, err := g.Snapshot(ctx)
ref1, err := g.Snapshot(ctx, nil)
require.NoError(t, err)
defer ref1.Release(context.TODO())
@ -97,12 +97,12 @@ func testRepeatedFetch(t *testing.T, keepGitDir bool) {
g, err = gs.Resolve(ctx, id, nil)
require.NoError(t, err)
key2, _, err := g.CacheKey(ctx, 0)
key2, _, err := g.CacheKey(ctx, nil, 0)
require.NoError(t, err)
require.Equal(t, key1, key2)
ref2, err := g.Snapshot(ctx)
ref2, err := g.Snapshot(ctx, nil)
require.NoError(t, err)
defer ref2.Release(context.TODO())
@ -113,11 +113,11 @@ func testRepeatedFetch(t *testing.T, keepGitDir bool) {
g, err = gs.Resolve(ctx, id, nil)
require.NoError(t, err)
key3, _, err := g.CacheKey(ctx, 0)
key3, _, err := g.CacheKey(ctx, nil, 0)
require.NoError(t, err)
require.NotEqual(t, key1, key3)
ref3, err := g.Snapshot(ctx)
ref3, err := g.Snapshot(ctx, nil)
require.NoError(t, err)
defer ref3.Release(context.TODO())
@ -178,7 +178,7 @@ func testFetchBySHA(t *testing.T, keepGitDir bool) {
g, err := gs.Resolve(ctx, id, nil)
require.NoError(t, err)
key1, done, err := g.CacheKey(ctx, 0)
key1, done, err := g.CacheKey(ctx, nil, 0)
require.NoError(t, err)
require.True(t, done)
@ -189,7 +189,7 @@ func testFetchBySHA(t *testing.T, keepGitDir bool) {
require.Equal(t, expLen, len(key1))
ref1, err := g.Snapshot(ctx)
ref1, err := g.Snapshot(ctx, nil)
require.NoError(t, err)
defer ref1.Release(context.TODO())
@ -265,17 +265,17 @@ func testMultipleRepos(t *testing.T, keepGitDir bool) {
expLen += 4
}
key1, _, err := g.CacheKey(ctx, 0)
key1, _, err := g.CacheKey(ctx, nil, 0)
require.NoError(t, err)
require.Equal(t, expLen, len(key1))
key2, _, err := g2.CacheKey(ctx, 0)
key2, _, err := g2.CacheKey(ctx, nil, 0)
require.NoError(t, err)
require.Equal(t, expLen, len(key2))
require.NotEqual(t, key1, key2)
ref1, err := g.Snapshot(ctx)
ref1, err := g.Snapshot(ctx, nil)
require.NoError(t, err)
defer ref1.Release(context.TODO())
@ -287,7 +287,7 @@ func testMultipleRepos(t *testing.T, keepGitDir bool) {
require.NoError(t, err)
defer lm.Unmount()
ref2, err := g2.Snapshot(ctx)
ref2, err := g2.Snapshot(ctx, nil)
require.NoError(t, err)
defer ref2.Release(context.TODO())


@ -64,7 +64,7 @@ type httpSourceHandler struct {
src source.HttpIdentifier
refID string
cacheKey digest.Digest
client *http.Client
sm *session.Manager
}
func (hs *httpSource) Resolve(ctx context.Context, id source.Identifier, sm *session.Manager) (source.SourceInstance, error) {
@ -73,15 +73,17 @@ func (hs *httpSource) Resolve(ctx context.Context, id source.Identifier, sm *ses
return nil, errors.Errorf("invalid http identifier %v", id)
}
sessionID := session.FromContext(ctx)
return &httpSourceHandler{
src: *httpIdentifier,
httpSource: hs,
client: &http.Client{Transport: newTransport(hs.transport, sm, sessionID)},
sm: sm,
}, nil
}
func (hs *httpSourceHandler) client(g session.Group) *http.Client {
return &http.Client{Transport: newTransport(hs.transport, hs.sm, g)}
}
// urlHash is internal hash the etag is stored by that doesn't leak outside
// this package.
func (hs *httpSourceHandler) urlHash() (digest.Digest, error) {
@ -120,7 +122,7 @@ func (hs *httpSourceHandler) formatCacheKey(filename string, dgst digest.Digest,
return digest.FromBytes(dt)
}
func (hs *httpSourceHandler) CacheKey(ctx context.Context, index int) (string, bool, error) {
func (hs *httpSourceHandler) CacheKey(ctx context.Context, g session.Group, index int) (string, bool, error) {
if hs.src.Checksum != "" {
hs.cacheKey = hs.src.Checksum
return hs.formatCacheKey(getFileName(hs.src.URL, hs.src.Filename, nil), hs.src.Checksum, "").String(), true, nil
@ -172,12 +174,14 @@ func (hs *httpSourceHandler) CacheKey(ctx context.Context, index int) (string, b
}
}
client := hs.client(g)
// Some servers seem to have trouble supporting If-None-Match properly even
// though they return ETag-s. So first, optionally try a HEAD request with
// manual ETag value comparison.
if len(m) > 0 {
req.Method = "HEAD"
resp, err := hs.client.Do(req)
resp, err := client.Do(req)
if err == nil {
if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusNotModified {
respETag := resp.Header.Get("ETag")
@ -203,7 +207,7 @@ func (hs *httpSourceHandler) CacheKey(ctx context.Context, index int) (string, b
req.Method = "GET"
}
resp, err := hs.client.Do(req)
resp, err := client.Do(req)
if err != nil {
return "", false, err
}
@ -366,7 +370,7 @@ func (hs *httpSourceHandler) save(ctx context.Context, resp *http.Response) (ref
return ref, dgst, nil
}
func (hs *httpSourceHandler) Snapshot(ctx context.Context) (cache.ImmutableRef, error) {
func (hs *httpSourceHandler) Snapshot(ctx context.Context, g session.Group) (cache.ImmutableRef, error) {
if hs.refID != "" {
ref, err := hs.cache.Get(ctx, hs.refID)
if err == nil {
@ -380,7 +384,9 @@ func (hs *httpSourceHandler) Snapshot(ctx context.Context) (cache.ImmutableRef,
}
req = req.WithContext(ctx)
resp, err := hs.client.Do(req)
client := hs.client(g)
resp, err := client.Do(req)
if err != nil {
return nil, err
}


@ -49,7 +49,7 @@ func TestHTTPSource(t *testing.T) {
h, err := hs.Resolve(ctx, id, nil)
require.NoError(t, err)
k, _, err := h.CacheKey(ctx, 0)
k, _, err := h.CacheKey(ctx, nil, 0)
require.NoError(t, err)
expectedContent1 := "sha256:0b1a154faa3003c1fbe7fda9c8a42d55fde2df2a2c405c32038f8ac7ed6b044a"
@ -58,7 +58,7 @@ func TestHTTPSource(t *testing.T) {
require.Equal(t, server.Stats("/foo").AllRequests, 1)
require.Equal(t, server.Stats("/foo").CachedRequests, 0)
ref, err := h.Snapshot(ctx)
ref, err := h.Snapshot(ctx, nil)
require.NoError(t, err)
defer func() {
if ref != nil {
@ -78,14 +78,14 @@ func TestHTTPSource(t *testing.T) {
h, err = hs.Resolve(ctx, id, nil)
require.NoError(t, err)
k, _, err = h.CacheKey(ctx, 0)
k, _, err = h.CacheKey(ctx, nil, 0)
require.NoError(t, err)
require.Equal(t, expectedContent1, k)
require.Equal(t, server.Stats("/foo").AllRequests, 2)
require.Equal(t, server.Stats("/foo").CachedRequests, 1)
ref, err = h.Snapshot(ctx)
ref, err = h.Snapshot(ctx, nil)
require.NoError(t, err)
defer func() {
if ref != nil {
@ -114,14 +114,14 @@ func TestHTTPSource(t *testing.T) {
h, err = hs.Resolve(ctx, id, nil)
require.NoError(t, err)
k, _, err = h.CacheKey(ctx, 0)
k, _, err = h.CacheKey(ctx, nil, 0)
require.NoError(t, err)
require.Equal(t, expectedContent2, k)
require.Equal(t, server.Stats("/foo").AllRequests, 4)
require.Equal(t, server.Stats("/foo").CachedRequests, 1)
ref, err = h.Snapshot(ctx)
ref, err = h.Snapshot(ctx, nil)
require.NoError(t, err)
defer func() {
if ref != nil {
@ -163,14 +163,14 @@ func TestHTTPDefaultName(t *testing.T) {
h, err := hs.Resolve(ctx, id, nil)
require.NoError(t, err)
k, _, err := h.CacheKey(ctx, 0)
k, _, err := h.CacheKey(ctx, nil, 0)
require.NoError(t, err)
require.Equal(t, "sha256:146f16ec8810a62a57ce314aba391f95f7eaaf41b8b1ebaf2ab65fd63b1ad437", k)
require.Equal(t, server.Stats("/").AllRequests, 1)
require.Equal(t, server.Stats("/").CachedRequests, 0)
ref, err := h.Snapshot(ctx)
ref, err := h.Snapshot(ctx, nil)
require.NoError(t, err)
defer func() {
if ref != nil {
@ -206,7 +206,7 @@ func TestHTTPInvalidURL(t *testing.T) {
h, err := hs.Resolve(ctx, id, nil)
require.NoError(t, err)
_, _, err = h.CacheKey(ctx, 0)
_, _, err = h.CacheKey(ctx, nil, 0)
require.Error(t, err)
require.Contains(t, err.Error(), "invalid response")
}
@ -236,7 +236,7 @@ func TestHTTPChecksum(t *testing.T) {
h, err := hs.Resolve(ctx, id, nil)
require.NoError(t, err)
k, _, err := h.CacheKey(ctx, 0)
k, _, err := h.CacheKey(ctx, nil, 0)
require.NoError(t, err)
expectedContentDifferent := "sha256:f25996f463dca69cffb580f8273ffacdda43332b5f0a8bea2ead33900616d44b"
@ -246,7 +246,7 @@ func TestHTTPChecksum(t *testing.T) {
require.Equal(t, server.Stats("/foo").AllRequests, 0)
require.Equal(t, server.Stats("/foo").CachedRequests, 0)
_, err = h.Snapshot(ctx)
_, err = h.Snapshot(ctx, nil)
require.Error(t, err)
require.Equal(t, expectedContentDifferent, k)
@ -258,14 +258,14 @@ func TestHTTPChecksum(t *testing.T) {
h, err = hs.Resolve(ctx, id, nil)
require.NoError(t, err)
k, _, err = h.CacheKey(ctx, 0)
k, _, err = h.CacheKey(ctx, nil, 0)
require.NoError(t, err)
require.Equal(t, expectedContentCorrect, k)
require.Equal(t, server.Stats("/foo").AllRequests, 1)
require.Equal(t, server.Stats("/foo").CachedRequests, 0)
ref, err := h.Snapshot(ctx)
ref, err := h.Snapshot(ctx, nil)
require.NoError(t, err)
defer func() {
if ref != nil {

View File

@ -4,21 +4,20 @@ import (
"context"
"io"
"net/http"
"time"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/session/upload"
"github.com/pkg/errors"
)
func newTransport(rt http.RoundTripper, sm *session.Manager, id string) http.RoundTripper {
return &sessionHandler{rt: rt, sm: sm, id: id}
func newTransport(rt http.RoundTripper, sm *session.Manager, g session.Group) http.RoundTripper {
return &sessionHandler{rt: rt, sm: sm, g: g}
}
type sessionHandler struct {
sm *session.Manager
rt http.RoundTripper
id string
g session.Group
}
func (h *sessionHandler) RoundTrip(req *http.Request) (*http.Response, error) {
@ -30,31 +29,30 @@ func (h *sessionHandler) RoundTrip(req *http.Request) (*http.Response, error) {
return nil, errors.Errorf("invalid request")
}
timeoutCtx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
defer cancel()
var resp *http.Response
err := h.sm.Any(context.TODO(), h.g, func(ctx context.Context, _ string, caller session.Caller) error {
up, err := upload.New(context.TODO(), caller, req.URL)
if err != nil {
return err
}
caller, err := h.sm.Get(timeoutCtx, h.id)
pr, pw := io.Pipe()
go func() {
_, err := up.WriteTo(pw)
pw.CloseWithError(err)
}()
resp = &http.Response{
Status: "200 OK",
StatusCode: 200,
Body: pr,
ContentLength: -1,
}
return nil
})
if err != nil {
return nil, err
}
up, err := upload.New(context.TODO(), caller, req.URL)
if err != nil {
return nil, err
}
pr, pw := io.Pipe()
go func() {
_, err := up.WriteTo(pw)
pw.CloseWithError(err)
}()
resp := &http.Response{
Status: "200 OK",
StatusCode: 200,
Body: pr,
ContentLength: -1,
}
return resp, nil
}
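The RoundTrip rewrite above relies on session.Manager.Any running a callback against some session in the group and moving on to the next one on failure. The toy sketch below only illustrates that contract; it is not BuildKit's implementation, and Caller and anyOf are placeholder names.

package sessionexample

import (
	"context"
	"errors"
)

// Caller is a placeholder for session.Caller.
type Caller interface{}

// anyOf tries f against each candidate session and returns on the first
// success, mirroring the fallback behaviour that sm.Any provides in the
// RoundTrip change above. Map iteration order is unspecified here; a real
// implementation would likely prefer a stable order.
func anyOf(ctx context.Context, callers map[string]Caller, f func(ctx context.Context, id string, c Caller) error) error {
	var lastErr error
	for id, c := range callers {
		if err := ctx.Err(); err != nil {
			return err
		}
		if err := f(ctx, id, c); err != nil {
			lastErr = err
			continue
		}
		return nil
	}
	if lastErr == nil {
		lastErr = errors.New("no active sessions in group")
	}
	return lastErr
}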

View File

@ -70,11 +70,11 @@ type localSourceHandler struct {
*localSource
}
func (ls *localSourceHandler) CacheKey(ctx context.Context, index int) (string, bool, error) {
func (ls *localSourceHandler) CacheKey(ctx context.Context, g session.Group, index int) (string, bool, error) {
sessionID := ls.src.SessionID
if sessionID == "" {
id := session.FromContext(ctx)
id := g.SessionIterator().NextSession()
if id == "" {
return "", false, errors.New("could not access local files without session")
}
@ -92,21 +92,23 @@ func (ls *localSourceHandler) CacheKey(ctx context.Context, index int) (string,
return "session:" + ls.src.Name + ":" + digest.FromBytes(dt).String(), true, nil
}
func (ls *localSourceHandler) Snapshot(ctx context.Context) (out cache.ImmutableRef, retErr error) {
id := session.FromContext(ctx)
if id == "" {
return nil, errors.New("could not access local files without session")
}
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
caller, err := ls.sm.Get(timeoutCtx, id)
func (ls *localSourceHandler) Snapshot(ctx context.Context, g session.Group) (cache.ImmutableRef, error) {
var ref cache.ImmutableRef
err := ls.sm.Any(ctx, g, func(ctx context.Context, _ string, c session.Caller) error {
r, err := ls.snapshot(ctx, c)
if err != nil {
return err
}
ref = r
return nil
})
if err != nil {
return nil, err
}
return ref, nil
}
func (ls *localSourceHandler) snapshot(ctx context.Context, caller session.Caller) (out cache.ImmutableRef, retErr error) {
sharedKey := keySharedKey + ":" + ls.src.Name + ":" + ls.src.SharedKeyHint + ":" + caller.SharedKey() // TODO: replace caller.SharedKey() with source based hint from client(absolute-path+nodeid)
var mutable cache.MutableRef
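CacheKey above picks an arbitrary session id from the group via SessionIterator when the local source has no session pinned. A toy illustration of that fallback follows; sessionIterator and firstSessionID are stand-in names, not BuildKit types.

package localexample

// sessionIterator mirrors the iterator that session.Group exposes in this
// diff: it yields candidate session ids and returns "" when exhausted.
type sessionIterator interface {
	NextSession() string
}

// firstSessionID returns the first usable session id from the iterator, the
// way localSourceHandler.CacheKey picks an id when the source has none
// pinned. Purely illustrative.
func firstSessionID(it sessionIterator) (string, bool) {
	if it == nil {
		return "", false
	}
	id := it.NextSession()
	return id, id != ""
}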

View File

@ -15,8 +15,8 @@ type Source interface {
}
type SourceInstance interface {
CacheKey(ctx context.Context, index int) (string, bool, error)
Snapshot(ctx context.Context) (cache.ImmutableRef, error)
CacheKey(ctx context.Context, g session.Group, index int) (string, bool, error)
Snapshot(ctx context.Context, g session.Group) (cache.ImmutableRef, error)
}
type Manager struct {
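With CacheKey and Snapshot now taking a session.Group, a caller passes nil (as the tests above do) or a real group built from a client session id. The sketch below shows one way to drive the updated interface, assuming the signatures in this diff and the session.NewGroup constructor used by the push changes further below; snapshotForSession itself is hypothetical.

package sourceusage

import (
	"context"

	"github.com/moby/buildkit/cache"
	"github.com/moby/buildkit/session"
	"github.com/moby/buildkit/source"
)

// snapshotForSession resolves an identifier and snapshots it on behalf of a
// single client session. Only the SourceInstance signatures and
// session.NewGroup come from this diff; the helper is illustrative.
func snapshotForSession(ctx context.Context, mgr *source.Manager, sm *session.Manager, id source.Identifier, sessionID string) (cache.ImmutableRef, error) {
	inst, err := mgr.Resolve(ctx, id, sm)
	if err != nil {
		return nil, err
	}
	g := session.NewGroup(sessionID)
	if _, _, err := inst.CacheKey(ctx, g, 0); err != nil {
		return nil, err
	}
	return inst.Snapshot(ctx, g)
}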

View File

@ -22,15 +22,23 @@ func init() {
cache = newResolverCache()
}
func NewResolver(ctx context.Context, hosts docker.RegistryHosts, sm *session.Manager, imageStore images.Store, mode source.ResolveMode, ref string) remotes.Resolver {
if res := cache.Get(ctx, ref); res != nil {
return withLocal(res, imageStore, mode)
type ResolverOpt struct {
Hosts docker.RegistryHosts
Auth *resolver.SessionAuthenticator
ImageStore images.Store
Mode source.ResolveMode
Ref string
}
func NewResolver(g session.Group, opt ResolverOpt) remotes.Resolver {
if res := cache.Get(opt.Ref, g); res != nil {
return withLocal(res, opt.ImageStore, opt.Mode)
}
r := resolver.New(ctx, hosts, sm)
r = cache.Add(ctx, ref, r)
r := resolver.New(opt.Hosts, opt.Auth)
r = cache.Add(opt.Ref, r, opt.Auth, g)
return withLocal(r, imageStore, mode)
return withLocal(r, opt.ImageStore, opt.Mode)
}
func EnsureManifestRequested(ctx context.Context, res remotes.Resolver, ref string) {
@ -109,6 +117,7 @@ type cachedResolver struct {
counter int64
timeout time.Time
remotes.Resolver
auth *resolver.SessionAuthenticator
}
func (cr *cachedResolver) Resolve(ctx context.Context, ref string) (name string, desc ocispec.Descriptor, err error) {
@ -116,19 +125,21 @@ func (cr *cachedResolver) Resolve(ctx context.Context, ref string) (name string,
return cr.Resolver.Resolve(ctx, ref)
}
func (r *resolverCache) Add(ctx context.Context, ref string, resolver remotes.Resolver) remotes.Resolver {
func (r *resolverCache) Add(ref string, resolver remotes.Resolver, auth *resolver.SessionAuthenticator, g session.Group) *cachedResolver {
r.mu.Lock()
defer r.mu.Unlock()
ref = r.repo(ref) + "-" + session.FromContext(ctx)
ref = r.repo(ref)
cr, ok := r.m[ref]
cr.timeout = time.Now().Add(time.Minute)
if ok {
cr.auth.AddSession(g)
return &cr
}
cr.Resolver = resolver
cr.auth = auth
r.m[ref] = cr
return &cr
}
@ -141,17 +152,18 @@ func (r *resolverCache) repo(refStr string) string {
return ref.Name()
}
func (r *resolverCache) Get(ctx context.Context, ref string) remotes.Resolver {
func (r *resolverCache) Get(ref string, g session.Group) *cachedResolver {
r.mu.Lock()
defer r.mu.Unlock()
ref = r.repo(ref) + "-" + session.FromContext(ctx)
ref = r.repo(ref)
cr, ok := r.m[ref]
if !ok {
return nil
if ok {
cr.auth.AddSession(g)
return &cr
}
return &cr
return nil
}
func (r *resolverCache) clean(now time.Time) {
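The resolver cache above is now keyed by repository only, instead of repository plus session id, and later callers merely register their session group on the existing authenticator. The following toy repoCache sketches that Add/Get flow under those assumptions; it is not BuildKit code.

package resolverexample

import (
	"sync"
	"time"
)

// entry mirrors the idea behind cachedResolver: one resolver per repository,
// refreshed on use, with later callers only attaching their sessions.
type entry struct {
	sessions []string
	timeout  time.Time
}

type repoCache struct {
	mu sync.Mutex
	m  map[string]*entry
}

func newRepoCache() *repoCache {
	return &repoCache{m: map[string]*entry{}}
}

// getOrAdd returns the existing entry for repo, extending its lifetime and
// registering the caller's session, or creates a new one. Only a sketch of
// the Add/Get flow above.
func (c *repoCache) getOrAdd(repo, sessionID string) *entry {
	c.mu.Lock()
	defer c.mu.Unlock()
	e, ok := c.m[repo]
	if !ok {
		e = &entry{}
		c.m[repo] = e
	}
	e.sessions = append(e.sessions, sessionID)
	e.timeout = time.Now().Add(time.Minute)
	return e
}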

View File

@ -24,7 +24,7 @@ import (
"github.com/sirupsen/logrus"
)
func Push(ctx context.Context, sm *session.Manager, cs content.Store, dgst digest.Digest, ref string, insecure bool, hosts docker.RegistryHosts, byDigest bool) error {
func Push(ctx context.Context, sm *session.Manager, sid string, cs content.Store, dgst digest.Digest, ref string, insecure bool, hosts docker.RegistryHosts, byDigest bool) error {
desc := ocispec.Descriptor{
Digest: dgst,
}
@ -42,7 +42,7 @@ func Push(ctx context.Context, sm *session.Manager, cs content.Store, dgst diges
ref = reference.TagNameOnly(parsed).String()
}
resolver := resolver.New(ctx, hosts, sm)
resolver := resolver.New(hosts, resolver.NewSessionAuthenticator(sm, session.NewGroup(sid)))
pusher, err := resolver.Pusher(ctx, ref)
if err != nil {
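Push now takes the client's session id explicitly and wraps it into a single-member group for registry authentication. A minimal usage sketch follows, assuming the Push signature above and the util/push import path; pushForSession is a hypothetical helper and all values are placeholders.

package pushexample

import (
	"context"

	"github.com/containerd/containerd/content"
	"github.com/containerd/containerd/remotes/docker"
	"github.com/moby/buildkit/session"
	"github.com/moby/buildkit/util/push"
	digest "github.com/opencontainers/go-digest"
)

// pushForSession forwards a single client's session id into Push, which (per
// this diff) turns it into a session.Group for registry authentication.
func pushForSession(ctx context.Context, sm *session.Manager, sessionID string, cs content.Store, dgst digest.Digest, ref string, hosts docker.RegistryHosts) error {
	return push.Push(ctx, sm, sessionID, cs, dgst, ref, false, hosts, false)
}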

View File

@ -1,7 +1,6 @@
package resolver
import (
"context"
"crypto/tls"
"crypto/x509"
"io/ioutil"
@ -11,6 +10,7 @@ import (
"path/filepath"
"runtime"
"strings"
"sync"
"time"
"github.com/containerd/containerd/remotes"
@ -149,16 +149,68 @@ func NewRegistryConfig(m map[string]config.RegistryConfig) docker.RegistryHosts
)
}
func New(ctx context.Context, hosts docker.RegistryHosts, sm *session.Manager) remotes.Resolver {
type SessionAuthenticator struct {
sm *session.Manager
groups []session.Group
mu sync.RWMutex
cache map[string]credentials
cacheMu sync.RWMutex
}
type credentials struct {
user string
secret string
created time.Time
}
func NewSessionAuthenticator(sm *session.Manager, g session.Group) *SessionAuthenticator {
return &SessionAuthenticator{sm: sm, groups: []session.Group{g}, cache: map[string]credentials{}}
}
func (a *SessionAuthenticator) credentials(h string) (string, string, error) {
const credentialsTimeout = time.Minute
a.cacheMu.RLock()
c, ok := a.cache[h]
if ok && time.Since(c.created) < credentialsTimeout {
a.cacheMu.RUnlock()
return c.user, c.secret, nil
}
a.cacheMu.RUnlock()
a.mu.RLock()
defer a.mu.RUnlock()
var err error
for i := len(a.groups) - 1; i >= 0; i-- {
var user, secret string
user, secret, err = auth.CredentialsFunc(a.sm, a.groups[i])(h)
if err != nil {
continue
}
a.cacheMu.Lock()
a.cache[h] = credentials{user: user, secret: secret, created: time.Now()}
a.cacheMu.Unlock()
return user, secret, nil
}
return "", "", err
}
func (a *SessionAuthenticator) AddSession(g session.Group) {
a.mu.Lock()
a.groups = append(a.groups, g)
a.mu.Unlock()
}
func New(hosts docker.RegistryHosts, auth *SessionAuthenticator) remotes.Resolver {
return docker.NewResolver(docker.ResolverOptions{
Hosts: hostsWithCredentials(ctx, hosts, sm),
Hosts: hostsWithCredentials(hosts, auth),
})
}
func hostsWithCredentials(ctx context.Context, hosts docker.RegistryHosts, sm *session.Manager) docker.RegistryHosts {
id := session.FromContext(ctx)
if id == "" {
return hosts
func hostsWithCredentials(hosts docker.RegistryHosts, auth *SessionAuthenticator) docker.RegistryHosts {
if hosts == nil {
return nil
}
return func(domain string) ([]docker.RegistryHost, error) {
res, err := hosts(domain)
@ -169,17 +221,9 @@ func hostsWithCredentials(ctx context.Context, hosts docker.RegistryHosts, sm *s
return nil, nil
}
timeoutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
caller, err := sm.Get(timeoutCtx, id)
if err != nil {
return nil, err
}
a := docker.NewDockerAuthorizer(
docker.WithAuthClient(res[0].Client),
docker.WithAuthCreds(auth.CredentialsFunc(context.TODO(), caller)),
docker.WithAuthCreds(auth.credentials),
)
for i := range res {
res[i].Authorizer = a
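SessionAuthenticator resolves credentials lazily from the most recently added session group and caches them briefly per host. A sketch of wiring it up, assuming the constructors shown above and the util/resolver import path; newResolverForBuild is not part of BuildKit.

package resolverusage

import (
	"github.com/containerd/containerd/remotes"
	"github.com/containerd/containerd/remotes/docker"
	"github.com/moby/buildkit/session"
	"github.com/moby/buildkit/util/resolver"
)

// newResolverForBuild binds a registry resolver to the credentials of one
// build's session; a later build can reuse the same cached resolver by
// registering its own group via AddSession.
func newResolverForBuild(hosts docker.RegistryHosts, sm *session.Manager, sessionID string) (remotes.Resolver, *resolver.SessionAuthenticator) {
	auth := resolver.NewSessionAuthenticator(sm, session.NewGroup(sessionID))
	return resolver.New(hosts, auth), auth
}

// Later, when another build wants to share the cached resolver:
//   auth.AddSession(session.NewGroup(otherSessionID))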

View File

@ -87,7 +87,7 @@ type Worker struct {
CacheManager cache.Manager
SourceManager *source.Manager
imageWriter *imageexporter.ImageWriter
ImageSource source.Source
ImageSource *containerimage.Source
}
// NewWorker instantiates a local worker
@ -287,17 +287,8 @@ func (w *Worker) PruneCacheMounts(ctx context.Context, ids []string) error {
return nil
}
func (w *Worker) ResolveImageConfig(ctx context.Context, ref string, opt llb.ResolveImageConfigOpt, sm *session.Manager) (digest.Digest, []byte, error) {
// ImageSource is typically source/containerimage
resolveImageConfig, ok := w.ImageSource.(resolveImageConfig)
if !ok {
return "", nil, errors.Errorf("worker %q does not implement ResolveImageConfig", w.ID())
}
return resolveImageConfig.ResolveImageConfig(ctx, ref, opt, sm)
}
type resolveImageConfig interface {
ResolveImageConfig(ctx context.Context, ref string, opt llb.ResolveImageConfigOpt, sm *session.Manager) (digest.Digest, []byte, error)
func (w *Worker) ResolveImageConfig(ctx context.Context, ref string, opt llb.ResolveImageConfigOpt, sm *session.Manager, g session.Group) (digest.Digest, []byte, error) {
return w.ImageSource.ResolveImageConfig(ctx, ref, opt, sm, g)
}
func (w *Worker) Exec(ctx context.Context, meta executor.Meta, rootFS cache.ImmutableRef, stdin io.ReadCloser, stdout, stderr io.WriteCloser) error {
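ResolveImageConfig on the worker now simply forwards to the container-image source together with the caller's session group. A sketch of a call site, assuming the signature above and that the worker type lives in worker/base; resolveConfig is illustrative only.

package workerusage

import (
	"context"

	"github.com/moby/buildkit/client/llb"
	"github.com/moby/buildkit/session"
	"github.com/moby/buildkit/worker/base"
	digest "github.com/opencontainers/go-digest"
)

// resolveConfig asks the worker for an image config on behalf of one client
// session. Only the ResolveImageConfig signature comes from this diff.
func resolveConfig(ctx context.Context, w *base.Worker, sm *session.Manager, ref, sessionID string) (digest.Digest, []byte, error) {
	return w.ResolveImageConfig(ctx, ref, llb.ResolveImageConfigOpt{}, sm, session.NewGroup(sessionID))
}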

View File

@ -67,7 +67,7 @@ func newBusyboxSourceSnapshot(ctx context.Context, t *testing.T, w *base.Worker,
require.NoError(t, err)
src, err := w.SourceManager.Resolve(ctx, img, sm)
require.NoError(t, err)
snap, err := src.Snapshot(ctx)
snap, err := src.Snapshot(ctx, nil)
require.NoError(t, err)
return snap
}

View File

@ -27,7 +27,7 @@ type Worker interface {
LoadRef(id string, hidden bool) (cache.ImmutableRef, error)
// ResolveOp resolves Vertex.Sys() to Op implementation.
ResolveOp(v solver.Vertex, s frontend.FrontendLLBBridge, sm *session.Manager) (solver.Op, error)
ResolveImageConfig(ctx context.Context, ref string, opt llb.ResolveImageConfigOpt, sm *session.Manager) (digest.Digest, []byte, error)
ResolveImageConfig(ctx context.Context, ref string, opt llb.ResolveImageConfigOpt, sm *session.Manager, g session.Group) (digest.Digest, []byte, error)
// Exec is similar to executor.Exec but without []mount.Mount
Exec(ctx context.Context, meta executor.Meta, rootFS cache.ImmutableRef, stdin io.ReadCloser, stdout, stderr io.WriteCloser) error
DiskUsage(ctx context.Context, opt client.DiskUsageInfo) ([]*client.UsageInfo, error)