buildkit/cache/manager.go

1186 lines
28 KiB
Go
Raw Normal View History

package cache
import (
"context"
"sort"
"sync"
"time"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/diff"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/filters"
"github.com/containerd/containerd/gc"
"github.com/containerd/containerd/leases"
"github.com/docker/docker/pkg/idtools"
"github.com/moby/buildkit/cache/metadata"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/util/flightcontrol"
digest "github.com/opencontainers/go-digest"
imagespecidentity "github.com/opencontainers/image-spec/identity"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
)
// Sentinel errors used by the cache manager. They are wrapped with additional
// context before being returned.
var (
// ErrLocked is returned when a mutable record is requested while it, or its
// immutable counterpart, still has outstanding references.
ErrLocked = errors.New("locked")
// errNotFound marks lookups of unknown, dead or deleted records; callers
// test for it with IsNotFound.
errNotFound = errors.New("not found")
// errInvalid is returned when a record is not in the state the call needs,
// e.g. GetMutable on a record that is not mutable.
errInvalid = errors.New("invalid")
)
// ManagerOpt carries the dependencies a cache manager is built from. It is
// embedded in cacheManager, so its fields are reachable directly on the manager.
type ManagerOpt struct {
// Snapshotter prepares the snapshots of new mutable refs, names the snapshot
// resource type added to leases, and supplies the identity mapping.
Snapshotter snapshot.Snapshotter
// MetadataStore holds the per-record metadata and is searched by chain ID
// and blob chain ID.
MetadataStore *metadata.Store
// ContentStore is queried to find out whether a blob is available locally.
ContentStore content.Store
// LeaseManager creates, populates and deletes the lease made for each record.
LeaseManager leases.Manager
// PruneRefChecker, when non-nil, builds the checker that decides whether a
// record counts as shared during prune and disk-usage reporting.
PruneRefChecker ExternalRefCheckerFunc
// GarbageCollect, when non-nil, is called once at the end of Prune.
GarbageCollect func(ctx context.Context) (gc.Stats, error)
// NOTE(review): Applier and Differ are not used by the code in this part of
// the file; presumably they serve layer extraction and diffing elsewhere — confirm.
Applier diff.Applier
Differ diff.Comparer
}
// Accessor is the part of the cache manager that hands out references to
// cache records.
type Accessor interface {
// GetByBlob returns an immutable ref for the layer blob described by desc
// on top of parent, reusing an existing record with the same blob chain ID
// when there is one.
GetByBlob(ctx context.Context, desc ocispec.Descriptor, parent ImmutableRef, opts ...RefOption) (ImmutableRef, error)
// Get returns an immutable ref for the record with the given ID.
Get(ctx context.Context, id string, opts ...RefOption) (ImmutableRef, error)
// New creates a new mutable record on top of parent (which may be nil).
New(ctx context.Context, parent ImmutableRef, opts ...RefOption) (MutableRef, error)
// GetMutable returns a mutable ref for an existing mutable record; it fails
// with ErrLocked while the record is referenced.
GetMutable(ctx context.Context, id string, opts ...RefOption) (MutableRef, error) // Rebase?
// IdentityMapping returns the identity mapping of the underlying snapshotter.
IdentityMapping() *idtools.IdentityMapping
// Metadata returns the metadata of a loaded record, or nil if the ID is not loaded.
Metadata(string) *metadata.StorageItem
}
// Controller is the part of the cache manager that reports on and reclaims
// the space used by cache records.
type Controller interface {
// DiskUsage lists the records matching info.Filter together with their usage data.
DiskUsage(ctx context.Context, info client.DiskUsageInfo) ([]*client.UsageInfo, error)
// Prune removes records according to each given PruneInfo in turn; every
// removed record is sent on ch when ch is non-nil.
Prune(ctx context.Context, ch chan client.UsageInfo, info ...client.PruneInfo) error
}
// Manager is the complete cache manager API: record access, disk-usage
// control, and shutdown.
type Manager interface {
Accessor
Controller
// Close closes the manager's metadata store.
Close() error
}
// ExternalRefCheckerFunc builds an ExternalRefChecker. It is called anew for
// each prune pass and each disk-usage report that needs one.
type ExternalRefCheckerFunc func() (ExternalRefChecker, error)
// ExternalRefChecker reports whether a record is referenced from outside the
// cache. Records for which Exists returns true are treated as shared.
type ExternalRefChecker interface {
// Exists is called with a record ID and that record's parent chain.
Exists(string, []digest.Digest) bool
}
// cacheManager is the Manager implementation returned by NewManager.
type cacheManager struct {
// records holds the loaded records keyed by ID. The methods in this file
// access it with mu held.
records map[string]*cacheRecord
mu sync.Mutex
ManagerOpt
// md is set to ManagerOpt.MetadataStore by NewManager.
md *metadata.Store
muPrune sync.Mutex // make sure parallel prune is not allowed so there will not be inconsistent results
// NOTE(review): unlazyG is not used in this part of the file; presumably it
// deduplicates concurrent unlazy operations on refs — confirm.
unlazyG flightcontrol.Group
}
// NewManager builds a cache manager from opt and loads the records already
// present in the metadata store. It fails if the initial load fails.
func NewManager(opt ManagerOpt) (Manager, error) {
	cm := &cacheManager{
		records:    make(map[string]*cacheRecord),
		ManagerOpt: opt,
		md:         opt.MetadataStore,
	}

	err := cm.init(context.TODO())
	if err != nil {
		return nil, err
	}

	// cm.scheduleGC(5 * time.Minute)

	return cm, nil
}
// GetByBlob returns an immutable ref for the layer blob described by desc,
// stacked on top of parent (nil for a base layer).
//
// desc must carry the "containerd.io/uncompressed" annotation; the diff ID it
// holds is used to derive the chain ID and blob chain ID of the layer. Unless
// opts contain a descriptor handler for the blob, the blob has to be present
// in the content store, otherwise a NeedsRemoteProvidersError is returned.
//
// Lookup order:
//  1. a live record with the same blob chain ID is returned directly;
//  2. a live record with the same chain ID lends its snapshot ID and blob-only
//     flag to a newly created record;
//  3. otherwise a new blob-only record is created that uses the chain ID as
//     its snapshot ID.
//
// Metadata entries whose record can no longer be loaded (not found) are
// skipped instead of being dereferenced. Every new record gets its own lease
// referencing the snapshot and, when desc has a digest, the blob; the lease is
// deleted again if the function fails afterwards.
func (cm *cacheManager) GetByBlob(ctx context.Context, desc ocispec.Descriptor, parent ImmutableRef, opts ...RefOption) (ir ImmutableRef, rerr error) {
	diffID, err := diffIDFromDescriptor(desc)
	if err != nil {
		return nil, err
	}
	chainID := diffID
	blobChainID := imagespecidentity.ChainID([]digest.Digest{desc.Digest, diffID})

	descHandlers := descHandlersOf(opts...)
	if descHandlers == nil || descHandlers[desc.Digest] == nil {
		if _, err := cm.ContentStore.Info(ctx, desc.Digest); errors.Is(err, errdefs.ErrNotFound) {
			return nil, NeedsRemoteProvidersError([]digest.Digest{desc.Digest})
		} else if err != nil {
			return nil, err
		}
	}

	var p *immutableRef
	var parentID string
	if parent != nil {
		pInfo := parent.Info()
		if pInfo.ChainID == "" || pInfo.BlobChainID == "" {
			return nil, errors.Errorf("failed to get ref by blob on non-addressable parent")
		}
		chainID = imagespecidentity.ChainID([]digest.Digest{pInfo.ChainID, chainID})
		blobChainID = imagespecidentity.ChainID([]digest.Digest{pInfo.BlobChainID, blobChainID})

		p2, err := cm.Get(ctx, parent.ID(), NoUpdateLastUsed, descHandlers)
		if err != nil {
			return nil, err
		}
		if err := p2.Finalize(ctx, true); err != nil {
			return nil, err
		}
		parentID = p2.ID()
		p = p2.(*immutableRef)
	}

	// The parent ref taken above is only kept when a new record adopts it.
	// This defer is registered before the lock below, so it runs after the
	// manager lock has been released.
	releaseParent := false
	defer func() {
		if releaseParent || rerr != nil && p != nil {
			p.Release(context.TODO())
		}
	}()

	cm.mu.Lock()
	defer cm.mu.Unlock()

	sis, err := cm.MetadataStore.Search("blobchainid:" + blobChainID.String())
	if err != nil {
		return nil, err
	}

	for _, si := range sis {
		ref, err := cm.get(ctx, si.ID(), opts...)
		if err != nil {
			if IsNotFound(err) {
				// stale metadata entry: the record is gone, try the next match
				continue
			}
			return nil, errors.Wrapf(err, "failed to get record %s by blobchainid", si.ID())
		}
		if p != nil {
			// the existing record already holds its own parent ref
			releaseParent = true
		}
		if err := setImageRefMetadata(ref, opts...); err != nil {
			return nil, errors.Wrapf(err, "failed to append image ref metadata to ref %s", ref.ID())
		}
		return ref, nil
	}

	sis, err = cm.MetadataStore.Search("chainid:" + chainID.String())
	if err != nil {
		return nil, err
	}

	// link stays a nil interface unless a live record was found; assigning a
	// nil *immutableRef to it would make the nil check below pass wrongly.
	var link ImmutableRef
	for _, si := range sis {
		ref, err := cm.get(ctx, si.ID(), opts...)
		if err != nil {
			if IsNotFound(err) {
				continue
			}
			return nil, errors.Wrapf(err, "failed to get record %s by chainid", si.ID())
		}
		link = ref
		break
	}

	id := identity.NewID()
	snapshotID := chainID.String()
	blobOnly := true
	if link != nil {
		snapshotID = getSnapshotID(link.Metadata())
		blobOnly = getBlobOnly(link.Metadata())
		// released asynchronously because the manager lock is held here
		go link.Release(context.TODO())
	}

	l, err := cm.ManagerOpt.LeaseManager.Create(ctx, func(l *leases.Lease) error {
		l.ID = id
		l.Labels = map[string]string{
			"containerd.io/gc.flat": time.Now().UTC().Format(time.RFC3339Nano),
		}
		return nil
	})
	if err != nil {
		return nil, errors.Wrap(err, "failed to create lease")
	}

	defer func() {
		if rerr != nil {
			if err := cm.ManagerOpt.LeaseManager.Delete(context.TODO(), leases.Lease{
				ID: l.ID,
			}); err != nil {
				logrus.Errorf("failed to remove lease: %+v", err)
			}
		}
	}()

	if err := cm.ManagerOpt.LeaseManager.AddResource(ctx, l, leases.Resource{
		ID:   snapshotID,
		Type: "snapshots/" + cm.ManagerOpt.Snapshotter.Name(),
	}); err != nil {
		return nil, errors.Wrapf(err, "failed to add snapshot %s to lease", id)
	}

	if desc.Digest != "" {
		if err := cm.ManagerOpt.LeaseManager.AddResource(ctx, leases.Lease{ID: id}, leases.Resource{
			ID:   desc.Digest.String(),
			Type: "content",
		}); err != nil {
			return nil, errors.Wrapf(err, "failed to add blob %s to lease", id)
		}
	}

	md, _ := cm.md.Get(id)

	rec := &cacheRecord{
		mu:     &sync.Mutex{},
		cm:     cm,
		refs:   make(map[ref]struct{}),
		parent: p,
		md:     md,
	}

	if err := initializeMetadata(rec, parentID, opts...); err != nil {
		return nil, err
	}

	if err := setImageRefMetadata(rec, opts...); err != nil {
		return nil, errors.Wrapf(err, "failed to append image ref metadata to ref %s", rec.ID())
	}

	queueDiffID(rec.md, diffID.String())
	queueBlob(rec.md, desc.Digest.String())
	queueChainID(rec.md, chainID.String())
	queueBlobChainID(rec.md, blobChainID.String())
	queueSnapshotID(rec.md, snapshotID)
	queueBlobOnly(rec.md, blobOnly)
	queueMediaType(rec.md, desc.MediaType)
	queueBlobSize(rec.md, desc.Size)
	queueCommitted(rec.md)

	if err := rec.md.Commit(); err != nil {
		return nil, err
	}

	cm.records[id] = rec

	return rec.ref(true, descHandlers), nil
}
// init walks every item in the metadata store and tries to load the matching
// record. If a snapshot's record cannot be loaded, its metadata is cleared and
// its lease is deleted as well.
func (cm *cacheManager) init(ctx context.Context) error {
	stored, err := cm.md.All()
	if err != nil {
		return err
	}

	for _, item := range stored {
		_, loadErr := cm.getRecord(ctx, item.ID())
		if loadErr == nil {
			continue
		}
		logrus.Debugf("could not load snapshot %s: %+v", item.ID(), loadErr)
		cm.md.Clear(item.ID())
		cm.LeaseManager.Delete(ctx, leases.Lease{ID: item.ID()})
	}
	return nil
}
// IdentityMapping reports the user-namespace remapping applied to refs, as
// provided by the snapshotter.
func (cm *cacheManager) IdentityMapping() *idtools.IdentityMapping {
	mapping := cm.Snapshotter.IdentityMapping()
	return mapping
}
// Close shuts the manager down by closing the metadata store, which releases
// the metadata database lock. The manager must not be used afterwards.
func (cm *cacheManager) Close() error {
	// TODO: allocate internal context and cancel it here
	closeErr := cm.md.Close()
	return closeErr
}
// Get returns an immutable snapshot reference for ID. It takes the manager
// lock and delegates to get.
//
// On failure the returned ImmutableRef is a true nil interface value rather
// than an interface wrapping a nil *immutableRef, so callers may safely
// compare it against nil.
func (cm *cacheManager) Get(ctx context.Context, id string, opts ...RefOption) (ImmutableRef, error) {
	cm.mu.Lock()
	defer cm.mu.Unlock()

	ref, err := cm.get(ctx, id, opts...)
	if err != nil {
		return nil, err
	}
	return ref, nil
}
// Metadata returns the metadata of the loaded record with the given ID, or
// nil when no such record is currently loaded.
func (cm *cacheManager) Metadata(id string) *metadata.StorageItem {
	cm.mu.Lock()
	defer cm.mu.Unlock()

	if rec, loaded := cm.records[id]; loaded {
		return rec.Metadata()
	}
	return nil
}
// get returns an immutable ref for the record with the given ID. The manager
// lock must be held by the caller.
//
// A mutable record is committed on the fly, or its existing immutable
// counterpart is used; a mutable record that is still referenced yields
// ErrLocked. Passing NoUpdateLastUsed in opts creates the ref without
// triggering a last-used update.
func (cm *cacheManager) get(ctx context.Context, id string, opts ...RefOption) (*immutableRef, error) {
	rec, err := cm.getRecord(ctx, id, opts...)
	if err != nil {
		return nil, err
	}
	rec.mu.Lock()
	defer rec.mu.Unlock()

	updateLastUsed := true
	for _, opt := range opts {
		if _, skip := opt.(noUpdateLastUsed); skip {
			updateLastUsed = false
		}
	}

	handlers := descHandlersOf(opts...)

	switch {
	case !rec.mutable:
		return rec.ref(updateLastUsed, handlers), nil
	case len(rec.refs) != 0:
		return nil, errors.Wrapf(ErrLocked, "%s is locked", id)
	case rec.equalImmutable != nil:
		return rec.equalImmutable.ref(updateLastUsed, handlers), nil
	default:
		return rec.mref(updateLastUsed, handlers).commit(ctx)
	}
}
// getRecord returns the record for id, loading it from the metadata store
// when it is not in cm.records yet. Requires manager lock.
//
// Loading a record also loads its parent chain through cm.get, which
// acquires a ref on the parent. Errors wrapping errNotFound are returned for
// unknown IDs, dead records and records whose metadata is marked deleted.
// A NeedsRemoteProvidersError is returned when the record or one of its
// ancestors is lazy and opts carry no descriptor handler for its blob.
func (cm *cacheManager) getRecord(ctx context.Context, id string, opts ...RefOption) (cr *cacheRecord, retErr error) {
// checkLazyProviders walks from rec up through its parents and collects the
// blobs of lazy records for which no descriptor handler was supplied.
checkLazyProviders := func(rec *cacheRecord) error {
missing := NeedsRemoteProvidersError(nil)
dhs := descHandlersOf(opts...)
for {
blob := digest.Digest(getBlob(rec.md))
if isLazy, err := rec.isLazy(ctx); err != nil {
return err
} else if isLazy && dhs[blob] == nil {
missing = append(missing, blob)
}
if rec.parent == nil {
break
}
rec = rec.parent.cacheRecord
}
if len(missing) > 0 {
return missing
}
return nil
}
// fast path: the record is already loaded
if rec, ok := cm.records[id]; ok {
if rec.isDead() {
return nil, errors.Wrapf(errNotFound, "failed to get dead record %s", id)
}
if err := checkLazyProviders(rec); err != nil {
return nil, err
}
return rec, nil
}
md, ok := cm.md.Get(id)
if !ok {
return nil, errors.Wrapf(errNotFound, "%s not found", id)
}
// The metadata names an equal mutable record: load that one first and link
// the two records to each other.
if mutableID := getEqualMutable(md); mutableID != "" {
mutable, err := cm.getRecord(ctx, mutableID)
if err != nil {
// the mutable record was deleted on disk, so this record's metadata
// is stale and gets cleared
if IsNotFound(err) {
cm.md.Clear(id)
}
return nil, err
}
// The parent refs may be lazy, so the new parent ref keeps holding the
// descriptor handlers.
var dhs DescHandlers
if mutable.parent != nil {
dhs = mutable.parent.descHandlers
}
rec := &cacheRecord{
mu: &sync.Mutex{},
cm: cm,
refs: make(map[ref]struct{}),
parent: mutable.parentRef(false, dhs),
md: md,
equalMutable: &mutableRef{cacheRecord: mutable},
}
mutable.equalImmutable = &immutableRef{cacheRecord: rec}
cm.records[id] = rec
return rec, nil
}
var parent *immutableRef
if parentID := getParent(md); parentID != "" {
var err error
parent, err = cm.get(ctx, parentID, append(opts, NoUpdateLastUsed)...)
if err != nil {
return nil, err
}
// give the parent ref back if loading this record fails further down
defer func() {
if retErr != nil {
parent.mu.Lock()
parent.release(context.TODO())
parent.mu.Unlock()
}
}()
}
rec := &cacheRecord{
mu: &sync.Mutex{},
mutable: !getCommitted(md),
cm: cm,
refs: make(map[ref]struct{}),
parent: parent,
md: md,
}
// the record was deleted but we crashed before data on disk was removed
if getDeleted(md) {
if err := rec.remove(ctx, true); err != nil {
return nil, err
}
return nil, errors.Wrapf(errNotFound, "failed to get deleted record %s", id)
}
if err := initializeMetadata(rec, getParent(md), opts...); err != nil {
return nil, err
}
if err := setImageRefMetadata(rec, opts...); err != nil {
return nil, errors.Wrapf(err, "failed to append image ref metadata to ref %s", rec.ID())
}
// NOTE(review): the record is registered before the lazy-provider check, so
// it stays in cm.records even when the check below fails and the deferred
// function releases the parent ref — confirm this is intended.
cm.records[id] = rec
if err := checkLazyProviders(rec); err != nil {
return nil, err
}
return rec, nil
}
// New creates a new mutable record on top of s and returns a mutable ref to
// it. s may be nil, in which case the record has no parent.
//
// A non-nil parent is cloned (or fetched through Get when it is not an
// *immutableRef), finalized and extracted before its snapshot is used as the
// base of the new snapshot. The new record gets a fresh ID and its own lease
// that references the snapshot. If any step fails, the parent ref acquired
// here is released and, once created, the lease is deleted.
func (cm *cacheManager) New(ctx context.Context, s ImmutableRef, opts ...RefOption) (mr MutableRef, err error) {
	id := identity.NewID()

	var parent *immutableRef
	var parentID string
	var parentSnapshotID string

	// Registered before the parent ref is acquired so that every error return
	// below, including a failing Finalize or Extract, gives the ref back.
	defer func() {
		if err != nil && parent != nil {
			parent.Release(context.TODO())
		}
	}()

	if s != nil {
		if _, ok := s.(*immutableRef); ok {
			parent = s.Clone().(*immutableRef)
		} else {
			p, err := cm.Get(ctx, s.ID(), append(opts, NoUpdateLastUsed)...)
			if err != nil {
				return nil, err
			}
			parent = p.(*immutableRef)
		}
		if err := parent.Finalize(ctx, true); err != nil {
			return nil, err
		}
		if err := parent.Extract(ctx); err != nil {
			return nil, err
		}
		parentSnapshotID = getSnapshotID(parent.md)
		parentID = parent.ID()
	}

	l, err := cm.ManagerOpt.LeaseManager.Create(ctx, func(l *leases.Lease) error {
		l.ID = id
		l.Labels = map[string]string{
			"containerd.io/gc.flat": time.Now().UTC().Format(time.RFC3339Nano),
		}
		return nil
	})
	if err != nil {
		return nil, errors.Wrap(err, "failed to create lease")
	}

	defer func() {
		if err != nil {
			if err := cm.ManagerOpt.LeaseManager.Delete(context.TODO(), leases.Lease{
				ID: l.ID,
			}); err != nil {
				logrus.Errorf("failed to remove lease: %+v", err)
			}
		}
	}()

	if err := cm.ManagerOpt.LeaseManager.AddResource(ctx, l, leases.Resource{
		ID:   id,
		Type: "snapshots/" + cm.ManagerOpt.Snapshotter.Name(),
	}); err != nil {
		return nil, errors.Wrapf(err, "failed to add snapshot %s to lease", id)
	}

	if err := cm.Snapshotter.Prepare(ctx, id, parentSnapshotID); err != nil {
		return nil, errors.Wrapf(err, "failed to prepare %s", id)
	}

	md, _ := cm.md.Get(id)

	rec := &cacheRecord{
		mu:      &sync.Mutex{},
		mutable: true,
		cm:      cm,
		refs:    make(map[ref]struct{}),
		parent:  parent,
		md:      md,
	}

	if err := initializeMetadata(rec, parentID, opts...); err != nil {
		return nil, err
	}

	if err := setImageRefMetadata(rec, opts...); err != nil {
		return nil, errors.Wrapf(err, "failed to append image ref metadata to ref %s", rec.ID())
	}

	cm.mu.Lock()
	defer cm.mu.Unlock()

	cm.records[id] = rec // TODO: save to db

	// The parent refs may be lazy, so the new ref keeps holding the parent's
	// descriptor handlers.
	var dhs DescHandlers
	if parent != nil {
		dhs = parent.descHandlers
	}
	return rec.mref(true, dhs), nil
}
// GetMutable returns a mutable ref for the existing mutable record with the
// given ID.
//
// It fails with errInvalid when the record is not mutable and with ErrLocked
// when the record, or its immutable counterpart, is still referenced. An
// unreferenced immutable counterpart is removed before the ref is handed out.
func (cm *cacheManager) GetMutable(ctx context.Context, id string, opts ...RefOption) (MutableRef, error) {
	cm.mu.Lock()
	defer cm.mu.Unlock()

	cr, err := cm.getRecord(ctx, id, opts...)
	if err != nil {
		return nil, err
	}

	cr.mu.Lock()
	defer cr.mu.Unlock()

	switch {
	case !cr.mutable:
		return nil, errors.Wrapf(errInvalid, "%s is not mutable", id)
	case len(cr.refs) != 0:
		return nil, errors.Wrapf(ErrLocked, "%s is locked", id)
	}

	if twin := cr.equalImmutable; twin != nil {
		if len(twin.refs) != 0 {
			return nil, errors.Wrapf(ErrLocked, "%s is locked", id)
		}
		delete(cm.records, twin.ID())
		if err := twin.remove(ctx, false); err != nil {
			return nil, err
		}
		cr.equalImmutable = nil
	}

	return cr.mref(true, descHandlersOf(opts...)), nil
}
// Prune applies each PruneInfo in opts in order, stopping at the first error.
// Prune runs are serialized through muPrune. After all passes have succeeded,
// the configured GarbageCollect callback, if any, is invoked outside that lock.
func (cm *cacheManager) Prune(ctx context.Context, ch chan client.UsageInfo, opts ...client.PruneInfo) error {
	runPasses := func() error {
		cm.muPrune.Lock()
		defer cm.muPrune.Unlock()

		for _, info := range opts {
			if err := cm.pruneOnce(ctx, ch, info); err != nil {
				return err
			}
		}
		return nil
	}

	if err := runPasses(); err != nil {
		return err
	}

	if cm.GarbageCollect == nil {
		return nil
	}
	_, err := cm.GarbageCollect(ctx)
	if err != nil {
		return err
	}
	return nil
}
// pruneOnce translates a single client.PruneInfo into a pruneOpt and runs
// prune with it.
//
// The external ref checker is only created when not everything is pruned
// (opt.All is false) or when filters are given. When opt.KeepBytes is set, the
// starting total size is the sum of the sizes of all non-shared records
// reported by DiskUsage.
func (cm *cacheManager) pruneOnce(ctx context.Context, ch chan client.UsageInfo, opt client.PruneInfo) error {
	filter, err := filters.ParseAll(opt.Filter...)
	if err != nil {
		return errors.Wrapf(err, "failed to parse prune filters %v", opt.Filter)
	}

	var check ExternalRefChecker
	if newChecker := cm.PruneRefChecker; newChecker != nil && (!opt.All || len(opt.Filter) > 0) {
		check, err = newChecker()
		if err != nil {
			return errors.WithStack(err)
		}
	}

	var totalSize int64
	if opt.KeepBytes != 0 {
		usage, err := cm.DiskUsage(ctx, client.DiskUsageInfo{})
		if err != nil {
			return err
		}
		for _, u := range usage {
			if !u.Shared {
				totalSize += u.Size
			}
		}
	}

	po := pruneOpt{
		filter:       filter,
		all:          opt.All,
		checkShared:  check,
		keepDuration: opt.KeepDuration,
		keepBytes:    opt.KeepBytes,
		totalSize:    totalSize,
	}
	return cm.prune(ctx, ch, po)
}
// prune removes unreferenced records that match opt and reports every removed
// record on ch (when ch is non-nil).
//
// There are two modes. With opt.keepBytes == 0 every matching record found in
// one scan is marked dead and removed. With opt.keepBytes != 0 ("gc mode") the
// candidates are kept locked during the scan, ranked by sortDeleteRecords, and
// only the first one is removed per pass. After a pass that removed something,
// prune calls itself again with the reduced opt.totalSize; the recursion ends
// when a pass finds nothing to delete, when totalSize has dropped below
// keepBytes, or when ctx is done.
//
// The manager lock is held only while candidates are selected; it is released
// on every return path, including when marking a record deleted fails.
func (cm *cacheManager) prune(ctx context.Context, ch chan client.UsageInfo, opt pruneOpt) error {
	var toDelete []*deleteRecord

	if opt.keepBytes != 0 && opt.totalSize < opt.keepBytes {
		return nil
	}

	cm.mu.Lock()

	gcMode := opt.keepBytes != 0
	cutOff := time.Now().Add(-opt.keepDuration)

	// mutexes of gc-mode candidates that are intentionally left locked
	locked := map[*sync.Mutex]struct{}{}

	for _, cr := range cm.records {
		if _, ok := locked[cr.mu]; ok {
			continue
		}
		cr.mu.Lock()

		// ignore duplicates that share data
		if cr.equalImmutable != nil && len(cr.equalImmutable.refs) > 0 || cr.equalMutable != nil && len(cr.refs) == 0 {
			cr.mu.Unlock()
			continue
		}

		if cr.isDead() {
			cr.mu.Unlock()
			continue
		}

		if len(cr.refs) == 0 {
			recordType := GetRecordType(cr)
			if recordType == "" {
				recordType = client.UsageRecordTypeRegular
			}

			shared := false
			if opt.checkShared != nil {
				shared = opt.checkShared.Exists(cr.ID(), cr.parentChain())
			}

			// internal, frontend and shared records are only pruned with "all"
			if !opt.all {
				if recordType == client.UsageRecordTypeInternal || recordType == client.UsageRecordTypeFrontend || shared {
					cr.mu.Unlock()
					continue
				}
			}

			c := &client.UsageInfo{
				ID:         cr.ID(),
				Mutable:    cr.mutable,
				RecordType: recordType,
				Shared:     shared,
			}

			usageCount, lastUsedAt := getLastUsed(cr.md)
			c.LastUsedAt = lastUsedAt
			c.UsageCount = usageCount

			if opt.keepDuration != 0 {
				if lastUsedAt != nil && lastUsedAt.After(cutOff) {
					cr.mu.Unlock()
					continue
				}
			}

			if opt.filter.Match(adaptUsageInfo(c)) {
				toDelete = append(toDelete, &deleteRecord{
					cacheRecord: cr,
					lastUsedAt:  c.LastUsedAt,
					usageCount:  c.UsageCount,
				})
				if !gcMode {
					cr.dead = true

					// mark metadata as deleted in case we crash before cleanup finished
					if err := setDeleted(cr.md); err != nil {
						cr.mu.Unlock()
						cm.mu.Unlock()
						return err
					}
				} else {
					locked[cr.mu] = struct{}{}
					continue // leave the record locked
				}
			}
		}
		cr.mu.Unlock()
	}

	if gcMode && len(toDelete) > 0 {
		sortDeleteRecords(toDelete)
		var err error
		for i, cr := range toDelete {
			// only remove single record at a time
			if i == 0 {
				cr.dead = true
				err = setDeleted(cr.md)
			}
			cr.mu.Unlock()
		}
		if err != nil {
			// the manager lock is still held here and must not leak
			cm.mu.Unlock()
			return err
		}
		toDelete = toDelete[:1]
	}

	cm.mu.Unlock()

	if len(toDelete) == 0 {
		return nil
	}

	var err error
	for _, cr := range toDelete {
		cr.mu.Lock()

		usageCount, lastUsedAt := getLastUsed(cr.md)

		c := client.UsageInfo{
			ID:          cr.ID(),
			Mutable:     cr.mutable,
			InUse:       len(cr.refs) > 0,
			Size:        getSize(cr.md),
			CreatedAt:   GetCreatedAt(cr.md),
			Description: GetDescription(cr.md),
			LastUsedAt:  lastUsedAt,
			UsageCount:  usageCount,
		}

		if cr.parent != nil {
			c.Parent = cr.parent.ID()
		}
		if c.Size == sizeUnknown && cr.equalImmutable != nil {
			c.Size = getSize(cr.equalImmutable.md) // benefit from DiskUsage calc
		}
		if c.Size == sizeUnknown {
			cr.mu.Unlock() // all the non-prune modifications already protected by cr.dead
			s, err := cr.Size(ctx)
			if err != nil {
				return err
			}
			c.Size = s
			cr.mu.Lock()
		}

		opt.totalSize -= c.Size

		// keep the first removal error but still try to remove everything
		if cr.equalImmutable != nil {
			if err1 := cr.equalImmutable.remove(ctx, false); err == nil {
				err = err1
			}
		}
		if err1 := cr.remove(ctx, true); err == nil {
			err = err1
		}

		if err == nil && ch != nil {
			ch <- c
		}
		cr.mu.Unlock()
	}
	if err != nil {
		return err
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
		return cm.prune(ctx, ch, opt)
	}
}
// markShared flags every entry of m that the external ref checker reports as
// existing, together with all of its ancestors that are present in m. It does
// nothing when no PruneRefChecker is configured.
func (cm *cacheManager) markShared(m map[string]*cacheUsageInfo) error {
	if cm.PruneRefChecker == nil {
		return nil
	}
	checker, err := cm.PruneRefChecker()
	if err != nil {
		return errors.WithStack(err)
	}

	// markChain marks id and then walks up the parent links, stopping at the
	// first ID that is not part of m or at a record without a parent.
	markChain := func(id string) {
		for {
			info, known := m[id]
			if !known {
				return
			}
			info.shared = true
			if info.parent == "" {
				return
			}
			id = info.parent
		}
	}

	for id, info := range m {
		if info.shared {
			continue
		}
		if checker.Exists(id, info.parentChain) {
			markChain(id)
		}
	}
	return nil
}
// cacheUsageInfo is the working copy of one record's usage data that
// DiskUsage collects while holding the manager lock and refines afterwards.
type cacheUsageInfo struct {
// refs starts as the record's ref count and is decremented by DiskUsage for
// every unreferenced child found.
refs int
// parent is the parent record's ID, or empty when there is none.
parent string
size int64
mutable bool
createdAt time.Time
usageCount int
lastUsedAt *time.Time
description string
// doubleRef is set for records that have an equal immutable record; such a
// record makes DiskUsage decrement its parent's refs twice.
doubleRef bool
recordType client.UsageRecordType
// shared is set by markShared.
shared bool
// parentChain is passed to the external ref checker.
parentChain []digest.Digest
}
// DiskUsage returns usage information for all loaded records that match
// opt.Filter.
//
// It works in four steps:
//  1. under the manager lock, copy the usage data of every record, skipping
//     duplicates that share data with an equal mutable/immutable record;
//  2. propagate "unused" upwards: a record without refs gives up the ref(s) it
//     holds on its parent, which may in turn make the parent unused;
//  3. mark records that are referenced externally as shared and apply the filter;
//  4. compute, concurrently, the size of every reported record whose size is
//     still unknown.
//
// A record that can no longer be fetched in step 4 is reported with size 0.
// If computing a size fails, the partial result is returned with the error.
func (cm *cacheManager) DiskUsage(ctx context.Context, opt client.DiskUsageInfo) ([]*client.UsageInfo, error) {
	filter, err := filters.ParseAll(opt.Filter...)
	if err != nil {
		return nil, errors.Wrapf(err, "failed to parse diskusage filters %v", opt.Filter)
	}

	cm.mu.Lock()

	m := make(map[string]*cacheUsageInfo, len(cm.records))
	rescan := make(map[string]struct{}, len(cm.records))

	for id, cr := range cm.records {
		cr.mu.Lock()
		// ignore duplicates that share data
		if cr.equalImmutable != nil && len(cr.equalImmutable.refs) > 0 || cr.equalMutable != nil && len(cr.refs) == 0 {
			cr.mu.Unlock()
			continue
		}

		usageCount, lastUsedAt := getLastUsed(cr.md)
		c := &cacheUsageInfo{
			refs:        len(cr.refs),
			mutable:     cr.mutable,
			size:        getSize(cr.md),
			createdAt:   GetCreatedAt(cr.md),
			usageCount:  usageCount,
			lastUsedAt:  lastUsedAt,
			description: GetDescription(cr.md),
			doubleRef:   cr.equalImmutable != nil,
			recordType:  GetRecordType(cr),
			parentChain: cr.parentChain(),
		}
		if c.recordType == "" {
			c.recordType = client.UsageRecordTypeRegular
		}
		if cr.parent != nil {
			c.parent = cr.parent.ID()
		}
		if cr.mutable && c.refs > 0 {
			c.size = 0 // size can not be determined because it is changing
		}
		m[id] = c
		rescan[id] = struct{}{}
		cr.mu.Unlock()
	}
	cm.mu.Unlock()

	// NOTE(review): this assumes the parent of every collected record was
	// collected as well; a missing parent entry would be a nil dereference.
	for len(rescan) > 0 {
		for id := range rescan {
			v := m[id]
			if v.refs == 0 && v.parent != "" {
				m[v.parent].refs--
				if v.doubleRef {
					m[v.parent].refs--
				}
				rescan[v.parent] = struct{}{}
			}
			delete(rescan, id)
		}
	}

	if err := cm.markShared(m); err != nil {
		return nil, err
	}

	var du []*client.UsageInfo
	for id, cr := range m {
		c := &client.UsageInfo{
			ID:          id,
			Mutable:     cr.mutable,
			InUse:       cr.refs > 0,
			Size:        cr.size,
			Parent:      cr.parent,
			CreatedAt:   cr.createdAt,
			Description: cr.description,
			LastUsedAt:  cr.lastUsedAt,
			UsageCount:  cr.usageCount,
			RecordType:  cr.recordType,
			Shared:      cr.shared,
		}
		if filter.Match(adaptUsageInfo(c)) {
			du = append(du, c)
		}
	}

	eg, ctx := errgroup.WithContext(ctx)

	for _, d := range du {
		if d.Size == sizeUnknown {
			d := d // each goroutine works on its own entry
			eg.Go(func() error {
				ref, err := cm.Get(ctx, d.ID, NoUpdateLastUsed)
				if err != nil {
					d.Size = 0
					return nil
				}
				s, err := ref.Size(ctx)
				if err != nil {
					// Do not leak the ref taken above. The size error is the
					// one worth reporting, so a release error is dropped.
					_ = ref.Release(context.TODO())
					return err
				}
				d.Size = s
				return ref.Release(context.TODO())
			})
		}
	}

	if err := eg.Wait(); err != nil {
		return du, err
	}

	return du, nil
}
// IsNotFound reports whether err is, or wraps, the cache manager's not-found
// error.
func IsNotFound(err error) bool {
	notFound := errors.Is(err, errNotFound)
	return notFound
}
// RefOption is an option passed to the cache manager's accessors. The kinds
// handled in this file are func(withMetadata) error values (applied by
// initializeMetadata), imageRefOption values (applied by setImageRefMetadata)
// and NoUpdateLastUsed; descriptor handlers are extracted by descHandlersOf.
type RefOption interface{}
// cachePolicy is the retention policy stored in a record's metadata.
type cachePolicy int
const (
cachePolicyDefault cachePolicy = iota
cachePolicyRetain
)
// withMetadata is implemented by anything that exposes a metadata storage
// item; the metadata options operate on it.
type withMetadata interface {
Metadata() *metadata.StorageItem
}
// noUpdateLastUsed is the type of the NoUpdateLastUsed option.
type noUpdateLastUsed struct{}
// NoUpdateLastUsed is a RefOption that makes get create the ref without
// triggering a last-used update.
var NoUpdateLastUsed noUpdateLastUsed
// HasCachePolicyRetain reports whether the cache policy stored in m's metadata
// is cachePolicyRetain.
func HasCachePolicyRetain(m withMetadata) bool {
	policy := getCachePolicy(m.Metadata())
	return policy == cachePolicyRetain
}
// CachePolicyRetain queues the retain cache policy on m's metadata.
func CachePolicyRetain(m withMetadata) error {
	md := m.Metadata()
	return queueCachePolicy(md, cachePolicyRetain)
}
// CachePolicyDefault queues the default cache policy on m's metadata.
func CachePolicyDefault(m withMetadata) error {
	md := m.Metadata()
	return queueCachePolicy(md, cachePolicyDefault)
}
// WithDescription returns a RefOption that queues descr as the description in
// the target's metadata.
func WithDescription(descr string) RefOption {
	apply := func(m withMetadata) error {
		return queueDescription(m.Metadata(), descr)
	}
	return apply
}
// WithRecordType returns a RefOption that queues t as the record type in the
// target's metadata.
func WithRecordType(t client.UsageRecordType) RefOption {
	apply := func(m withMetadata) error {
		return queueRecordType(m.Metadata(), t)
	}
	return apply
}
// WithCreationTime returns a RefOption that queues tm as the creation time in
// the target's metadata.
func WithCreationTime(tm time.Time) RefOption {
	apply := func(m withMetadata) error {
		return queueCreatedAt(m.Metadata(), tm)
	}
	return apply
}
// imageRefOption is a RefOption that records an image ref in a record's
// metadata. It is a distinct named type because it has to be applied outside
// of initializeMetadata: initializeMetadata only picks up plain
// func(withMetadata) error options, so wrapping the function in its own type
// keeps the metadata from being set twice.
type imageRefOption func(m withMetadata) error
// WithImageRef returns a RefOption that appends imageRef to the image refs
// stored in the cache ref's metadata.
func WithImageRef(imageRef string) RefOption {
	var opt imageRefOption = func(m withMetadata) error {
		return appendImageRef(m.Metadata(), imageRef)
	}
	return opt
}
// setImageRefMetadata applies every imageRefOption found in opts to m and then
// commits m's metadata. Options of any other type are ignored.
func setImageRefMetadata(m withMetadata, opts ...RefOption) error {
	md := m.Metadata()
	for _, opt := range opts {
		apply, isImageRef := opt.(imageRefOption)
		if !isImageRef {
			continue
		}
		if err := apply(m); err != nil {
			return err
		}
	}
	return md.Commit()
}
// initializeMetadata fills in the metadata of a record that has none yet: the
// parent ID, the creation time (now) and whatever the plain
// func(withMetadata) error options in opts set. The result is committed.
// Metadata that already has a creation time is left untouched.
func initializeMetadata(m withMetadata, parent string, opts ...RefOption) error {
	md := m.Metadata()

	if created := GetCreatedAt(md); !created.IsZero() {
		// already initialized
		return nil
	}

	if err := queueParent(md, parent); err != nil {
		return err
	}
	if err := queueCreatedAt(md, time.Now()); err != nil {
		return err
	}

	for _, opt := range opts {
		apply, isMetadataOpt := opt.(func(withMetadata) error)
		if !isMetadataOpt {
			continue
		}
		if err := apply(m); err != nil {
			return err
		}
	}

	return md.Commit()
}
// adaptUsageInfo exposes info to the filters package. String fields ("id",
// "parent", "description", "type") yield their value and are present when
// non-empty; flag fields ("inuse", "mutable", "immutable", "shared",
// "private") yield an empty value and are present when the flag holds. Only
// the first element of the field path is looked at.
func adaptUsageInfo(info *client.UsageInfo) filters.Adaptor {
	text := func(v string) (string, bool) { return v, v != "" }
	flag := func(set bool) (string, bool) { return "", set }

	return filters.AdapterFunc(func(fieldpath []string) (string, bool) {
		if len(fieldpath) == 0 {
			return "", false
		}

		switch fieldpath[0] {
		case "id":
			return text(info.ID)
		case "parent":
			return text(info.Parent)
		case "description":
			return text(info.Description)
		case "type":
			return text(string(info.RecordType))
		case "inuse":
			return flag(info.InUse)
		case "mutable":
			return flag(info.Mutable)
		case "immutable":
			return flag(!info.Mutable)
		case "shared":
			return flag(info.Shared)
		case "private":
			return flag(!info.Shared)
		}

		// TODO: add int/datetime/bytes support for more fields

		return "", false
	})
}
// pruneOpt is the internal form of a client.PruneInfo that prune works with.
type pruneOpt struct {
// filter selects the records that may be deleted.
filter filters.Filter
// all also allows internal, frontend and shared records to be deleted.
all bool
// checkShared, when non-nil, decides whether a record counts as shared.
checkShared ExternalRefChecker
// keepDuration, when non-zero, protects records used more recently than this.
keepDuration time.Duration
// keepBytes, when non-zero, switches prune to deleting one record per pass
// until totalSize drops below it.
keepBytes int64
// totalSize is the running total that prune reduces by each deleted record's size.
totalSize int64
}
// deleteRecord is a prune candidate together with the usage data it is
// ranked by.
type deleteRecord struct {
*cacheRecord
lastUsedAt *time.Time
usageCount int
// lastUsedAtIndex and usageCountIndex are the ranks assigned by
// sortDeleteRecords.
lastUsedAtIndex int
usageCountIndex int
}
// sortDeleteRecords orders prune candidates in place so that the best
// candidate for deletion comes first.
//
// Every record receives two ranks: lastUsedAtIndex, which grows with the
// last-used time (never-used records get rank 0), and usageCountIndex, which
// grows with the usage count. Each rank is normalized by the highest rank of
// its kind and the records are sorted by the sum of the two normalized ranks,
// lowest first.
//
// A rank whose maximum is 0 (all records never used, or all usage counts 0)
// contributes 0 to the sum; dividing by that maximum would yield NaN and make
// the final ordering undefined.
func sortDeleteRecords(toDelete []*deleteRecord) {
	// Oldest first; records without a last-used time sort before all others
	// and compare equal to each other.
	sort.Slice(toDelete, func(i, j int) bool {
		a, b := toDelete[i].lastUsedAt, toDelete[j].lastUsedAt
		if a == nil {
			return b != nil
		}
		if b == nil {
			return false
		}
		return a.Before(*b)
	})

	maxLastUsedIndex := 0
	var val time.Time
	for _, v := range toDelete {
		if v.lastUsedAt != nil && v.lastUsedAt.After(val) {
			val = *v.lastUsedAt
			maxLastUsedIndex++
		}
		v.lastUsedAtIndex = maxLastUsedIndex
	}

	sort.Slice(toDelete, func(i, j int) bool {
		return toDelete[i].usageCount < toDelete[j].usageCount
	})

	maxUsageCountIndex := 0
	var count int
	for _, v := range toDelete {
		if v.usageCount != count {
			count = v.usageCount
			maxUsageCountIndex++
		}
		v.usageCountIndex = maxUsageCountIndex
	}

	// normalized maps a rank into [0, 1]; a dimension without any spread
	// contributes nothing.
	normalized := func(idx, maxIdx int) float64 {
		if maxIdx == 0 {
			return 0
		}
		return float64(idx) / float64(maxIdx)
	}
	score := func(r *deleteRecord) float64 {
		return normalized(r.lastUsedAtIndex, maxLastUsedIndex) +
			normalized(r.usageCountIndex, maxUsageCountIndex)
	}

	sort.Slice(toDelete, func(i, j int) bool {
		return score(toDelete[i]) < score(toDelete[j])
	})
}
// diffIDFromDescriptor reads the diff ID of a layer from the
// "containerd.io/uncompressed" annotation of desc. It fails when the
// annotation is absent or does not parse as a digest.
func diffIDFromDescriptor(desc ocispec.Descriptor) (digest.Digest, error) {
	raw, found := desc.Annotations["containerd.io/uncompressed"]
	if !found {
		return "", errors.Errorf("missing uncompressed annotation for %s", desc.Digest)
	}

	parsed, err := digest.Parse(raw)
	if err != nil {
		return "", errors.Wrapf(err, "failed to parse diffID %q for %s", raw, desc.Digest)
	}
	return parsed, nil
}