Merge pull request #1636 from tonistiigi/resolver-pool

resolver: add better pooling and custom authenticator
v0.8
Erik Sipsma 2020-08-13 19:03:10 -07:00 committed by GitHub
commit 661cafc09d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 1056 additions and 367 deletions

View File

@ -76,7 +76,7 @@ func getContentStore(ctx context.Context, sm *session.Manager, g session.Group,
timeoutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
caller, err := sm.Get(timeoutCtx, sessionID)
caller, err := sm.Get(timeoutCtx, sessionID, false)
if err != nil {
return nil, err
}

View File

@ -37,7 +37,7 @@ func ResolveCacheExporterFunc(sm *session.Manager, hosts docker.RegistryHosts) r
if err != nil {
return nil, err
}
remote := resolver.New(hosts, resolver.NewSessionAuthenticator(sm, g))
remote := resolver.DefaultPool.GetResolver(hosts, ref, "push", sm, g)
pusher, err := remote.Pusher(ctx, ref)
if err != nil {
return nil, err
@ -52,7 +52,7 @@ func ResolveCacheImporterFunc(sm *session.Manager, cs content.Store, hosts docke
if err != nil {
return nil, specs.Descriptor{}, err
}
remote := resolver.New(hosts, resolver.NewSessionAuthenticator(sm, g))
remote := resolver.DefaultPool.GetResolver(hosts, ref, "pull", sm, g)
xref, desc, err := remote.Resolve(ctx, ref)
if err != nil {
return nil, specs.Descriptor{}, err

View File

@ -51,7 +51,7 @@ func (e *localExporterInstance) Export(ctx context.Context, inp exporter.Source,
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
caller, err := e.opt.SessionManager.Get(timeoutCtx, sessionID)
caller, err := e.opt.SessionManager.Get(timeoutCtx, sessionID, false)
if err != nil {
return nil, err
}

View File

@ -165,7 +165,7 @@ func (e *imageExporterInstance) Export(ctx context.Context, src exporter.Source,
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
caller, err := e.opt.SessionManager.Get(timeoutCtx, sessionID)
caller, err := e.opt.SessionManager.Get(timeoutCtx, sessionID, false)
if err != nil {
return nil, err
}

View File

@ -135,7 +135,7 @@ func (e *localExporterInstance) Export(ctx context.Context, inp exporter.Source,
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
caller, err := e.opt.SessionManager.Get(timeoutCtx, sessionID)
caller, err := e.opt.SessionManager.Get(timeoutCtx, sessionID, false)
if err != nil {
return nil, err
}

4
go.mod
View File

@ -12,7 +12,7 @@ require (
github.com/codahale/hdrhistogram v0.0.0-20160425231609-f8ad88b59a58 // indirect
github.com/containerd/cgroups v0.0.0-20200710171044-318312a37340 // indirect
github.com/containerd/console v1.0.0
github.com/containerd/containerd v1.4.0-beta.2.0.20200728183644-eb6354a11860
github.com/containerd/containerd v1.4.0-beta.2.0.20200730150746-fa1220fce33f
github.com/containerd/continuity v0.0.0-20200413184840-d3ef23f19fbb
github.com/containerd/fifo v0.0.0-20200410184934-f15a3290365b // indirect
github.com/containerd/go-cni v1.0.0
@ -71,7 +71,7 @@ require (
)
replace (
github.com/containerd/containerd => github.com/containerd/containerd v1.4.0-beta.2.0.20200728183644-eb6354a11860
github.com/containerd/containerd => github.com/containerd/containerd v1.4.0-beta.2.0.20200730150746-fa1220fce33f
github.com/docker/docker => github.com/docker/docker v17.12.0-ce-rc1.0.20200310163718-4634ce647cf2+incompatible
github.com/hashicorp/go-immutable-radix => github.com/tonistiigi/go-immutable-radix v0.0.0-20170803185627-826af9ccf0fe
github.com/jaguilar/vt100 => github.com/tonistiigi/vt100 v0.0.0-20190402012908-ad4c4a574305

4
go.sum
View File

@ -27,8 +27,8 @@ github.com/containerd/console v0.0.0-20180822173158-c12b1e7919c1/go.mod h1:Tj/on
github.com/containerd/console v0.0.0-20191206165004-02ecf6a7291e/go.mod h1:8Pf4gM6VEbTNRIT26AyyU7hxdQU3MvAvxVI0sc00XBE=
github.com/containerd/console v1.0.0 h1:fU3UuQapBs+zLJu82NhR11Rif1ny2zfMMAyPJzSN5tQ=
github.com/containerd/console v1.0.0/go.mod h1:8Pf4gM6VEbTNRIT26AyyU7hxdQU3MvAvxVI0sc00XBE=
github.com/containerd/containerd v1.4.0-beta.2.0.20200728183644-eb6354a11860 h1:30UR1cinmvhqtKTpQWOJada+p36BgAuXf5w+aHEJOto=
github.com/containerd/containerd v1.4.0-beta.2.0.20200728183644-eb6354a11860/go.mod h1:bC6axHOhabU15QhwfG7w5PipXdVtMXFTttgp+kVtyUA=
github.com/containerd/containerd v1.4.0-beta.2.0.20200730150746-fa1220fce33f h1:eSl1h+oob8CkeY7TXMs6yrs6eXzgZDbqvsOM0l+EOgk=
github.com/containerd/containerd v1.4.0-beta.2.0.20200730150746-fa1220fce33f/go.mod h1:bC6axHOhabU15QhwfG7w5PipXdVtMXFTttgp+kVtyUA=
github.com/containerd/continuity v0.0.0-20190426062206-aaeac12a7ffc/go.mod h1:GL3xCUCBDV3CZiTSEKksMWbLE66hEyuu9qyDOOqM47Y=
github.com/containerd/continuity v0.0.0-20200228182428-0f16d7a0959c/go.mod h1:Dq467ZllaHgAtVp4p1xUQWBrFXR9s/wyoTpG8zOJGkY=
github.com/containerd/continuity v0.0.0-20200413184840-d3ef23f19fbb h1:nXPkFq8X1a9ycY3GYQpFNxHh3j2JgY7zDZfq2EXMIzk=

View File

@ -8,10 +8,10 @@ import (
"google.golang.org/grpc/codes"
)
func CredentialsFunc(sm *session.Manager, g session.Group) func(string) (string, string, error) {
return func(host string) (string, string, error) {
var user, secret string
err := sm.Any(context.TODO(), g, func(ctx context.Context, _ string, c session.Caller) error {
func CredentialsFunc(sm *session.Manager, g session.Group) func(string) (session, username, secret string, err error) {
return func(host string) (string, string, string, error) {
var sessionID, user, secret string
err := sm.Any(context.TODO(), g, func(ctx context.Context, id string, c session.Caller) error {
client := NewAuthClient(c.Conn())
resp, err := client.Credentials(ctx, &CredentialsRequest{
@ -23,13 +23,14 @@ func CredentialsFunc(sm *session.Manager, g session.Group) func(string) (string,
}
return err
}
sessionID = id
user = resp.Username
secret = resp.Secret
return nil
})
if err != nil {
return "", "", err
return "", "", "", err
}
return user, secret, nil
return sessionID, user, secret, nil
}
}

View File

@ -63,7 +63,7 @@ func TestContentAttachable(t *testing.T) {
})
g.Go(func() error {
c, err := m.Get(ctx, s.ID())
c, err := m.Get(ctx, s.ID(), false)
if err != nil {
return err
}

View File

@ -46,7 +46,7 @@ func TestFileSyncIncludePatterns(t *testing.T) {
})
g.Go(func() (reterr error) {
c, err := m.Get(ctx, s.ID())
c, err := m.Get(ctx, s.ID(), false)
if err != nil {
return err
}

View File

@ -74,7 +74,7 @@ func (sm *Manager) Any(ctx context.Context, g Group, f func(context.Context, str
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
c, err := sm.Get(timeoutCtx, id)
c, err := sm.Get(timeoutCtx, id, false)
if err != nil {
lastErr = err
continue

View File

@ -149,7 +149,7 @@ func (sm *Manager) handleConn(ctx context.Context, conn net.Conn, opts map[strin
}
// Get returns a session by ID
func (sm *Manager) Get(ctx context.Context, id string) (Caller, error) {
func (sm *Manager) Get(ctx context.Context, id string, noWait bool) (Caller, error) {
// session prefix is used to identify vertexes with different contexts so
// they would not collide, but for lookup we don't need the prefix
if p := strings.SplitN(id, ":", 2); len(p) == 2 && len(p[1]) > 0 {
@ -180,7 +180,7 @@ func (sm *Manager) Get(ctx context.Context, id string) (Caller, error) {
}
var ok bool
c, ok = sm.sessions[id]
if !ok || c.closed() {
if (!ok || c.closed()) && !noWait {
sm.updateCondition.Wait()
continue
}
@ -188,6 +188,10 @@ func (sm *Manager) Get(ctx context.Context, id string) (Caller, error) {
break
}
if c == nil {
return nil, nil
}
return c, nil
}

View File

@ -81,13 +81,8 @@ func (is *Source) ResolveImageConfig(ctx context.Context, ref string, opt llb.Re
}
res, err := is.g.Do(ctx, key, func(ctx context.Context) (interface{}, error) {
dgst, dt, err := imageutil.Config(ctx, ref, pull.NewResolver(g, pull.ResolverOpt{
Hosts: is.RegistryHosts,
Auth: resolver.NewSessionAuthenticator(sm, g),
ImageStore: is.ImageStore,
Mode: rm,
Ref: ref,
}), is.ContentStore, is.LeaseManager, opt.Platform)
res := resolver.DefaultPool.GetResolver(is.RegistryHosts, ref, "pull", sm, g).WithImageStore(is.ImageStore, rm)
dgst, dt, err := imageutil.Config(ctx, ref, res, is.ContentStore, is.LeaseManager, opt.Platform)
if err != nil {
return nil, err
}
@ -117,28 +112,30 @@ func (is *Source) Resolve(ctx context.Context, id source.Identifier, sm *session
Src: imageIdentifier.Reference,
}
p := &puller{
CacheAccessor: is.CacheAccessor,
LeaseManager: is.LeaseManager,
Puller: pullerUtil,
id: imageIdentifier,
ResolverOpt: pull.ResolverOpt{
Hosts: is.RegistryHosts,
Auth: resolver.NewSessionAuthenticator(sm, nil),
ImageStore: is.ImageStore,
Mode: imageIdentifier.ResolveMode,
Ref: imageIdentifier.Reference.String(),
},
vtx: vtx,
CacheAccessor: is.CacheAccessor,
LeaseManager: is.LeaseManager,
Puller: pullerUtil,
id: imageIdentifier,
RegistryHosts: is.RegistryHosts,
ImageStore: is.ImageStore,
Mode: imageIdentifier.ResolveMode,
Ref: imageIdentifier.Reference.String(),
SessionManager: sm,
vtx: vtx,
}
return p, nil
}
type puller struct {
CacheAccessor cache.Accessor
LeaseManager leases.Manager
ResolverOpt pull.ResolverOpt
id *source.ImageIdentifier
vtx solver.Vertex
CacheAccessor cache.Accessor
LeaseManager leases.Manager
RegistryHosts docker.RegistryHosts
ImageStore images.Store
Mode source.ResolveMode
Ref string
SessionManager *session.Manager
id *source.ImageIdentifier
vtx solver.Vertex
cacheKeyOnce sync.Once
cacheKeyErr error
@ -169,11 +166,7 @@ func mainManifestKey(ctx context.Context, desc specs.Descriptor, platform specs.
}
func (p *puller) CacheKey(ctx context.Context, g session.Group, index int) (cacheKey string, cacheOpts solver.CacheOpts, cacheDone bool, err error) {
if p.Puller.Resolver == nil {
p.Puller.Resolver = pull.NewResolver(g, p.ResolverOpt)
} else {
p.ResolverOpt.Auth.AddSession(g)
}
p.Puller.Resolver = resolver.DefaultPool.GetResolver(p.RegistryHosts, p.Ref, "pull", p.SessionManager, g).WithImageStore(p.ImageStore, p.id.ResolveMode)
p.cacheKeyOnce.Do(func() {
ctx, done, err := leaseutil.WithLease(ctx, p.LeaseManager, leases.WithExpiration(5*time.Minute), leaseutil.MakeTemporary)
@ -253,11 +246,7 @@ func (p *puller) CacheKey(ctx context.Context, g session.Group, index int) (cach
}
func (p *puller) Snapshot(ctx context.Context, g session.Group) (ir cache.ImmutableRef, err error) {
if p.Puller.Resolver == nil {
p.Puller.Resolver = pull.NewResolver(g, p.ResolverOpt)
} else {
p.ResolverOpt.Auth.AddSession(g)
}
p.Puller.Resolver = resolver.DefaultPool.GetResolver(p.RegistryHosts, p.Ref, "pull", p.SessionManager, g).WithImageStore(p.ImageStore, p.id.ResolveMode)
if len(p.manifest.Remote.Descriptors) == 0 {
return nil, nil

View File

@ -96,9 +96,6 @@ func (p *Puller) PullManifests(ctx context.Context) (*PulledManifests, error) {
return nil, err
}
// workaround for gcr, authentication not supported on blob endpoints
EnsureManifestRequested(ctx, p.Resolver, p.ref)
platform := platforms.Only(p.Platform)
var mu sync.Mutex // images.Dispatch calls handlers in parallel

View File

@ -1,192 +0,0 @@
package pull
import (
"context"
"sync"
"sync/atomic"
"time"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
distreference "github.com/docker/distribution/reference"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/source"
"github.com/moby/buildkit/util/resolver"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)
var resCache *resolverCache
func init() {
resCache = newResolverCache()
}
type ResolverOpt struct {
Hosts docker.RegistryHosts
Auth *resolver.SessionAuthenticator
ImageStore images.Store
Mode source.ResolveMode
Ref string
}
func NewResolver(g session.Group, opt ResolverOpt) remotes.Resolver {
if res := resCache.Get(opt.Ref, g); res != nil {
return withLocal(res, opt.ImageStore, opt.Mode)
}
r := resolver.New(opt.Hosts, opt.Auth)
r = resCache.Add(opt.Ref, r, opt.Auth, g)
return withLocal(r, opt.ImageStore, opt.Mode)
}
func EnsureManifestRequested(ctx context.Context, res remotes.Resolver, ref string) {
rr := res
lr, ok := res.(withLocalResolver)
if ok {
if atomic.LoadInt64(&lr.counter) > 0 {
return
}
rr = lr.Resolver
}
cr, ok := rr.(*cachedResolver)
if !ok {
return
}
if atomic.LoadInt64(cr.counter) == 0 {
res.Resolve(ctx, ref)
}
}
func withLocal(r remotes.Resolver, imageStore images.Store, mode source.ResolveMode) remotes.Resolver {
if imageStore == nil || mode == source.ResolveModeForcePull {
return r
}
return withLocalResolver{Resolver: r, is: imageStore, mode: mode}
}
// A remotes.Resolver which checks the local image store if the real
// resolver cannot find the image, essentially falling back to a local
// image if one is present.
//
// We do not override the Fetcher or Pusher methods:
//
// - Fetcher is called by github.com/containerd/containerd/remotes/:fetch()
// only after it has checked for the content locally, so avoid the
// hassle of interposing a local-fetch proxy and simply pass on the
// request.
// - Pusher wouldn't make sense to push locally, so just forward.
type withLocalResolver struct {
counter int64 // needs to be 64bit aligned for 32bit systems
remotes.Resolver
is images.Store
mode source.ResolveMode
}
func (r withLocalResolver) Resolve(ctx context.Context, ref string) (string, ocispec.Descriptor, error) {
if r.mode == source.ResolveModePreferLocal {
if img, err := r.is.Get(ctx, ref); err == nil {
atomic.AddInt64(&r.counter, 1)
return ref, img.Target, nil
}
}
n, desc, err := r.Resolver.Resolve(ctx, ref)
if err == nil {
return n, desc, err
}
if r.mode == source.ResolveModeDefault {
if img, err := r.is.Get(ctx, ref); err == nil {
return ref, img.Target, nil
}
}
return "", ocispec.Descriptor{}, err
}
type resolverCache struct {
mu sync.Mutex
m map[string]cachedResolver
}
type cachedResolver struct {
counter *int64
timeout time.Time
remotes.Resolver
auth *resolver.SessionAuthenticator
}
func (cr *cachedResolver) Resolve(ctx context.Context, ref string) (name string, desc ocispec.Descriptor, err error) {
atomic.AddInt64(cr.counter, 1)
return cr.Resolver.Resolve(ctx, ref)
}
func (r *resolverCache) Add(ref string, resolver remotes.Resolver, auth *resolver.SessionAuthenticator, g session.Group) *cachedResolver {
r.mu.Lock()
defer r.mu.Unlock()
ref = r.repo(ref)
cr, ok := r.m[ref]
cr.timeout = time.Now().Add(time.Minute)
if ok {
cr.auth.AddSession(g)
return &cr
}
var counter int64
cr.counter = &counter
cr.Resolver = resolver
cr.auth = auth
r.m[ref] = cr
return &cr
}
func (r *resolverCache) repo(refStr string) string {
ref, err := distreference.ParseNormalizedNamed(refStr)
if err != nil {
return refStr
}
return ref.Name()
}
func (r *resolverCache) Get(ref string, g session.Group) *cachedResolver {
r.mu.Lock()
defer r.mu.Unlock()
ref = r.repo(ref)
cr, ok := r.m[ref]
if ok {
cr.auth.AddSession(g)
return &cr
}
return nil
}
func (r *resolverCache) clean(now time.Time) {
r.mu.Lock()
for k, cr := range r.m {
if now.After(cr.timeout) {
delete(r.m, k)
}
}
r.mu.Unlock()
}
func newResolverCache() *resolverCache {
rc := &resolverCache{
m: map[string]cachedResolver{},
}
t := time.NewTicker(time.Minute)
go func() {
for {
rc.clean(<-t.C)
}
}()
return rc
}

View File

@ -44,6 +44,7 @@ func Push(ctx context.Context, sm *session.Manager, sid string, provider content
ref = reference.TagNameOnly(parsed).String()
}
scope := "push"
if insecure {
insecureTrue := true
httpTrue := true
@ -53,9 +54,10 @@ func Push(ctx context.Context, sm *session.Manager, sid string, provider content
PlainHTTP: &httpTrue,
},
})
scope += ":insecure"
}
resolver := resolver.New(hosts, resolver.NewSessionAuthenticator(sm, session.NewGroup(sid)))
resolver := resolver.DefaultPool.GetResolver(hosts, ref, scope, sm, session.NewGroup(sid))
pusher, err := resolver.Pusher(ctx, ref)
if err != nil {

211
util/resolver/auth/fetch.go Normal file
View File

@ -0,0 +1,211 @@
package auth
import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"strings"
"time"
"github.com/containerd/containerd/log"
"github.com/pkg/errors"
"golang.org/x/net/context/ctxhttp"
)
var (
// ErrNoToken is returned if a request is successful but the body does not
// contain an authorization token.
ErrNoToken = errors.New("authorization server did not include a token in the response")
)
// ErrUnexpectedStatus is returned if a token request returned with unexpected HTTP status
type ErrUnexpectedStatus struct {
Status string
StatusCode int
Body []byte
}
func (e ErrUnexpectedStatus) Error() string {
return fmt.Sprintf("unexpected status: %s", e.Status)
}
func newUnexpectedStatusErr(resp *http.Response) error {
var b []byte
if resp.Body != nil {
b, _ = ioutil.ReadAll(io.LimitReader(resp.Body, 64000)) // 64KB
}
return ErrUnexpectedStatus{Status: resp.Status, StatusCode: resp.StatusCode, Body: b}
}
// GenerateTokenOptions generates options for fetching a token based on a challenge
func GenerateTokenOptions(ctx context.Context, host, username, secret string, c Challenge) (TokenOptions, error) {
realm, ok := c.Parameters["realm"]
if !ok {
return TokenOptions{}, errors.New("no realm specified for token auth challenge")
}
realmURL, err := url.Parse(realm)
if err != nil {
return TokenOptions{}, errors.Wrap(err, "invalid token auth challenge realm")
}
to := TokenOptions{
Realm: realmURL.String(),
Service: c.Parameters["service"],
Username: username,
Secret: secret,
}
scope, ok := c.Parameters["scope"]
if ok {
to.Scopes = append(to.Scopes, scope)
} else {
log.G(ctx).WithField("host", host).Debug("no scope specified for token auth challenge")
}
return to, nil
}
// TokenOptions are optios for requesting a token
type TokenOptions struct {
Realm string
Service string
Scopes []string
Username string
Secret string
}
// OAuthTokenResponse is response from fetching token with a OAuth POST request
type OAuthTokenResponse struct {
AccessToken string `json:"access_token"`
RefreshToken string `json:"refresh_token"`
ExpiresIn int `json:"expires_in"`
IssuedAt time.Time `json:"issued_at"`
Scope string `json:"scope"`
}
// FetchTokenWithOAuth fetches a token using a POST request
func FetchTokenWithOAuth(ctx context.Context, client *http.Client, headers http.Header, clientID string, to TokenOptions) (*OAuthTokenResponse, error) {
form := url.Values{}
if len(to.Scopes) > 0 {
form.Set("scope", strings.Join(to.Scopes, " "))
}
form.Set("service", to.Service)
form.Set("client_id", clientID)
if to.Username == "" {
form.Set("grant_type", "refresh_token")
form.Set("refresh_token", to.Secret)
} else {
form.Set("grant_type", "password")
form.Set("username", to.Username)
form.Set("password", to.Secret)
}
req, err := http.NewRequest("POST", to.Realm, strings.NewReader(form.Encode()))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded; charset=utf-8")
if headers != nil {
for k, v := range headers {
req.Header[k] = append(req.Header[k], v...)
}
}
resp, err := ctxhttp.Do(ctx, client, req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 400 {
return nil, errors.WithStack(newUnexpectedStatusErr(resp))
}
decoder := json.NewDecoder(resp.Body)
var tr OAuthTokenResponse
if err = decoder.Decode(&tr); err != nil {
return nil, errors.Wrap(err, "unable to decode token response")
}
if tr.AccessToken == "" {
return nil, errors.WithStack(ErrNoToken)
}
return &tr, nil
}
// FetchTokenResponse is response from fetching token with GET request
type FetchTokenResponse struct {
Token string `json:"token"`
AccessToken string `json:"access_token"`
ExpiresIn int `json:"expires_in"`
IssuedAt time.Time `json:"issued_at"`
RefreshToken string `json:"refresh_token"`
}
// FetchToken fetches a token using a GET request
func FetchToken(ctx context.Context, client *http.Client, headers http.Header, to TokenOptions) (*FetchTokenResponse, error) {
req, err := http.NewRequest("GET", to.Realm, nil)
if err != nil {
return nil, err
}
if headers != nil {
for k, v := range headers {
req.Header[k] = append(req.Header[k], v...)
}
}
reqParams := req.URL.Query()
if to.Service != "" {
reqParams.Add("service", to.Service)
}
for _, scope := range to.Scopes {
reqParams.Add("scope", scope)
}
if to.Secret != "" {
req.SetBasicAuth(to.Username, to.Secret)
}
req.URL.RawQuery = reqParams.Encode()
resp, err := ctxhttp.Do(ctx, client, req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 400 {
return nil, errors.WithStack(newUnexpectedStatusErr(resp))
}
decoder := json.NewDecoder(resp.Body)
var tr FetchTokenResponse
if err = decoder.Decode(&tr); err != nil {
return nil, errors.Wrap(err, "unable to decode token response")
}
// `access_token` is equivalent to `token` and if both are specified
// the choice is undefined. Canonicalize `access_token` by sticking
// things in `token`.
if tr.AccessToken != "" {
tr.Token = tr.AccessToken
}
if tr.Token == "" {
return nil, errors.WithStack(ErrNoToken)
}
return &tr, nil
}

187
util/resolver/auth/parse.go Normal file
View File

@ -0,0 +1,187 @@
package auth
import (
"net/http"
"sort"
"strings"
)
// AuthenticationScheme defines scheme of the authentication method
type AuthenticationScheme byte
const (
// BasicAuth is scheme for Basic HTTP Authentication RFC 7617
BasicAuth AuthenticationScheme = 1 << iota
// DigestAuth is scheme for HTTP Digest Access Authentication RFC 7616
DigestAuth
// BearerAuth is scheme for OAuth 2.0 Bearer Tokens RFC 6750
BearerAuth
)
// Challenge carries information from a WWW-Authenticate response header.
// See RFC 2617.
type Challenge struct {
// scheme is the auth-scheme according to RFC 2617
Scheme AuthenticationScheme
// parameters are the auth-params according to RFC 2617
Parameters map[string]string
}
type byScheme []Challenge
func (bs byScheme) Len() int { return len(bs) }
func (bs byScheme) Swap(i, j int) { bs[i], bs[j] = bs[j], bs[i] }
// Sort in priority order: token > digest > basic
func (bs byScheme) Less(i, j int) bool { return bs[i].Scheme > bs[j].Scheme }
// Octet types from RFC 2616.
type octetType byte
var octetTypes [256]octetType
const (
isToken octetType = 1 << iota
isSpace
)
func init() {
// OCTET = <any 8-bit sequence of data>
// CHAR = <any US-ASCII character (octets 0 - 127)>
// CTL = <any US-ASCII control character (octets 0 - 31) and DEL (127)>
// CR = <US-ASCII CR, carriage return (13)>
// LF = <US-ASCII LF, linefeed (10)>
// SP = <US-ASCII SP, space (32)>
// HT = <US-ASCII HT, horizontal-tab (9)>
// <"> = <US-ASCII double-quote mark (34)>
// CRLF = CR LF
// LWS = [CRLF] 1*( SP | HT )
// TEXT = <any OCTET except CTLs, but including LWS>
// separators = "(" | ")" | "<" | ">" | "@" | "," | ";" | ":" | "\" | <">
// | "/" | "[" | "]" | "?" | "=" | "{" | "}" | SP | HT
// token = 1*<any CHAR except CTLs or separators>
// qdtext = <any TEXT except <">>
for c := 0; c < 256; c++ {
var t octetType
isCtl := c <= 31 || c == 127
isChar := 0 <= c && c <= 127
isSeparator := strings.ContainsRune(" \t\"(),/:;<=>?@[]\\{}", rune(c))
if strings.ContainsRune(" \t\r\n", rune(c)) {
t |= isSpace
}
if isChar && !isCtl && !isSeparator {
t |= isToken
}
octetTypes[c] = t
}
}
// ParseAuthHeader parses challenges from WWW-Authenticate header
func ParseAuthHeader(header http.Header) []Challenge {
challenges := []Challenge{}
for _, h := range header[http.CanonicalHeaderKey("WWW-Authenticate")] {
v, p := parseValueAndParams(h)
var s AuthenticationScheme
switch v {
case "basic":
s = BasicAuth
case "digest":
s = DigestAuth
case "bearer":
s = BearerAuth
default:
continue
}
challenges = append(challenges, Challenge{Scheme: s, Parameters: p})
}
sort.Stable(byScheme(challenges))
return challenges
}
func parseValueAndParams(header string) (value string, params map[string]string) {
params = make(map[string]string)
value, s := expectToken(header)
if value == "" {
return
}
value = strings.ToLower(value)
for {
var pkey string
pkey, s = expectToken(skipSpace(s))
if pkey == "" {
return
}
if !strings.HasPrefix(s, "=") {
return
}
var pvalue string
pvalue, s = expectTokenOrQuoted(s[1:])
if pvalue == "" {
return
}
pkey = strings.ToLower(pkey)
params[pkey] = pvalue
s = skipSpace(s)
if !strings.HasPrefix(s, ",") {
return
}
s = s[1:]
}
}
func skipSpace(s string) (rest string) {
i := 0
for ; i < len(s); i++ {
if octetTypes[s[i]]&isSpace == 0 {
break
}
}
return s[i:]
}
func expectToken(s string) (token, rest string) {
i := 0
for ; i < len(s); i++ {
if octetTypes[s[i]]&isToken == 0 {
break
}
}
return s[:i], s[i:]
}
func expectTokenOrQuoted(s string) (value string, rest string) {
if !strings.HasPrefix(s, "\"") {
return expectToken(s)
}
s = s[1:]
for i := 0; i < len(s); i++ {
switch s[i] {
case '"':
return s[:i], s[i+1:]
case '\\':
p := make([]byte, len(s)-1)
j := copy(p, s[:i])
escape := true
for i = i + 1; i < len(s); i++ {
b := s[i]
switch {
case escape:
escape = false
p[j] = b
j++
case b == '\\':
escape = true
case b == '"':
return string(p[:j]), s[i+1:]
default:
p[j] = b
j++
}
}
return "", ""
}
}
return "", ""
}

366
util/resolver/authorizer.go Normal file
View File

@ -0,0 +1,366 @@
package resolver
import (
"context"
"encoding/base64"
"fmt"
"net/http"
"strings"
"sync"
"time"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/remotes/docker"
"github.com/moby/buildkit/session"
sessionauth "github.com/moby/buildkit/session/auth"
"github.com/moby/buildkit/util/flightcontrol"
"github.com/moby/buildkit/util/resolver/auth"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
type authHandlerNS struct {
counter int64 // needs to be 64bit aligned for 32bit systems
mu sync.Mutex
handlers map[string]*authHandler
hosts map[string][]docker.RegistryHost
sm *session.Manager
g flightcontrol.Group
}
func newAuthHandlerNS(sm *session.Manager) *authHandlerNS {
return &authHandlerNS{
handlers: map[string]*authHandler{},
hosts: map[string][]docker.RegistryHost{},
sm: sm,
}
}
func (a *authHandlerNS) get(host string, sm *session.Manager, g session.Group) *authHandler {
if g != nil {
if iter := g.SessionIterator(); iter != nil {
for {
id := iter.NextSession()
if id == "" {
break
}
h, ok := a.handlers[host+"/"+id]
if ok {
h.lastUsed = time.Now()
return h
}
}
}
}
// link another handler
for k, h := range a.handlers {
parts := strings.SplitN(k, "/", 2)
if len(parts) != 2 {
continue
}
if parts[0] == host {
session, username, password, err := sessionauth.CredentialsFunc(sm, g)(host)
if err == nil {
if username == h.common.Username && password == h.common.Secret {
a.handlers[host+"/"+session] = h
h.lastUsed = time.Now()
return h
}
}
}
}
return nil
}
func (a *authHandlerNS) set(host, session string, h *authHandler) {
a.handlers[host+"/"+session] = h
}
func (a *authHandlerNS) delete(h *authHandler) {
for k, v := range a.handlers {
if v == h {
delete(a.handlers, k)
}
}
}
type dockerAuthorizer struct {
client *http.Client
sm *session.Manager
session session.Group
handlers *authHandlerNS
}
func newDockerAuthorizer(client *http.Client, handlers *authHandlerNS, sm *session.Manager, group session.Group) *dockerAuthorizer {
return &dockerAuthorizer{
client: client,
handlers: handlers,
sm: sm,
session: group,
}
}
// Authorize handles auth request.
func (a *dockerAuthorizer) Authorize(ctx context.Context, req *http.Request) error {
a.handlers.mu.Lock()
defer a.handlers.mu.Unlock()
// skip if there is no auth handler
ah := a.handlers.get(req.URL.Host, a.sm, a.session)
if ah == nil {
return nil
}
auth, err := ah.authorize(ctx, a.session)
if err != nil {
return err
}
req.Header.Set("Authorization", auth)
return nil
}
func (a *dockerAuthorizer) getCredentials(host string) (sessionID, username, secret string, err error) {
return sessionauth.CredentialsFunc(a.sm, a.session)(host)
}
func (a *dockerAuthorizer) AddResponses(ctx context.Context, responses []*http.Response) error {
a.handlers.mu.Lock()
defer a.handlers.mu.Unlock()
last := responses[len(responses)-1]
host := last.Request.URL.Host
handler := a.handlers.get(host, a.sm, a.session)
for _, c := range auth.ParseAuthHeader(last.Header) {
if c.Scheme == auth.BearerAuth {
if err := invalidAuthorization(c, responses); err != nil {
a.handlers.delete(handler)
oldScope := ""
if handler != nil {
oldScope = strings.Join(handler.common.Scopes, " ")
}
handler = nil
// this hacky way seems to be best method to detect that error is fatal and should not be retried with a new token
if c.Parameters["error"] == "insufficient_scope" && c.Parameters["scope"] == oldScope {
return err
}
}
// reuse existing handler
//
// assume that one registry will return the common
// challenge information, including realm and service.
// and the resource scope is only different part
// which can be provided by each request.
if handler != nil {
return nil
}
session, username, secret, err := a.getCredentials(host)
if err != nil {
return err
}
common, err := auth.GenerateTokenOptions(ctx, host, username, secret, c)
if err != nil {
return err
}
a.handlers.set(host, session, newAuthHandler(a.client, c.Scheme, common))
return nil
} else if c.Scheme == auth.BasicAuth {
session, username, secret, err := a.getCredentials(host)
if err != nil {
return err
}
if username != "" && secret != "" {
common := auth.TokenOptions{
Username: username,
Secret: secret,
}
a.handlers.set(host, session, newAuthHandler(a.client, c.Scheme, common))
return nil
}
}
}
return errors.Wrap(errdefs.ErrNotImplemented, "failed to find supported auth scheme")
}
// authResult is used to control limit rate.
type authResult struct {
sync.WaitGroup
token string
err error
expires time.Time
}
// authHandler is used to handle auth request per registry server.
type authHandler struct {
sync.Mutex
client *http.Client
// only support basic and bearer schemes
scheme auth.AuthenticationScheme
// common contains common challenge answer
common auth.TokenOptions
// scopedTokens caches token indexed by scopes, which used in
// bearer auth case
scopedTokens map[string]*authResult
lastUsed time.Time
}
func newAuthHandler(client *http.Client, scheme auth.AuthenticationScheme, opts auth.TokenOptions) *authHandler {
return &authHandler{
client: client,
scheme: scheme,
common: opts,
scopedTokens: map[string]*authResult{},
lastUsed: time.Now(),
}
}
func (ah *authHandler) authorize(ctx context.Context, g session.Group) (string, error) {
switch ah.scheme {
case auth.BasicAuth:
return ah.doBasicAuth(ctx)
case auth.BearerAuth:
return ah.doBearerAuth(ctx)
default:
return "", errors.Wrapf(errdefs.ErrNotImplemented, "failed to find supported auth scheme: %s", string(ah.scheme))
}
}
func (ah *authHandler) doBasicAuth(ctx context.Context) (string, error) {
username, secret := ah.common.Username, ah.common.Secret
if username == "" || secret == "" {
return "", fmt.Errorf("failed to handle basic auth because missing username or secret")
}
auth := base64.StdEncoding.EncodeToString([]byte(username + ":" + secret))
return fmt.Sprintf("Basic %s", auth), nil
}
func (ah *authHandler) doBearerAuth(ctx context.Context) (token string, err error) {
// copy common tokenOptions
to := ah.common
to.Scopes = docker.GetTokenScopes(ctx, to.Scopes)
// Docs: https://docs.docker.com/registry/spec/auth/scope
scoped := strings.Join(to.Scopes, " ")
ah.Lock()
if r, exist := ah.scopedTokens[scoped]; exist {
ah.Unlock()
r.Wait()
if r.expires.IsZero() || r.expires.After(time.Now()) {
return r.token, r.err
}
ah.Lock()
}
// only one fetch token job
r := new(authResult)
r.Add(1)
ah.scopedTokens[scoped] = r
ah.Unlock()
var issuedAt time.Time
var expires int
defer func() {
token = fmt.Sprintf("Bearer %s", token)
r.token, r.err = token, err
if err == nil {
if issuedAt.IsZero() {
issuedAt = time.Now()
}
if exp := issuedAt.Add(time.Duration(float64(expires)*0.9) * time.Second); time.Now().Before(exp) {
r.expires = exp
}
}
r.Done()
}()
// fetch token for the resource scope
if to.Secret != "" {
defer func() {
err = errors.Wrap(err, "failed to fetch oauth token")
}()
// try GET first because Docker Hub does not support POST
// switch once support has landed
resp, err := auth.FetchToken(ctx, ah.client, nil, to)
if err != nil {
var errStatus auth.ErrUnexpectedStatus
if errors.As(err, &errStatus) {
// retry with POST request
// As of September 2017, GCR is known to return 404.
// As of February 2018, JFrog Artifactory is known to return 401.
if (errStatus.StatusCode == 405 && to.Username != "") || errStatus.StatusCode == 404 || errStatus.StatusCode == 401 {
resp, err := auth.FetchTokenWithOAuth(ctx, ah.client, nil, "containerd-client", to)
if err != nil {
return "", err
}
issuedAt, expires = resp.IssuedAt, resp.ExpiresIn
return resp.AccessToken, nil
}
log.G(ctx).WithFields(logrus.Fields{
"status": errStatus.Status,
"body": string(errStatus.Body),
}).Debugf("token request failed")
}
return "", err
}
issuedAt, expires = resp.IssuedAt, resp.ExpiresIn
return resp.Token, nil
}
// do request anonymously
resp, err := auth.FetchToken(ctx, ah.client, nil, to)
if err != nil {
return "", errors.Wrap(err, "failed to fetch anonymous token")
}
issuedAt, expires = resp.IssuedAt, resp.ExpiresIn
return resp.Token, nil
}
func invalidAuthorization(c auth.Challenge, responses []*http.Response) error {
errStr := c.Parameters["error"]
if errStr == "" {
return nil
}
n := len(responses)
if n == 1 || (n > 1 && !sameRequest(responses[n-2].Request, responses[n-1].Request)) {
return nil
}
return errors.Wrapf(docker.ErrInvalidAuthorization, "server message: %s", errStr)
}
func sameRequest(r1, r2 *http.Request) bool {
if r1.Method != r2.Method {
return false
}
if *r1.URL != *r2.URL {
return false
}
return true
}

206
util/resolver/pool.go Normal file
View File

@ -0,0 +1,206 @@
package resolver
import (
"context"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
distreference "github.com/docker/distribution/reference"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/source"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)
// DefaultPool is the default shared resolver pool instance
var DefaultPool = NewPool()
// Pool is a cache of recently used resolvers
type Pool struct {
mu sync.Mutex
m map[string]*authHandlerNS
}
// NewPool creates a new pool for caching resolvers
func NewPool() *Pool {
p := &Pool{
m: map[string]*authHandlerNS{},
}
time.AfterFunc(5*time.Minute, p.gc)
return p
}
func (p *Pool) gc() {
p.mu.Lock()
defer p.mu.Unlock()
for k, ns := range p.m {
ns.mu.Lock()
for key, h := range ns.handlers {
if time.Since(h.lastUsed) < 10*time.Minute {
continue
}
parts := strings.SplitN(key, "/", 2)
if len(parts) != 2 {
delete(ns.handlers, key)
continue
}
c, err := ns.sm.Get(context.TODO(), parts[1], true)
if c == nil || err != nil {
delete(ns.handlers, key)
}
}
if len(ns.handlers) == 0 {
delete(p.m, k)
}
ns.mu.Unlock()
}
time.AfterFunc(5*time.Minute, p.gc)
}
// Clear deletes currently cached items. This may be called on config changes for example.
func (p *Pool) Clear() {
p.mu.Lock()
defer p.mu.Unlock()
p.m = map[string]*authHandlerNS{}
}
// GetResolver gets a resolver for a specified scope from the pool
func (p *Pool) GetResolver(hosts docker.RegistryHosts, ref, scope string, sm *session.Manager, g session.Group) *Resolver {
name := ref
named, err := distreference.ParseNormalizedNamed(ref)
if err == nil {
name = named.Name()
}
key := fmt.Sprintf("%s::%s", name, scope)
p.mu.Lock()
defer p.mu.Unlock()
h, ok := p.m[key]
if !ok {
h = newAuthHandlerNS(sm)
p.m[key] = h
}
return newResolver(hosts, h, sm, g)
}
func newResolver(hosts docker.RegistryHosts, handler *authHandlerNS, sm *session.Manager, g session.Group) *Resolver {
if hosts == nil {
hosts = docker.ConfigureDefaultRegistries(
docker.WithClient(newDefaultClient()),
docker.WithPlainHTTP(docker.MatchLocalhost),
)
}
r := &Resolver{
hosts: hosts,
sm: sm,
g: g,
handler: handler,
}
r.Resolver = docker.NewResolver(docker.ResolverOptions{
Hosts: r.hostsFunc,
})
return r
}
// Resolver is a wrapper around remotes.Resolver
type Resolver struct {
remotes.Resolver
hosts docker.RegistryHosts
sm *session.Manager
g session.Group
handler *authHandlerNS
auth *dockerAuthorizer
is images.Store
mode source.ResolveMode
}
func (r *Resolver) hostsFunc(host string) ([]docker.RegistryHost, error) {
return func(domain string) ([]docker.RegistryHost, error) {
v, err := r.handler.g.Do(context.TODO(), domain, func(ctx context.Context) (interface{}, error) {
// long lock not needed because flightcontrol.Do
r.handler.mu.Lock()
v, ok := r.handler.hosts[domain]
r.handler.mu.Unlock()
if ok {
return v, nil
}
res, err := r.hosts(domain)
if err != nil {
return nil, err
}
r.handler.mu.Lock()
r.handler.hosts[domain] = res
r.handler.mu.Unlock()
return res, nil
})
if err != nil || v == nil {
return nil, err
}
res := v.([]docker.RegistryHost)
if len(res) == 0 {
return nil, nil
}
auth := newDockerAuthorizer(res[0].Client, r.handler, r.sm, r.g)
for i := range res {
res[i].Authorizer = auth
}
return res, nil
}(host)
}
// WithSession returns a new resolver that works with new session group
func (r *Resolver) WithSession(s session.Group) *Resolver {
r2 := *r
r2.auth = nil
r2.g = s
return &r2
}
// WithImageStore returns new resolver that can also resolve from local images store
func (r *Resolver) WithImageStore(is images.Store, mode source.ResolveMode) *Resolver {
r2 := *r
r2.Resolver = r.Resolver
r2.is = is
r2.mode = mode
return &r2
}
// Fetcher returns a new fetcher for the provided reference.
func (r *Resolver) Fetcher(ctx context.Context, ref string) (remotes.Fetcher, error) {
if atomic.LoadInt64(&r.handler.counter) == 0 {
r.Resolve(ctx, ref)
}
return r.Resolver.Fetcher(ctx, ref)
}
// Resolve attempts to resolve the reference into a name and descriptor.
func (r *Resolver) Resolve(ctx context.Context, ref string) (string, ocispec.Descriptor, error) {
if r.mode == source.ResolveModePreferLocal && r.is != nil {
if img, err := r.is.Get(ctx, ref); err == nil {
return ref, img.Target, nil
}
}
n, desc, err := r.Resolver.Resolve(ctx, ref)
if err == nil {
atomic.AddInt64(&r.handler.counter, 1)
return n, desc, err
}
if r.mode == source.ResolveModeDefault && r.is != nil {
if img, err := r.is.Get(ctx, ref); err == nil {
return ref, img.Target, nil
}
}
return "", ocispec.Descriptor{}, err
}

View File

@ -1,7 +1,6 @@
package resolver
import (
"context"
"crypto/tls"
"crypto/x509"
"io/ioutil"
@ -11,15 +10,10 @@ import (
"path/filepath"
"runtime"
"strings"
"sync"
"time"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/moby/buildkit/cmd/buildkitd/config"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/session/auth"
"github.com/moby/buildkit/util/flightcontrol"
"github.com/moby/buildkit/util/tracing"
"github.com/pkg/errors"
)
@ -121,6 +115,7 @@ func loadTLSConfig(c config.RegistryConfig) (*tls.Config, error) {
return tc, nil
}
// NewRegistryConfig converts registry config to docker.RegistryHosts callback
func NewRegistryConfig(m map[string]config.RegistryConfig) docker.RegistryHosts {
return docker.Registries(
func(host string) ([]docker.RegistryHost, error) {
@ -175,110 +170,6 @@ func NewRegistryConfig(m map[string]config.RegistryConfig) docker.RegistryHosts
)
}
type SessionAuthenticator struct {
sm *session.Manager
groups []session.Group
mu sync.RWMutex
cache map[string]credentials
cacheMu sync.RWMutex
}
type credentials struct {
user string
secret string
created time.Time
}
func NewSessionAuthenticator(sm *session.Manager, g session.Group) *SessionAuthenticator {
return &SessionAuthenticator{sm: sm, groups: []session.Group{g}, cache: map[string]credentials{}}
}
func (a *SessionAuthenticator) credentials(h string) (string, string, error) {
const credentialsTimeout = time.Minute
a.cacheMu.RLock()
c, ok := a.cache[h]
if ok && time.Since(c.created) < credentialsTimeout {
a.cacheMu.RUnlock()
return c.user, c.secret, nil
}
a.cacheMu.RUnlock()
a.mu.RLock()
defer a.mu.RUnlock()
var err error
for i := len(a.groups) - 1; i >= 0; i-- {
var user, secret string
user, secret, err = auth.CredentialsFunc(a.sm, a.groups[i])(h)
if err != nil {
continue
}
a.cacheMu.Lock()
a.cache[h] = credentials{user: user, secret: secret, created: time.Now()}
a.cacheMu.Unlock()
return user, secret, nil
}
return "", "", err
}
func (a *SessionAuthenticator) AddSession(g session.Group) {
a.mu.Lock()
a.groups = append(a.groups, g)
a.mu.Unlock()
}
func New(hosts docker.RegistryHosts, auth *SessionAuthenticator) remotes.Resolver {
return docker.NewResolver(docker.ResolverOptions{
Hosts: hostsWithCredentials(hosts, auth),
})
}
func hostsWithCredentials(hosts docker.RegistryHosts, auth *SessionAuthenticator) docker.RegistryHosts {
if hosts == nil {
return nil
}
cache := map[string][]docker.RegistryHost{}
var mu sync.Mutex
var g flightcontrol.Group
return func(domain string) ([]docker.RegistryHost, error) {
v, err := g.Do(context.TODO(), domain, func(ctx context.Context) (interface{}, error) {
mu.Lock()
v, ok := cache[domain]
mu.Unlock()
if ok {
return v, nil
}
res, err := hosts(domain)
if err != nil {
return nil, err
}
if len(res) == 0 {
return nil, nil
}
a := docker.NewDockerAuthorizer(
docker.WithAuthClient(res[0].Client),
docker.WithAuthCreds(auth.credentials),
)
for i := range res {
res[i].Authorizer = a
}
mu.Lock()
cache[domain] = res
mu.Unlock()
return res, nil
})
if err != nil {
return nil, err
}
if v == nil {
return nil, nil
}
return v.([]docker.RegistryHost), nil
}
}
func newDefaultClient() *http.Client {
return &http.Client{
Transport: tracing.NewTransport(newDefaultTransport()),

View File

@ -309,3 +309,12 @@ in that process. Container root file systems will be maintained on upgrade.
We may make exceptions in the interest of __security patches__. If a break is
required, it will be communicated clearly and the solution will be considered
against total impact.
## Deprecated features
The deprecated features are shown in the following table:
| Component | Deprecation release | Target release for removal |
|----------------------------------------------------------------------|---------------------|----------------------------|
| Runtime V1 API and implementation (`io.containerd.runtime.v1.linux`) | containerd v1.4 | containerd v2.0 |
| Runc V1 implementation of Runtime V2 (`io.containerd.runc.v1`) | containerd v1.4 | containerd v2.0 |

View File

@ -290,6 +290,7 @@ func (c *container) NewTask(ctx context.Context, ioCreate cio.Creator, opts ...N
client: c.client,
io: i,
id: c.id,
c: c,
}
if info.Checkpoint != nil {
request.Checkpoint = info.Checkpoint
@ -407,6 +408,7 @@ func (c *container) loadTask(ctx context.Context, ioAttach cio.Attach) (Task, er
io: i,
id: response.Process.ID,
pid: response.Process.Pid,
c: c,
}
return t, nil
}

View File

@ -118,3 +118,10 @@ func deviceFromPath(path, permissions string) (*specs.LinuxDevice, error) {
GID: &stat.Gid,
}, nil
}
// WithCPUCFS sets the container's Completely fair scheduling (CFS) quota and period
func WithCPUCFS(quota int64, period uint64) SpecOpts {
return func(ctx context.Context, _ Client, c *containers.Container, s *Spec) error {
return nil
}
}

View File

@ -273,7 +273,7 @@ func (ah *authHandler) doBearerAuth(ctx context.Context) (string, error) {
// copy common tokenOptions
to := ah.common
to.scopes = getTokenScopes(ctx, to.scopes)
to.scopes = GetTokenScopes(ctx, to.scopes)
// Docs: https://docs.docker.com/registry/spec/auth/scope
scoped := strings.Join(to.scopes, " ")

View File

@ -72,8 +72,8 @@ func contextWithAppendPullRepositoryScope(ctx context.Context, repo string) cont
return WithScope(ctx, fmt.Sprintf("repository:%s:pull", repo))
}
// getTokenScopes returns deduplicated and sorted scopes from ctx.Value(tokenScopesKey{}) and common scopes.
func getTokenScopes(ctx context.Context, common []string) []string {
// GetTokenScopes returns deduplicated and sorted scopes from ctx.Value(tokenScopesKey{}) and common scopes.
func GetTokenScopes(ctx context.Context, common []string) []string {
var scopes []string
if x := ctx.Value(tokenScopesKey{}); x != nil {
scopes = append(scopes, x.([]string)...)

View File

@ -35,6 +35,7 @@ import (
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/oci"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/rootfs"
"github.com/containerd/containerd/runtime/linux/runctypes"
@ -175,18 +176,26 @@ type Task interface {
// For the built in Linux runtime, github.com/containerd/cgroups.Metrics
// are returned in protobuf format
Metrics(context.Context) (*types.Metric, error)
// Spec returns the current OCI specification for the task
Spec(context.Context) (*oci.Spec, error)
}
var _ = (Task)(&task{})
type task struct {
client *Client
c Container
io cio.IO
id string
pid uint32
}
// Spec returns the current OCI specification for the task
func (t *task) Spec(ctx context.Context) (*oci.Spec, error) {
return t.c.Spec(ctx)
}
// ID of the task
func (t *task) ID() string {
return t.id

View File

@ -4,7 +4,7 @@ github.com/cespare/xxhash/v2 v2.1.1
github.com/containerd/btrfs 153935315f4ab9be5bf03650a1341454b05efa5d
github.com/containerd/cgroups 318312a373405e5e91134d8063d04d59768a1bff
github.com/containerd/console v1.0.0
github.com/containerd/continuity d3ef23f19fbb106bb73ffde425d07a9187e30745
github.com/containerd/continuity efbc4488d8fe1bdc16bde3b2d2990d9b3a899165
github.com/containerd/fifo f15a3290365b9d2627d189e619ab4008e0069caf
github.com/containerd/go-runc 7016d3ce2328dd2cb1192b2076ebd565c4e8df0c
github.com/containerd/ttrpc v1.0.1
@ -31,7 +31,7 @@ github.com/Microsoft/go-winio v0.4.14
github.com/Microsoft/hcsshim v0.8.9
github.com/opencontainers/go-digest v1.0.0
github.com/opencontainers/image-spec v1.0.1
github.com/opencontainers/runc v1.0.0-rc91
github.com/opencontainers/runc 67169a9d43456ff0d5ae12b967acb8e366e2f181 # v1.0.0-rc91-48-g67169a9d
github.com/opencontainers/runtime-spec 237cc4f519e2e8f9b235bacccfa8ef5a84df2875 # v1.0.3-0.20200520003142-237cc4f519e2
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.6.0

2
vendor/modules.txt vendored
View File

@ -40,7 +40,7 @@ github.com/codahale/hdrhistogram
github.com/containerd/cgroups/stats/v1
# github.com/containerd/console v1.0.0
github.com/containerd/console
# github.com/containerd/containerd v1.4.0-beta.2.0.20200728183644-eb6354a11860 => github.com/containerd/containerd v1.4.0-beta.2.0.20200728183644-eb6354a11860
# github.com/containerd/containerd v1.4.0-beta.2.0.20200730150746-fa1220fce33f => github.com/containerd/containerd v1.4.0-beta.2.0.20200730150746-fa1220fce33f
github.com/containerd/containerd
github.com/containerd/containerd/api/services/containers/v1
github.com/containerd/containerd/api/services/content/v1