From f3af4fbbf9e3fb160a03b659e603223d579aba47 Mon Sep 17 00:00:00 2001 From: Tonis Tiigi Date: Tue, 26 Dec 2017 11:42:14 -0800 Subject: [PATCH 1/8] cache: update ref locking readability Signed-off-by: Tonis Tiigi --- cache/manager.go | 30 ++++++++++++++++++++++-------- cache/refs.go | 41 ++++++++++++++++++++++++++++------------- 2 files changed, 50 insertions(+), 21 deletions(-) diff --git a/cache/manager.go b/cache/manager.go index 189a4f0e..4f772147 100644 --- a/cache/manager.go +++ b/cache/manager.go @@ -69,6 +69,8 @@ func NewManager(opt ManagerOpt) (Manager, error) { return cm, nil } +// init loads all snapshots from metadata state and tries to load the records +// from the snapshotter. If snaphot can't be found, metadata is deleted as well. func (cm *cacheManager) init(ctx context.Context) error { items, err := cm.md.All() if err != nil { @@ -76,27 +78,32 @@ func (cm *cacheManager) init(ctx context.Context) error { } for _, si := range items { - if _, err := cm.load(ctx, si.ID()); err != nil { - logrus.Debugf("could not load snapshot %s, %v", si.ID(), err) + if _, err := cm.getRecord(ctx, si.ID()); err != nil { + logrus.Debugf("could not load snapshot %s: %v", si.ID(), err) cm.md.Clear(si.ID()) + // TODO: make sure content is deleted as well } } return nil } +// Close closes the manager and releases the metadata database lock. No other +// method should be called after Close. func (cm *cacheManager) Close() error { // TODO: allocate internal context and cancel it here return cm.md.Close() } +// Get returns an immutable snapshot reference for ID func (cm *cacheManager) Get(ctx context.Context, id string, opts ...RefOption) (ImmutableRef, error) { cm.mu.Lock() defer cm.mu.Unlock() return cm.get(ctx, id, opts...) } +// get requires manager lock to be taken func (cm *cacheManager) get(ctx context.Context, id string, opts ...RefOption) (ImmutableRef, error) { - rec, err := cm.load(ctx, id, opts...) + rec, err := cm.getRecord(ctx, id, opts...) if err != nil { return nil, err } @@ -104,7 +111,7 @@ func (cm *cacheManager) get(ctx context.Context, id string, opts ...RefOption) ( defer rec.mu.Unlock() if rec.mutable { - if rec.dead || len(rec.refs) != 0 { + if len(rec.refs) != 0 { return nil, errors.Wrapf(errLocked, "%s is locked", id) } if rec.equalImmutable != nil { @@ -116,18 +123,23 @@ func (cm *cacheManager) get(ctx context.Context, id string, opts ...RefOption) ( return rec.ref(), nil } -func (cm *cacheManager) load(ctx context.Context, id string, opts ...RefOption) (*cacheRecord, error) { - if rec, ok := cm.records[id]; ok && !rec.dead { +// getRecord returns record for id. Requires manager lock. +func (cm *cacheManager) getRecord(ctx context.Context, id string, opts ...RefOption) (*cacheRecord, error) { + if rec, ok := cm.records[id]; ok { + if rec.isDead() { + return nil, errNotFound + } return rec, nil } md, _ := cm.md.Get(id) if mutableID := getEqualMutable(md); mutableID != "" { - mutable, err := cm.load(ctx, mutableID) + mutable, err := cm.getRecord(ctx, mutableID) if err != nil { return nil, err } rec := &cacheRecord{ + mu: &sync.Mutex{}, cm: cm, refs: make(map[Mountable]struct{}), parent: mutable.parent, @@ -153,6 +165,7 @@ func (cm *cacheManager) load(ctx context.Context, id string, opts ...RefOption) } rec := &cacheRecord{ + mu: &sync.Mutex{}, mutable: info.Kind != cdsnapshot.KindCommitted, cm: cm, refs: make(map[Mountable]struct{}), @@ -201,6 +214,7 @@ func (cm *cacheManager) New(ctx context.Context, s ImmutableRef, opts ...RefOpti md, _ := cm.md.Get(id) rec := &cacheRecord{ + mu: &sync.Mutex{}, mutable: true, cm: cm, refs: make(map[Mountable]struct{}), @@ -226,7 +240,7 @@ func (cm *cacheManager) GetMutable(ctx context.Context, id string) (MutableRef, cm.mu.Lock() defer cm.mu.Unlock() - rec, err := cm.load(ctx, id) + rec, err := cm.getRecord(ctx, id) if err != nil { return nil, err } diff --git a/cache/refs.go b/cache/refs.go index a5d050a9..7e983768 100644 --- a/cache/refs.go +++ b/cache/refs.go @@ -27,7 +27,6 @@ type ImmutableRef interface { Ref Parent() ImmutableRef Finalize(ctx context.Context) error // Make sure reference is flushed to driver - // Prepare() / ChainID() / Meta() } type MutableRef interface { @@ -40,42 +39,54 @@ type Mountable interface { } type cacheRecord struct { - mu sync.Mutex - mutable bool - refs map[Mountable]struct{} - cm *cacheManager - parent ImmutableRef - md *metadata.StorageItem + cm *cacheManager + mu *sync.Mutex // the mutex is shared by records sharing data + + mutable bool + refs map[Mountable]struct{} + parent ImmutableRef + md *metadata.StorageItem + + // dead means record is marked as deleted + dead bool + view string viewMount []mount.Mount - dead bool sizeG flightcontrol.Group - // size int64 // these are filled if multiple refs point to same data equalMutable *mutableRef equalImmutable *immutableRef } -// hold manager lock before calling +// hold ref lock before calling func (cr *cacheRecord) ref() *immutableRef { ref := &immutableRef{cacheRecord: cr} cr.refs[ref] = struct{}{} return ref } -// hold manager lock before calling +// hold ref lock before calling func (cr *cacheRecord) mref() *mutableRef { ref := &mutableRef{cacheRecord: cr} cr.refs[ref] = struct{}{} return ref } +// hold ref lock before calling +func (cr *cacheRecord) isDead() bool { + return cr.dead || (cr.equalImmutable != nil && cr.equalImmutable.dead) || (cr.equalMutable != nil && cr.equalMutable.dead) +} + func (cr *cacheRecord) Size(ctx context.Context) (int64, error) { // this expects that usage() is implemented lazily s, err := cr.sizeG.Do(ctx, cr.ID(), func(ctx context.Context) (interface{}, error) { cr.mu.Lock() + if cr.isDead() { + cr.mu.Unlock() + return 0, nil + } s := getSize(cr.md) if s != sizeUnknown { cr.mu.Unlock() @@ -105,7 +116,10 @@ func (cr *cacheRecord) Parent() ImmutableRef { if cr.parent == nil { return nil } - return cr.parent.(*immutableRef).ref() + p := cr.parent.(*immutableRef) + p.mu.Lock() + p.mu.Unlock() + return p.ref() } func (cr *cacheRecord) Mount(ctx context.Context, readonly bool) ([]mount.Mount, error) { @@ -245,13 +259,14 @@ func (cr *cacheRecord) finalize(ctx context.Context) error { func (sr *mutableRef) commit(ctx context.Context) (ImmutableRef, error) { if !sr.mutable || len(sr.refs) == 0 { - return nil, errors.Wrapf(errInvalid, "invalid mutable") + return nil, errors.Wrapf(errInvalid, "invalid mutable ref") } id := identity.NewID() md, _ := sr.cm.md.Get(id) rec := &cacheRecord{ + mu: sr.mu, cm: sr.cm, parent: sr.parent, equalMutable: sr, From 63ce643468c2ce0afab2361d85bb63e82f883697 Mon Sep 17 00:00:00 2001 From: Tonis Tiigi Date: Tue, 26 Dec 2017 17:22:50 -0800 Subject: [PATCH 2/8] cache: add prune support Signed-off-by: Tonis Tiigi --- api/services/control/control.pb.go | 325 +++++++++++++++++++++-------- api/services/control/control.proto | 5 + cache/gc.go | 4 - cache/manager.go | 119 ++++++++++- cache/manager_test.go | 133 ++++++++++++ cache/metadata.go | 32 ++- cache/refs.go | 20 +- client/diskusage.go | 2 +- client/prune.go | 50 +++++ cmd/buildctl/diskusage.go | 20 +- cmd/buildctl/main.go | 1 + cmd/buildctl/prune.go | 68 ++++++ control/control.go | 47 +++++ worker/base/worker.go | 4 + worker/runc/runc.go | 48 ++++- worker/worker.go | 1 + 16 files changed, 758 insertions(+), 121 deletions(-) create mode 100644 client/prune.go create mode 100644 cmd/buildctl/prune.go diff --git a/api/services/control/control.pb.go b/api/services/control/control.pb.go index 2c1dc381..77475159 100644 --- a/api/services/control/control.pb.go +++ b/api/services/control/control.pb.go @@ -1,5 +1,6 @@ -// Code generated by protoc-gen-gogo. DO NOT EDIT. +// Code generated by protoc-gen-gogo. // source: control.proto +// DO NOT EDIT! /* Package moby_buildkit_v1 is a generated protocol buffer package. @@ -8,6 +9,7 @@ control.proto It has these top-level messages: + PruneRequest DiskUsageRequest DiskUsageResponse UsageRecord @@ -57,6 +59,14 @@ var _ = time.Kitchen // proto package needs to be updated. const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package +type PruneRequest struct { +} + +func (m *PruneRequest) Reset() { *m = PruneRequest{} } +func (m *PruneRequest) String() string { return proto.CompactTextString(m) } +func (*PruneRequest) ProtoMessage() {} +func (*PruneRequest) Descriptor() ([]byte, []int) { return fileDescriptorControl, []int{0} } + type DiskUsageRequest struct { Filter string `protobuf:"bytes,1,opt,name=filter,proto3" json:"filter,omitempty"` } @@ -64,7 +74,7 @@ type DiskUsageRequest struct { func (m *DiskUsageRequest) Reset() { *m = DiskUsageRequest{} } func (m *DiskUsageRequest) String() string { return proto.CompactTextString(m) } func (*DiskUsageRequest) ProtoMessage() {} -func (*DiskUsageRequest) Descriptor() ([]byte, []int) { return fileDescriptorControl, []int{0} } +func (*DiskUsageRequest) Descriptor() ([]byte, []int) { return fileDescriptorControl, []int{1} } func (m *DiskUsageRequest) GetFilter() string { if m != nil { @@ -80,7 +90,7 @@ type DiskUsageResponse struct { func (m *DiskUsageResponse) Reset() { *m = DiskUsageResponse{} } func (m *DiskUsageResponse) String() string { return proto.CompactTextString(m) } func (*DiskUsageResponse) ProtoMessage() {} -func (*DiskUsageResponse) Descriptor() ([]byte, []int) { return fileDescriptorControl, []int{1} } +func (*DiskUsageResponse) Descriptor() ([]byte, []int) { return fileDescriptorControl, []int{2} } func (m *DiskUsageResponse) GetRecord() []*UsageRecord { if m != nil { @@ -104,7 +114,7 @@ type UsageRecord struct { func (m *UsageRecord) Reset() { *m = UsageRecord{} } func (m *UsageRecord) String() string { return proto.CompactTextString(m) } func (*UsageRecord) ProtoMessage() {} -func (*UsageRecord) Descriptor() ([]byte, []int) { return fileDescriptorControl, []int{2} } +func (*UsageRecord) Descriptor() ([]byte, []int) { return fileDescriptorControl, []int{3} } func (m *UsageRecord) GetID() string { if m != nil { @@ -183,7 +193,7 @@ type SolveRequest struct { func (m *SolveRequest) Reset() { *m = SolveRequest{} } func (m *SolveRequest) String() string { return proto.CompactTextString(m) } func (*SolveRequest) ProtoMessage() {} -func (*SolveRequest) Descriptor() ([]byte, []int) { return fileDescriptorControl, []int{3} } +func (*SolveRequest) Descriptor() ([]byte, []int) { return fileDescriptorControl, []int{4} } func (m *SolveRequest) GetRef() string { if m != nil { @@ -249,7 +259,7 @@ type CacheOptions struct { func (m *CacheOptions) Reset() { *m = CacheOptions{} } func (m *CacheOptions) String() string { return proto.CompactTextString(m) } func (*CacheOptions) ProtoMessage() {} -func (*CacheOptions) Descriptor() ([]byte, []int) { return fileDescriptorControl, []int{4} } +func (*CacheOptions) Descriptor() ([]byte, []int) { return fileDescriptorControl, []int{5} } func (m *CacheOptions) GetExportRef() string { if m != nil { @@ -272,7 +282,7 @@ type SolveResponse struct { func (m *SolveResponse) Reset() { *m = SolveResponse{} } func (m *SolveResponse) String() string { return proto.CompactTextString(m) } func (*SolveResponse) ProtoMessage() {} -func (*SolveResponse) Descriptor() ([]byte, []int) { return fileDescriptorControl, []int{5} } +func (*SolveResponse) Descriptor() ([]byte, []int) { return fileDescriptorControl, []int{6} } func (m *SolveResponse) GetVtx() []*Vertex { if m != nil { @@ -288,7 +298,7 @@ type StatusRequest struct { func (m *StatusRequest) Reset() { *m = StatusRequest{} } func (m *StatusRequest) String() string { return proto.CompactTextString(m) } func (*StatusRequest) ProtoMessage() {} -func (*StatusRequest) Descriptor() ([]byte, []int) { return fileDescriptorControl, []int{6} } +func (*StatusRequest) Descriptor() ([]byte, []int) { return fileDescriptorControl, []int{7} } func (m *StatusRequest) GetRef() string { if m != nil { @@ -306,7 +316,7 @@ type StatusResponse struct { func (m *StatusResponse) Reset() { *m = StatusResponse{} } func (m *StatusResponse) String() string { return proto.CompactTextString(m) } func (*StatusResponse) ProtoMessage() {} -func (*StatusResponse) Descriptor() ([]byte, []int) { return fileDescriptorControl, []int{7} } +func (*StatusResponse) Descriptor() ([]byte, []int) { return fileDescriptorControl, []int{8} } func (m *StatusResponse) GetVertexes() []*Vertex { if m != nil { @@ -342,7 +352,7 @@ type Vertex struct { func (m *Vertex) Reset() { *m = Vertex{} } func (m *Vertex) String() string { return proto.CompactTextString(m) } func (*Vertex) ProtoMessage() {} -func (*Vertex) Descriptor() ([]byte, []int) { return fileDescriptorControl, []int{8} } +func (*Vertex) Descriptor() ([]byte, []int) { return fileDescriptorControl, []int{9} } func (m *Vertex) GetName() string { if m != nil { @@ -394,7 +404,7 @@ type VertexStatus struct { func (m *VertexStatus) Reset() { *m = VertexStatus{} } func (m *VertexStatus) String() string { return proto.CompactTextString(m) } func (*VertexStatus) ProtoMessage() {} -func (*VertexStatus) Descriptor() ([]byte, []int) { return fileDescriptorControl, []int{9} } +func (*VertexStatus) Descriptor() ([]byte, []int) { return fileDescriptorControl, []int{10} } func (m *VertexStatus) GetID() string { if m != nil { @@ -455,7 +465,7 @@ type VertexLog struct { func (m *VertexLog) Reset() { *m = VertexLog{} } func (m *VertexLog) String() string { return proto.CompactTextString(m) } func (*VertexLog) ProtoMessage() {} -func (*VertexLog) Descriptor() ([]byte, []int) { return fileDescriptorControl, []int{10} } +func (*VertexLog) Descriptor() ([]byte, []int) { return fileDescriptorControl, []int{11} } func (m *VertexLog) GetTimestamp() time.Time { if m != nil { @@ -485,7 +495,7 @@ type BytesMessage struct { func (m *BytesMessage) Reset() { *m = BytesMessage{} } func (m *BytesMessage) String() string { return proto.CompactTextString(m) } func (*BytesMessage) ProtoMessage() {} -func (*BytesMessage) Descriptor() ([]byte, []int) { return fileDescriptorControl, []int{11} } +func (*BytesMessage) Descriptor() ([]byte, []int) { return fileDescriptorControl, []int{12} } func (m *BytesMessage) GetData() []byte { if m != nil { @@ -501,7 +511,7 @@ type ListWorkersRequest struct { func (m *ListWorkersRequest) Reset() { *m = ListWorkersRequest{} } func (m *ListWorkersRequest) String() string { return proto.CompactTextString(m) } func (*ListWorkersRequest) ProtoMessage() {} -func (*ListWorkersRequest) Descriptor() ([]byte, []int) { return fileDescriptorControl, []int{12} } +func (*ListWorkersRequest) Descriptor() ([]byte, []int) { return fileDescriptorControl, []int{13} } func (m *ListWorkersRequest) GetFilter() []string { if m != nil { @@ -517,7 +527,7 @@ type ListWorkersResponse struct { func (m *ListWorkersResponse) Reset() { *m = ListWorkersResponse{} } func (m *ListWorkersResponse) String() string { return proto.CompactTextString(m) } func (*ListWorkersResponse) ProtoMessage() {} -func (*ListWorkersResponse) Descriptor() ([]byte, []int) { return fileDescriptorControl, []int{13} } +func (*ListWorkersResponse) Descriptor() ([]byte, []int) { return fileDescriptorControl, []int{14} } func (m *ListWorkersResponse) GetRecord() []*WorkerRecord { if m != nil { @@ -534,7 +544,7 @@ type WorkerRecord struct { func (m *WorkerRecord) Reset() { *m = WorkerRecord{} } func (m *WorkerRecord) String() string { return proto.CompactTextString(m) } func (*WorkerRecord) ProtoMessage() {} -func (*WorkerRecord) Descriptor() ([]byte, []int) { return fileDescriptorControl, []int{14} } +func (*WorkerRecord) Descriptor() ([]byte, []int) { return fileDescriptorControl, []int{15} } func (m *WorkerRecord) GetID() string { if m != nil { @@ -551,6 +561,7 @@ func (m *WorkerRecord) GetLabels() map[string]string { } func init() { + proto.RegisterType((*PruneRequest)(nil), "moby.buildkit.v1.PruneRequest") proto.RegisterType((*DiskUsageRequest)(nil), "moby.buildkit.v1.DiskUsageRequest") proto.RegisterType((*DiskUsageResponse)(nil), "moby.buildkit.v1.DiskUsageResponse") proto.RegisterType((*UsageRecord)(nil), "moby.buildkit.v1.UsageRecord") @@ -580,6 +591,7 @@ const _ = grpc.SupportPackageIsVersion4 type ControlClient interface { DiskUsage(ctx context.Context, in *DiskUsageRequest, opts ...grpc.CallOption) (*DiskUsageResponse, error) + Prune(ctx context.Context, in *PruneRequest, opts ...grpc.CallOption) (Control_PruneClient, error) Solve(ctx context.Context, in *SolveRequest, opts ...grpc.CallOption) (*SolveResponse, error) Status(ctx context.Context, in *StatusRequest, opts ...grpc.CallOption) (Control_StatusClient, error) Session(ctx context.Context, opts ...grpc.CallOption) (Control_SessionClient, error) @@ -603,6 +615,38 @@ func (c *controlClient) DiskUsage(ctx context.Context, in *DiskUsageRequest, opt return out, nil } +func (c *controlClient) Prune(ctx context.Context, in *PruneRequest, opts ...grpc.CallOption) (Control_PruneClient, error) { + stream, err := grpc.NewClientStream(ctx, &_Control_serviceDesc.Streams[0], c.cc, "/moby.buildkit.v1.Control/Prune", opts...) + if err != nil { + return nil, err + } + x := &controlPruneClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Control_PruneClient interface { + Recv() (*UsageRecord, error) + grpc.ClientStream +} + +type controlPruneClient struct { + grpc.ClientStream +} + +func (x *controlPruneClient) Recv() (*UsageRecord, error) { + m := new(UsageRecord) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + func (c *controlClient) Solve(ctx context.Context, in *SolveRequest, opts ...grpc.CallOption) (*SolveResponse, error) { out := new(SolveResponse) err := grpc.Invoke(ctx, "/moby.buildkit.v1.Control/Solve", in, out, c.cc, opts...) @@ -613,7 +657,7 @@ func (c *controlClient) Solve(ctx context.Context, in *SolveRequest, opts ...grp } func (c *controlClient) Status(ctx context.Context, in *StatusRequest, opts ...grpc.CallOption) (Control_StatusClient, error) { - stream, err := grpc.NewClientStream(ctx, &_Control_serviceDesc.Streams[0], c.cc, "/moby.buildkit.v1.Control/Status", opts...) + stream, err := grpc.NewClientStream(ctx, &_Control_serviceDesc.Streams[1], c.cc, "/moby.buildkit.v1.Control/Status", opts...) if err != nil { return nil, err } @@ -645,7 +689,7 @@ func (x *controlStatusClient) Recv() (*StatusResponse, error) { } func (c *controlClient) Session(ctx context.Context, opts ...grpc.CallOption) (Control_SessionClient, error) { - stream, err := grpc.NewClientStream(ctx, &_Control_serviceDesc.Streams[1], c.cc, "/moby.buildkit.v1.Control/Session", opts...) + stream, err := grpc.NewClientStream(ctx, &_Control_serviceDesc.Streams[2], c.cc, "/moby.buildkit.v1.Control/Session", opts...) if err != nil { return nil, err } @@ -688,6 +732,7 @@ func (c *controlClient) ListWorkers(ctx context.Context, in *ListWorkersRequest, type ControlServer interface { DiskUsage(context.Context, *DiskUsageRequest) (*DiskUsageResponse, error) + Prune(*PruneRequest, Control_PruneServer) error Solve(context.Context, *SolveRequest) (*SolveResponse, error) Status(*StatusRequest, Control_StatusServer) error Session(Control_SessionServer) error @@ -716,6 +761,27 @@ func _Control_DiskUsage_Handler(srv interface{}, ctx context.Context, dec func(i return interceptor(ctx, in, info, handler) } +func _Control_Prune_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(PruneRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(ControlServer).Prune(m, &controlPruneServer{stream}) +} + +type Control_PruneServer interface { + Send(*UsageRecord) error + grpc.ServerStream +} + +type controlPruneServer struct { + grpc.ServerStream +} + +func (x *controlPruneServer) Send(m *UsageRecord) error { + return x.ServerStream.SendMsg(m) +} + func _Control_Solve_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(SolveRequest) if err := dec(in); err != nil { @@ -817,6 +883,11 @@ var _Control_serviceDesc = grpc.ServiceDesc{ }, }, Streams: []grpc.StreamDesc{ + { + StreamName: "Prune", + Handler: _Control_Prune_Handler, + ServerStreams: true, + }, { StreamName: "Status", Handler: _Control_Status_Handler, @@ -832,6 +903,24 @@ var _Control_serviceDesc = grpc.ServiceDesc{ Metadata: "control.proto", } +func (m *PruneRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *PruneRequest) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + return i, nil +} + func (m *DiskUsageRequest) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -1555,6 +1644,12 @@ func encodeVarintControl(dAtA []byte, offset int, v uint64) int { dAtA[offset] = uint8(v) return offset + 1 } +func (m *PruneRequest) Size() (n int) { + var l int + _ = l + return n +} + func (m *DiskUsageRequest) Size() (n int) { var l int _ = l @@ -1870,6 +1965,56 @@ func sovControl(x uint64) (n int) { func sozControl(x uint64) (n int) { return sovControl(uint64((x << 1) ^ uint64((int64(x) >> 63)))) } +func (m *PruneRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowControl + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: PruneRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: PruneRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipControl(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthControl + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *DiskUsageRequest) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -4406,75 +4551,77 @@ var ( func init() { proto.RegisterFile("control.proto", fileDescriptorControl) } var fileDescriptorControl = []byte{ - // 1120 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x56, 0xcd, 0x6e, 0x23, 0x45, - 0x10, 0x66, 0x3c, 0xce, 0xd8, 0x2e, 0x3b, 0xab, 0xd0, 0xa0, 0xd5, 0x68, 0x00, 0xc7, 0x0c, 0x20, - 0x59, 0xd1, 0xee, 0x78, 0x37, 0xfc, 0x08, 0x82, 0x84, 0x76, 0x1d, 0x2f, 0x22, 0x51, 0x22, 0x50, - 0x67, 0xc3, 0x4a, 0xdc, 0xc6, 0x76, 0x67, 0x76, 0x94, 0xf1, 0xf4, 0xd0, 0xdd, 0x8e, 0x12, 0x9e, - 0x82, 0x03, 0x57, 0x4e, 0x3c, 0x02, 0x4f, 0xc0, 0x01, 0x69, 0x8f, 0x9c, 0x39, 0x04, 0x94, 0x07, - 0xe0, 0x19, 0x50, 0xff, 0x8c, 0x3d, 0xce, 0xe4, 0x7f, 0x4f, 0xd3, 0x55, 0x5d, 0xf5, 0x75, 0x75, - 0x7d, 0x35, 0x55, 0x0d, 0xcb, 0x23, 0x9a, 0x0a, 0x46, 0x93, 0x20, 0x63, 0x54, 0x50, 0xb4, 0x32, - 0xa1, 0xc3, 0x93, 0x60, 0x38, 0x8d, 0x93, 0xf1, 0x61, 0x2c, 0x82, 0xa3, 0xc7, 0xde, 0xc3, 0x28, - 0x16, 0x2f, 0xa7, 0xc3, 0x60, 0x44, 0x27, 0xbd, 0x88, 0x46, 0xb4, 0xa7, 0x0c, 0x87, 0xd3, 0x03, - 0x25, 0x29, 0x41, 0xad, 0x34, 0x80, 0xb7, 0x1a, 0x51, 0x1a, 0x25, 0x64, 0x6e, 0x25, 0xe2, 0x09, - 0xe1, 0x22, 0x9c, 0x64, 0xc6, 0xe0, 0x41, 0x01, 0x4f, 0x1e, 0xd6, 0xcb, 0x0f, 0xeb, 0x71, 0x9a, - 0x1c, 0x11, 0xd6, 0xcb, 0x86, 0x3d, 0x9a, 0x71, 0x6d, 0xed, 0xaf, 0xc1, 0xca, 0x20, 0xe6, 0x87, - 0xfb, 0x3c, 0x8c, 0x08, 0x26, 0x3f, 0x4e, 0x09, 0x17, 0xe8, 0x3e, 0x38, 0x07, 0x71, 0x22, 0x08, - 0x73, 0xad, 0x8e, 0xd5, 0x6d, 0x60, 0x23, 0xf9, 0xdb, 0xf0, 0x66, 0xc1, 0x96, 0x67, 0x34, 0xe5, - 0x04, 0x7d, 0x0a, 0x0e, 0x23, 0x23, 0xca, 0xc6, 0xae, 0xd5, 0xb1, 0xbb, 0xcd, 0xf5, 0xf7, 0x82, - 0xf3, 0x37, 0x0c, 0x8c, 0x83, 0x34, 0xc2, 0xc6, 0xd8, 0xff, 0xa3, 0x02, 0xcd, 0x82, 0x1e, 0xdd, - 0x83, 0xca, 0xd6, 0xc0, 0x9c, 0x57, 0xd9, 0x1a, 0x20, 0x17, 0x6a, 0xbb, 0x53, 0x11, 0x0e, 0x13, - 0xe2, 0x56, 0x3a, 0x56, 0xb7, 0x8e, 0x73, 0x11, 0xbd, 0x0d, 0x4b, 0x5b, 0xe9, 0x3e, 0x27, 0xae, - 0xad, 0xf4, 0x5a, 0x40, 0x08, 0xaa, 0x7b, 0xf1, 0x4f, 0xc4, 0xad, 0x76, 0xac, 0xae, 0x8d, 0xd5, - 0x5a, 0xde, 0xe3, 0xbb, 0x90, 0x91, 0x54, 0xb8, 0x4b, 0xfa, 0x1e, 0x5a, 0x42, 0x7d, 0x68, 0x6c, - 0x32, 0x12, 0x0a, 0x32, 0x7e, 0x2a, 0x5c, 0xa7, 0x63, 0x75, 0x9b, 0xeb, 0x5e, 0xa0, 0xd3, 0x1a, - 0xe4, 0x69, 0x0d, 0x9e, 0xe7, 0x69, 0xed, 0xd7, 0x5f, 0x9d, 0xae, 0xbe, 0xf1, 0xf3, 0x3f, 0xab, - 0x16, 0x9e, 0xbb, 0xa1, 0x27, 0x00, 0x3b, 0x21, 0x17, 0xfb, 0x5c, 0x81, 0xd4, 0xae, 0x05, 0xa9, - 0x2a, 0x80, 0x82, 0x0f, 0x6a, 0x03, 0xa8, 0x04, 0x6c, 0xd2, 0x69, 0x2a, 0xdc, 0xba, 0x8a, 0xbb, - 0xa0, 0x41, 0x1d, 0x68, 0x0e, 0x08, 0x1f, 0xb1, 0x38, 0x13, 0x31, 0x4d, 0xdd, 0x86, 0xba, 0x42, - 0x51, 0xe5, 0xff, 0x52, 0x85, 0xd6, 0x9e, 0xe4, 0x34, 0x27, 0x6e, 0x05, 0x6c, 0x4c, 0x0e, 0x4c, - 0x16, 0xe5, 0x12, 0x05, 0x00, 0x03, 0x72, 0x10, 0xa7, 0xb1, 0xc2, 0xa8, 0xa8, 0x30, 0xef, 0x05, - 0xd9, 0x30, 0x98, 0x6b, 0x71, 0xc1, 0x02, 0x79, 0x50, 0x7f, 0x76, 0x9c, 0x51, 0x26, 0xc9, 0xb7, - 0x15, 0xcc, 0x4c, 0x46, 0x2f, 0x60, 0x39, 0x5f, 0x3f, 0x15, 0x82, 0x71, 0xb7, 0xaa, 0x08, 0x7f, - 0x5c, 0x26, 0xbc, 0x18, 0x54, 0xb0, 0xe0, 0xf3, 0x2c, 0x15, 0xec, 0x04, 0x2f, 0xe2, 0x48, 0xae, - 0xf7, 0x08, 0xe7, 0x32, 0x42, 0x4d, 0x54, 0x2e, 0xca, 0x70, 0xbe, 0x66, 0x34, 0x15, 0x24, 0x1d, - 0x2b, 0xa2, 0x1a, 0x78, 0x26, 0xcb, 0x70, 0xf2, 0xb5, 0x0e, 0xa7, 0x76, 0xa3, 0x70, 0x16, 0x7c, - 0x4c, 0x38, 0x0b, 0x3a, 0xb4, 0x01, 0x4b, 0x9b, 0xe1, 0xe8, 0x25, 0x51, 0x9c, 0x34, 0xd7, 0xdb, - 0x65, 0x40, 0xb5, 0xfd, 0xad, 0x22, 0x81, 0xf7, 0xab, 0xb2, 0x3c, 0xb0, 0x76, 0xf1, 0x9e, 0x00, - 0x2a, 0xdf, 0x57, 0xf2, 0x72, 0x48, 0x4e, 0x72, 0x5e, 0x0e, 0xc9, 0x89, 0x2c, 0xe2, 0xa3, 0x30, - 0x99, 0xea, 0xe2, 0x6e, 0x60, 0x2d, 0x6c, 0x54, 0x3e, 0xb7, 0x24, 0x42, 0x39, 0xc4, 0xdb, 0x20, - 0xf8, 0xdb, 0xd0, 0x2a, 0x06, 0x88, 0xde, 0x85, 0x86, 0x8e, 0x69, 0x5e, 0x1b, 0x73, 0x85, 0xdc, - 0xdd, 0x9a, 0xe4, 0xbb, 0x1a, 0x6b, 0xae, 0xf0, 0xbf, 0x84, 0x65, 0x93, 0x3d, 0xf3, 0xbb, 0xaf, - 0x81, 0x7d, 0x24, 0x8e, 0xcd, 0xbf, 0xee, 0x96, 0x53, 0xf3, 0x3d, 0x61, 0x82, 0x1c, 0x63, 0x69, - 0xe4, 0xbf, 0x0f, 0xcb, 0x7b, 0x22, 0x14, 0x53, 0x7e, 0x69, 0x7d, 0xfa, 0xbf, 0x5b, 0x70, 0x2f, - 0xb7, 0x31, 0x27, 0x7c, 0x02, 0xf5, 0x23, 0x05, 0x42, 0xf8, 0xb5, 0xc7, 0xcc, 0x2c, 0xd1, 0x06, - 0xd4, 0xb9, 0xc2, 0x21, 0xdc, 0xad, 0x28, 0xaf, 0xf6, 0x65, 0x5e, 0xe6, 0xbc, 0x99, 0x3d, 0xea, - 0x41, 0x35, 0xa1, 0x11, 0x77, 0x6d, 0xe5, 0xf7, 0xce, 0x65, 0x7e, 0x3b, 0x34, 0xc2, 0xca, 0xd0, - 0x3f, 0xad, 0x80, 0xa3, 0x75, 0x68, 0x1b, 0x9c, 0x71, 0x1c, 0x11, 0x2e, 0xf4, 0xad, 0xfa, 0xeb, - 0xb2, 0x1a, 0xfe, 0x3e, 0x5d, 0x5d, 0x2b, 0x74, 0x61, 0x9a, 0x91, 0x54, 0x4e, 0x81, 0x30, 0x4e, - 0x09, 0xe3, 0xbd, 0x88, 0x3e, 0xd4, 0x2e, 0xc1, 0x40, 0x7d, 0xb0, 0x41, 0x90, 0x58, 0x71, 0x9a, - 0x4d, 0x85, 0xbe, 0xc1, 0x1d, 0xb1, 0x34, 0x82, 0xec, 0x87, 0x69, 0x38, 0x21, 0xe6, 0x27, 0x56, - 0x6b, 0xd9, 0x0f, 0x47, 0xb2, 0x30, 0xc6, 0xaa, 0x4b, 0xd6, 0xb1, 0x91, 0xd0, 0x06, 0xd4, 0xb8, - 0x08, 0x99, 0x20, 0x63, 0xf5, 0xff, 0xdd, 0xa4, 0x91, 0xe5, 0x0e, 0xe8, 0x2b, 0x68, 0x8c, 0xe8, - 0x24, 0x4b, 0x88, 0xf4, 0x76, 0x6e, 0xe8, 0x3d, 0x77, 0x91, 0x65, 0x4c, 0x18, 0xa3, 0x4c, 0xb5, - 0xd0, 0x06, 0xd6, 0x82, 0xff, 0x5f, 0x05, 0x5a, 0x45, 0xb2, 0x4a, 0xe3, 0x61, 0x1b, 0x1c, 0x4d, - 0xbd, 0x2e, 0xd9, 0xbb, 0xa5, 0x4a, 0x23, 0x5c, 0x98, 0x2a, 0x17, 0x6a, 0xa3, 0x29, 0x53, 0xb3, - 0x43, 0x4f, 0x94, 0x5c, 0x94, 0x01, 0x0b, 0x2a, 0xc2, 0x44, 0xa5, 0xca, 0xc6, 0x5a, 0x90, 0x23, - 0x65, 0x36, 0x87, 0x6f, 0x37, 0x52, 0x66, 0x6e, 0x45, 0x1a, 0x6a, 0xaf, 0x45, 0x43, 0xfd, 0xd6, - 0x34, 0xf8, 0x7f, 0x5a, 0xd0, 0x98, 0x55, 0x79, 0x21, 0xbb, 0xd6, 0x6b, 0x67, 0x77, 0x21, 0x33, - 0x95, 0xbb, 0x65, 0xe6, 0x3e, 0x38, 0x5c, 0x30, 0x12, 0x4e, 0x14, 0x47, 0x36, 0x36, 0x92, 0xec, - 0x27, 0x13, 0x1e, 0x29, 0x86, 0x5a, 0x58, 0x2e, 0x7d, 0x1f, 0x5a, 0xfd, 0x13, 0x41, 0xf8, 0x2e, - 0xe1, 0x72, 0x92, 0x4a, 0x6e, 0xc7, 0xa1, 0x08, 0xd5, 0x3d, 0x5a, 0x58, 0xad, 0xfd, 0x07, 0x80, - 0x76, 0x62, 0x2e, 0x5e, 0x50, 0x76, 0x48, 0x18, 0xbf, 0xe8, 0xd1, 0x63, 0x17, 0x1e, 0x3d, 0xbb, - 0xf0, 0xd6, 0x82, 0xb5, 0xe9, 0x52, 0x9f, 0x9d, 0x7b, 0xf6, 0x5c, 0xd0, 0x6d, 0xb4, 0xcb, 0xb9, - 0x77, 0xcf, 0xaf, 0x16, 0xb4, 0x8a, 0x1b, 0xa5, 0xca, 0xee, 0x83, 0xb3, 0x13, 0x0e, 0x49, 0x92, - 0xb7, 0xb1, 0xb5, 0xab, 0x81, 0x03, 0x6d, 0xac, 0x07, 0x99, 0xf1, 0xf4, 0xbe, 0x80, 0x66, 0x41, - 0x7d, 0x9b, 0xe1, 0xb1, 0xfe, 0x9b, 0x0d, 0xb5, 0x4d, 0xfd, 0x62, 0x45, 0xcf, 0xa1, 0x31, 0x7b, - 0xef, 0x21, 0xbf, 0x1c, 0xc7, 0xf9, 0x87, 0xa3, 0xf7, 0xc1, 0x95, 0x36, 0x26, 0x73, 0xdf, 0xc0, - 0x92, 0x1a, 0x29, 0xa8, 0x7d, 0xf5, 0xa4, 0xf6, 0x56, 0x2f, 0xdd, 0x37, 0x48, 0xbb, 0xe0, 0x98, - 0xf6, 0x70, 0x91, 0x69, 0x71, 0xf2, 0x78, 0x9d, 0xcb, 0x0d, 0x34, 0xd8, 0x23, 0x0b, 0xed, 0xce, - 0x9e, 0x21, 0x17, 0x85, 0x56, 0x2c, 0x2b, 0xef, 0x9a, 0xfd, 0xae, 0xf5, 0xc8, 0x42, 0x3f, 0x40, - 0xb3, 0x50, 0x38, 0xe8, 0xc3, 0xb2, 0x4b, 0xb9, 0x0a, 0xbd, 0x8f, 0xae, 0xb1, 0xd2, 0xc1, 0xf6, - 0x5b, 0xaf, 0xce, 0xda, 0xd6, 0x5f, 0x67, 0x6d, 0xeb, 0xdf, 0xb3, 0xb6, 0x35, 0x74, 0xd4, 0x7f, - 0xf4, 0xf1, 0xff, 0x01, 0x00, 0x00, 0xff, 0xff, 0x54, 0x2e, 0xae, 0xbd, 0x6b, 0x0c, 0x00, 0x00, + // 1144 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xa4, 0x56, 0xcd, 0x6e, 0x23, 0xc5, + 0x13, 0xff, 0x8f, 0xed, 0x8c, 0xed, 0xb2, 0x13, 0xe5, 0xdf, 0xa0, 0xd5, 0x68, 0x00, 0xc7, 0x0c, + 0x20, 0x59, 0xd1, 0xee, 0x38, 0x1b, 0x3e, 0x04, 0x41, 0x42, 0xbb, 0x8e, 0x17, 0x91, 0x28, 0x11, + 0xab, 0xc9, 0x86, 0x95, 0xb8, 0x8d, 0xed, 0xce, 0xec, 0x28, 0xe3, 0xe9, 0xa1, 0xbb, 0x27, 0x4a, + 0x78, 0x0a, 0x0e, 0x5c, 0x79, 0x0a, 0x9e, 0x80, 0x03, 0xd2, 0x1e, 0x39, 0x73, 0x08, 0x28, 0x77, + 0x78, 0x06, 0xd4, 0x1f, 0x63, 0xb7, 0x63, 0xe7, 0x73, 0x4f, 0xd3, 0xd5, 0x5d, 0xf5, 0xeb, 0xea, + 0xfa, 0xd5, 0x54, 0x15, 0x2c, 0x0f, 0x49, 0xca, 0x29, 0x49, 0xfc, 0x8c, 0x12, 0x4e, 0xd0, 0xea, + 0x98, 0x0c, 0xce, 0xfc, 0x41, 0x1e, 0x27, 0xa3, 0xe3, 0x98, 0xfb, 0x27, 0x8f, 0xdd, 0x47, 0x51, + 0xcc, 0x5f, 0xe5, 0x03, 0x7f, 0x48, 0xc6, 0xdd, 0x88, 0x44, 0xa4, 0x2b, 0x15, 0x07, 0xf9, 0x91, + 0x94, 0xa4, 0x20, 0x57, 0x0a, 0xc0, 0x5d, 0x8b, 0x08, 0x89, 0x12, 0x3c, 0xd5, 0xe2, 0xf1, 0x18, + 0x33, 0x1e, 0x8e, 0x33, 0xad, 0xf0, 0xd0, 0xc0, 0x13, 0x97, 0x75, 0x8b, 0xcb, 0xba, 0x8c, 0x24, + 0x27, 0x98, 0x76, 0xb3, 0x41, 0x97, 0x64, 0x4c, 0x69, 0x7b, 0x2b, 0xd0, 0x7c, 0x4e, 0xf3, 0x14, + 0x07, 0xf8, 0x87, 0x1c, 0x33, 0xee, 0xad, 0xc3, 0x6a, 0x3f, 0x66, 0xc7, 0x87, 0x2c, 0x8c, 0x8a, + 0x3d, 0xf4, 0x00, 0xec, 0xa3, 0x38, 0xe1, 0x98, 0x3a, 0x56, 0xdb, 0xea, 0xd4, 0x03, 0x2d, 0x79, + 0xbb, 0xf0, 0x7f, 0x43, 0x97, 0x65, 0x24, 0x65, 0x18, 0x7d, 0x0a, 0x36, 0xc5, 0x43, 0x42, 0x47, + 0x8e, 0xd5, 0x2e, 0x77, 0x1a, 0x9b, 0xef, 0xf9, 0x97, 0x5f, 0xec, 0x6b, 0x03, 0xa1, 0x14, 0x68, + 0x65, 0xef, 0xb7, 0x12, 0x34, 0x8c, 0x7d, 0xb4, 0x02, 0xa5, 0x9d, 0xbe, 0xbe, 0xaf, 0xb4, 0xd3, + 0x47, 0x0e, 0x54, 0xf7, 0x73, 0x1e, 0x0e, 0x12, 0xec, 0x94, 0xda, 0x56, 0xa7, 0x16, 0x14, 0x22, + 0x7a, 0x1b, 0x96, 0x76, 0xd2, 0x43, 0x86, 0x9d, 0xb2, 0xdc, 0x57, 0x02, 0x42, 0x50, 0x39, 0x88, + 0x7f, 0xc4, 0x4e, 0xa5, 0x6d, 0x75, 0xca, 0x81, 0x5c, 0x8b, 0x77, 0x3c, 0x0f, 0x29, 0x4e, 0xb9, + 0xb3, 0xa4, 0xde, 0xa1, 0x24, 0xd4, 0x83, 0xfa, 0x36, 0xc5, 0x21, 0xc7, 0xa3, 0xa7, 0xdc, 0xb1, + 0xdb, 0x56, 0xa7, 0xb1, 0xe9, 0xfa, 0x2a, 0xcc, 0x7e, 0x11, 0x66, 0xff, 0x45, 0x11, 0xe6, 0x5e, + 0xed, 0xf5, 0xf9, 0xda, 0xff, 0x7e, 0xfa, 0x6b, 0xcd, 0x0a, 0xa6, 0x66, 0xe8, 0x09, 0xc0, 0x5e, + 0xc8, 0xf8, 0x21, 0x93, 0x20, 0xd5, 0x1b, 0x41, 0x2a, 0x12, 0xc0, 0xb0, 0x41, 0x2d, 0x00, 0x19, + 0x80, 0x6d, 0x92, 0xa7, 0xdc, 0xa9, 0x49, 0xbf, 0x8d, 0x1d, 0xd4, 0x86, 0x46, 0x1f, 0xb3, 0x21, + 0x8d, 0x33, 0x1e, 0x93, 0xd4, 0xa9, 0xcb, 0x27, 0x98, 0x5b, 0xde, 0xcf, 0x15, 0x68, 0x1e, 0x08, + 0x8e, 0x0b, 0xe2, 0x56, 0xa1, 0x1c, 0xe0, 0x23, 0x1d, 0x45, 0xb1, 0x44, 0x3e, 0x40, 0x1f, 0x1f, + 0xc5, 0x69, 0x2c, 0x31, 0x4a, 0xd2, 0xcd, 0x15, 0x3f, 0x1b, 0xf8, 0xd3, 0xdd, 0xc0, 0xd0, 0x40, + 0x2e, 0xd4, 0x9e, 0x9d, 0x66, 0x84, 0x0a, 0xf2, 0xcb, 0x12, 0x66, 0x22, 0xa3, 0x97, 0xb0, 0x5c, + 0xac, 0x9f, 0x72, 0x4e, 0x99, 0x53, 0x91, 0x84, 0x3f, 0x9e, 0x27, 0xdc, 0x74, 0xca, 0x9f, 0xb1, + 0x79, 0x96, 0x72, 0x7a, 0x16, 0xcc, 0xe2, 0x08, 0xae, 0x0f, 0x30, 0x63, 0xc2, 0x43, 0x45, 0x54, + 0x21, 0x0a, 0x77, 0xbe, 0xa6, 0x24, 0xe5, 0x38, 0x1d, 0x49, 0xa2, 0xea, 0xc1, 0x44, 0x16, 0xee, + 0x14, 0x6b, 0xe5, 0x4e, 0xf5, 0x56, 0xee, 0xcc, 0xd8, 0x68, 0x77, 0x66, 0xf6, 0xd0, 0x16, 0x2c, + 0x6d, 0x87, 0xc3, 0x57, 0x58, 0x72, 0xd2, 0xd8, 0x6c, 0xcd, 0x03, 0xca, 0xe3, 0x6f, 0x25, 0x09, + 0xac, 0x57, 0x11, 0xe9, 0x11, 0x28, 0x13, 0xf7, 0x09, 0xa0, 0xf9, 0xf7, 0x0a, 0x5e, 0x8e, 0xf1, + 0x59, 0xc1, 0xcb, 0x31, 0x3e, 0x13, 0x49, 0x7c, 0x12, 0x26, 0xb9, 0x4a, 0xee, 0x7a, 0xa0, 0x84, + 0xad, 0xd2, 0xe7, 0x96, 0x40, 0x98, 0x77, 0xf1, 0x2e, 0x08, 0xde, 0x2e, 0x34, 0x4d, 0x07, 0xd1, + 0xbb, 0x50, 0x57, 0x3e, 0x4d, 0x73, 0x63, 0xba, 0x21, 0x4e, 0x77, 0xc6, 0xc5, 0xa9, 0xc2, 0x9a, + 0x6e, 0x78, 0x5f, 0xc2, 0xb2, 0x8e, 0x9e, 0xfe, 0xdd, 0xd7, 0xa1, 0x7c, 0xc2, 0x4f, 0xf5, 0xbf, + 0xee, 0xcc, 0x87, 0xe6, 0x3b, 0x4c, 0x39, 0x3e, 0x0d, 0x84, 0x92, 0xf7, 0x3e, 0x2c, 0x1f, 0xf0, + 0x90, 0xe7, 0xec, 0xca, 0xfc, 0xf4, 0x7e, 0xb5, 0x60, 0xa5, 0xd0, 0xd1, 0x37, 0x7c, 0x02, 0xb5, + 0x13, 0x09, 0x82, 0xd9, 0x8d, 0xd7, 0x4c, 0x34, 0xd1, 0x16, 0xd4, 0x98, 0xc4, 0xc1, 0xcc, 0x29, + 0x49, 0xab, 0xd6, 0x55, 0x56, 0xfa, 0xbe, 0x89, 0x3e, 0xea, 0x42, 0x25, 0x21, 0x11, 0x73, 0xca, + 0xd2, 0xee, 0x9d, 0xab, 0xec, 0xf6, 0x48, 0x14, 0x48, 0x45, 0xef, 0xbc, 0x04, 0xb6, 0xda, 0x43, + 0xbb, 0x60, 0x8f, 0xe2, 0x08, 0x33, 0xae, 0x5e, 0xd5, 0xdb, 0x14, 0xd9, 0xf0, 0xe7, 0xf9, 0xda, + 0xba, 0x51, 0x95, 0x49, 0x86, 0x53, 0xd1, 0x15, 0xc2, 0x38, 0xc5, 0x94, 0x75, 0x23, 0xf2, 0x48, + 0x99, 0xf8, 0x7d, 0xf9, 0x09, 0x34, 0x82, 0xc0, 0x8a, 0xd3, 0x2c, 0xe7, 0xea, 0x05, 0xf7, 0xc4, + 0x52, 0x08, 0xa2, 0x1e, 0xa6, 0xe1, 0x18, 0xeb, 0x9f, 0x58, 0xae, 0x45, 0x3d, 0x1c, 0x8a, 0xc4, + 0x18, 0xc9, 0x2a, 0x59, 0x0b, 0xb4, 0x84, 0xb6, 0xa0, 0xca, 0x78, 0x48, 0x39, 0x1e, 0xc9, 0xff, + 0xef, 0x36, 0x85, 0xac, 0x30, 0x40, 0x5f, 0x41, 0x7d, 0x48, 0xc6, 0x59, 0x82, 0x85, 0xb5, 0x7d, + 0x4b, 0xeb, 0xa9, 0x89, 0x48, 0x63, 0x4c, 0x29, 0xa1, 0xb2, 0x84, 0xd6, 0x03, 0x25, 0x78, 0xff, + 0x96, 0xa0, 0x69, 0x92, 0x35, 0xd7, 0x1e, 0x76, 0xc1, 0x56, 0xd4, 0xab, 0x94, 0xbd, 0x5f, 0xa8, + 0x14, 0xc2, 0xc2, 0x50, 0x39, 0x50, 0x1d, 0xe6, 0x54, 0xf6, 0x0e, 0xd5, 0x51, 0x0a, 0x51, 0x38, + 0xcc, 0x09, 0x0f, 0x13, 0x19, 0xaa, 0x72, 0xa0, 0x04, 0xd1, 0x52, 0x26, 0x7d, 0xf9, 0x6e, 0x2d, + 0x65, 0x62, 0x66, 0xd2, 0x50, 0x7d, 0x23, 0x1a, 0x6a, 0x77, 0xa6, 0xc1, 0xfb, 0xdd, 0x82, 0xfa, + 0x24, 0xcb, 0x8d, 0xe8, 0x5a, 0x6f, 0x1c, 0xdd, 0x99, 0xc8, 0x94, 0xee, 0x17, 0x99, 0x07, 0x60, + 0x33, 0x4e, 0x71, 0x38, 0x96, 0x1c, 0x95, 0x03, 0x2d, 0x89, 0x7a, 0x32, 0x66, 0x91, 0x64, 0xa8, + 0x19, 0x88, 0xa5, 0xe7, 0x41, 0xb3, 0x77, 0xc6, 0x31, 0xdb, 0xc7, 0x4c, 0x74, 0x52, 0xc1, 0xed, + 0x28, 0xe4, 0xa1, 0x7c, 0x47, 0x33, 0x90, 0x6b, 0xef, 0x21, 0xa0, 0xbd, 0x98, 0xf1, 0x97, 0x84, + 0x1e, 0x63, 0xca, 0x16, 0x0d, 0x3d, 0x65, 0x63, 0xe8, 0xd9, 0x87, 0xb7, 0x66, 0xb4, 0x75, 0x95, + 0xfa, 0xec, 0xd2, 0xd8, 0xb3, 0xa0, 0xda, 0x28, 0x93, 0x4b, 0x73, 0xcf, 0x2f, 0x16, 0x34, 0xcd, + 0x83, 0xb9, 0xcc, 0xee, 0x81, 0xbd, 0x17, 0x0e, 0x70, 0x52, 0x94, 0xb1, 0xf5, 0xeb, 0x81, 0x7d, + 0xa5, 0xac, 0x1a, 0x99, 0xb6, 0x74, 0xbf, 0x80, 0x86, 0xb1, 0x7d, 0x97, 0xe6, 0xb1, 0xf9, 0x4f, + 0x19, 0xaa, 0xdb, 0x6a, 0x82, 0x45, 0x2f, 0xa0, 0x3e, 0x99, 0xf7, 0x90, 0x37, 0xef, 0xc7, 0xe5, + 0xc1, 0xd1, 0xfd, 0xe0, 0x5a, 0x1d, 0x1d, 0xb9, 0x6f, 0x60, 0x49, 0x4e, 0xa0, 0x68, 0x41, 0xc8, + 0xcc, 0xd1, 0xd4, 0xbd, 0x7e, 0x92, 0xdc, 0xb0, 0x04, 0x92, 0x6c, 0x4e, 0x8b, 0x90, 0xcc, 0x9e, + 0xef, 0xae, 0x5d, 0x79, 0xae, 0x7d, 0xda, 0x07, 0x5b, 0x17, 0x9a, 0x45, 0xaa, 0x66, 0x0f, 0x73, + 0xdb, 0x57, 0x2b, 0x28, 0xb0, 0x0d, 0x0b, 0xed, 0x4f, 0x06, 0x9a, 0x45, 0xae, 0x99, 0x09, 0xea, + 0xde, 0x70, 0xde, 0xb1, 0x36, 0x2c, 0xf4, 0x3d, 0x34, 0x8c, 0x14, 0x44, 0x1f, 0xce, 0x9b, 0xcc, + 0xe7, 0xb3, 0xfb, 0xd1, 0x0d, 0x5a, 0xca, 0xd9, 0x5e, 0xf3, 0xf5, 0x45, 0xcb, 0xfa, 0xe3, 0xa2, + 0x65, 0xfd, 0x7d, 0xd1, 0xb2, 0x06, 0xb6, 0xfc, 0x23, 0x3f, 0xfe, 0x2f, 0x00, 0x00, 0xff, 0xff, + 0x67, 0x51, 0xef, 0xa3, 0xc5, 0x0c, 0x00, 0x00, } diff --git a/api/services/control/control.proto b/api/services/control/control.proto index 6eac542a..672da025 100644 --- a/api/services/control/control.proto +++ b/api/services/control/control.proto @@ -12,12 +12,17 @@ option (gogoproto.unmarshaler_all) = true; service Control { rpc DiskUsage(DiskUsageRequest) returns (DiskUsageResponse); + rpc Prune(PruneRequest) returns (stream UsageRecord); rpc Solve(SolveRequest) returns (SolveResponse); rpc Status(StatusRequest) returns (stream StatusResponse); rpc Session(stream BytesMessage) returns (stream BytesMessage); rpc ListWorkers(ListWorkersRequest) returns (ListWorkersResponse); } +message PruneRequest { + // TODO: filter +} + message DiskUsageRequest { string filter = 1; // FIXME: this should be containerd-compatible repeated string? } diff --git a/cache/gc.go b/cache/gc.go index 065e8cb3..31a98b93 100644 --- a/cache/gc.go +++ b/cache/gc.go @@ -22,10 +22,6 @@ type GCPolicy struct { // return CachePolicy{Priority: 10, LastUsed: time.Now()} // } -func (cm *cacheManager) Prune(ctx context.Context) (map[string]int64, error) { - return nil, errors.New("Prune not implemented") -} - func (cm *cacheManager) GC(ctx context.Context) error { return errors.New("GC not implemented") } diff --git a/cache/manager.go b/cache/manager.go index 4f772147..b1556a1a 100644 --- a/cache/manager.go +++ b/cache/manager.go @@ -36,7 +36,7 @@ type Accessor interface { type Controller interface { DiskUsage(ctx context.Context, info client.DiskUsageInfo) ([]*client.UsageInfo, error) - Prune(ctx context.Context) (map[string]int64, error) + Prune(ctx context.Context, ch chan client.UsageInfo) error GC(ctx context.Context) error } @@ -124,7 +124,7 @@ func (cm *cacheManager) get(ctx context.Context, id string, opts ...RefOption) ( } // getRecord returns record for id. Requires manager lock. -func (cm *cacheManager) getRecord(ctx context.Context, id string, opts ...RefOption) (*cacheRecord, error) { +func (cm *cacheManager) getRecord(ctx context.Context, id string, opts ...RefOption) (cr *cacheRecord, retErr error) { if rec, ok := cm.records[id]; ok { if rec.isDead() { return nil, errNotFound @@ -136,13 +136,17 @@ func (cm *cacheManager) getRecord(ctx context.Context, id string, opts ...RefOpt if mutableID := getEqualMutable(md); mutableID != "" { mutable, err := cm.getRecord(ctx, mutableID) if err != nil { + // check loading mutable deleted record from disk + if errors.Cause(err) == errNotFound { + cm.md.Clear(id) + } return nil, err } rec := &cacheRecord{ mu: &sync.Mutex{}, cm: cm, refs: make(map[Mountable]struct{}), - parent: mutable.parent, + parent: mutable.Parent(), md: md, equalMutable: &mutableRef{cacheRecord: mutable}, } @@ -162,6 +166,11 @@ func (cm *cacheManager) getRecord(ctx context.Context, id string, opts ...RefOpt if err != nil { return nil, err } + defer func() { + if retErr != nil { + parent.Release(context.TODO()) + } + }() } rec := &cacheRecord{ @@ -173,6 +182,14 @@ func (cm *cacheManager) getRecord(ctx context.Context, id string, opts ...RefOpt md: md, } + // the record was deleted but we crashed before data on disk was removed + if getDeleted(md) { + if err := rec.remove(ctx, true); err != nil { + return nil, err + } + return nil, errNotFound + } + if err := initializeMetadata(rec, opts...); err != nil { if parent != nil { parent.Release(context.TODO()) @@ -269,6 +286,97 @@ func (cm *cacheManager) GetMutable(ctx context.Context, id string) (MutableRef, return rec.mref(), nil } +func (cm *cacheManager) Prune(ctx context.Context, ch chan client.UsageInfo) error { + // TODO: global prune lock + + var toDelete []*cacheRecord + cm.mu.Lock() + + for _, cr := range cm.records { + cr.mu.Lock() + // ignore duplicates that share data + if cr.equalImmutable != nil && len(cr.equalImmutable.refs) > 0 || cr.equalMutable != nil && len(cr.refs) == 0 { + cr.mu.Unlock() + continue + } + + if len(cr.refs) == 0 { + cr.dead = true + toDelete = append(toDelete, cr) + } + + // mark metadata as deleted in case we crash before cleanup finished + if err := setDeleted(cr.md); err != nil { + cr.mu.Unlock() + cm.mu.Unlock() + return err + } + cr.mu.Unlock() + } + + cm.mu.Unlock() + + if len(toDelete) == 0 { + return nil + } + + var err error + for _, cr := range toDelete { + cr.mu.Lock() + + usageCount, lastUsedAt := getLastUsed(cr.md) + + c := client.UsageInfo{ + ID: cr.ID(), + Mutable: cr.mutable, + InUse: len(cr.refs) > 0, + Size: getSize(cr.md), + CreatedAt: getCreatedAt(cr.md), + Description: GetDescription(cr.md), + LastUsedAt: lastUsedAt, + UsageCount: usageCount, + } + + if cr.parent != nil { + c.Parent = cr.parent.ID() + } + + if c.Size == sizeUnknown { + cr.mu.Unlock() // all the non-prune modifications already protected by cr.dead + s, err := cr.Size(ctx) + if err != nil { + return err + } + c.Size = s + cr.mu.Lock() + } + + if cr.equalImmutable != nil { + if err1 := cr.equalImmutable.remove(ctx, false); err == nil { + err = err1 + } + } + if err1 := cr.remove(ctx, true); err == nil { + err = err1 + } + + if err == nil && ch != nil { + ch <- c + } + cr.mu.Unlock() + } + if err != nil { + return err + } + + select { + case <-ctx.Done(): + return ctx.Err() + default: + return cm.Prune(ctx, ch) + } +} + func (cm *cacheManager) DiskUsage(ctx context.Context, opt client.DiskUsageInfo) ([]*client.UsageInfo, error) { cm.mu.Lock() @@ -281,6 +389,7 @@ func (cm *cacheManager) DiskUsage(ctx context.Context, opt client.DiskUsageInfo) usageCount int lastUsedAt *time.Time description string + doubleRef bool } m := make(map[string]*cacheUsageInfo, len(cm.records)) @@ -303,6 +412,7 @@ func (cm *cacheManager) DiskUsage(ctx context.Context, opt client.DiskUsageInfo) usageCount: usageCount, lastUsedAt: lastUsedAt, description: GetDescription(cr.md), + doubleRef: cr.equalImmutable != nil, } if cr.parent != nil { c.parent = cr.parent.ID() @@ -324,6 +434,9 @@ func (cm *cacheManager) DiskUsage(ctx context.Context, opt client.DiskUsageInfo) v := m[id] if v.refs == 0 && v.parent != "" { m[v.parent].refs-- + if v.doubleRef { + m[v.parent].refs-- + } rescan[v.parent] = struct{}{} } delete(rescan, id) diff --git a/cache/manager_test.go b/cache/manager_test.go index f4563744..be34a016 100644 --- a/cache/manager_test.go +++ b/cache/manager_test.go @@ -124,8 +124,119 @@ func TestManager(t *testing.T) { checkDiskUsage(ctx, t, cm, 0, 2) + buf := pruneResultBuffer() + err = cm.Prune(ctx, buf.C) + buf.close() + require.NoError(t, err) + + checkDiskUsage(ctx, t, cm, 0, 0) + + require.Equal(t, len(buf.all), 2) + err = cm.Close() require.NoError(t, err) + + dirs, err := ioutil.ReadDir(filepath.Join(tmpdir, "snapshots/snapshots")) + require.NoError(t, err) + require.Equal(t, 0, len(dirs)) +} + +func TestPrune(t *testing.T) { + t.Parallel() + ctx := namespaces.WithNamespace(context.Background(), "buildkit-test") + + tmpdir, err := ioutil.TempDir("", "cachemanager") + require.NoError(t, err) + defer os.RemoveAll(tmpdir) + + snapshotter, err := naive.NewSnapshotter(filepath.Join(tmpdir, "snapshots")) + require.NoError(t, err) + cm := getCacheManager(t, tmpdir, snapshotter) + + active, err := cm.New(ctx, nil) + require.NoError(t, err) + + snap, err := active.Commit(ctx) + require.NoError(t, err) + + active, err = cm.New(ctx, snap, CachePolicyRetain) + require.NoError(t, err) + + snap2, err := active.Commit(ctx) + require.NoError(t, err) + + checkDiskUsage(ctx, t, cm, 2, 0) + + // prune with keeping refs does nothing + buf := pruneResultBuffer() + err = cm.Prune(ctx, buf.C) + buf.close() + require.NoError(t, err) + + checkDiskUsage(ctx, t, cm, 2, 0) + require.Equal(t, len(buf.all), 0) + + dirs, err := ioutil.ReadDir(filepath.Join(tmpdir, "snapshots/snapshots")) + require.NoError(t, err) + require.Equal(t, 2, len(dirs)) + + err = snap2.Release(ctx) + require.NoError(t, err) + + checkDiskUsage(ctx, t, cm, 1, 1) + + // prune with keeping single refs deletes one + buf = pruneResultBuffer() + err = cm.Prune(ctx, buf.C) + buf.close() + require.NoError(t, err) + + checkDiskUsage(ctx, t, cm, 1, 0) + require.Equal(t, len(buf.all), 1) + + dirs, err = ioutil.ReadDir(filepath.Join(tmpdir, "snapshots/snapshots")) + require.NoError(t, err) + require.Equal(t, 1, len(dirs)) + + err = snap.Release(ctx) + require.NoError(t, err) + + active, err = cm.New(ctx, snap, CachePolicyRetain) + require.NoError(t, err) + + snap2, err = active.Commit(ctx) + require.NoError(t, err) + + err = snap.Release(ctx) + require.NoError(t, err) + + checkDiskUsage(ctx, t, cm, 2, 0) + + // prune with parent released does nothing + buf = pruneResultBuffer() + err = cm.Prune(ctx, buf.C) + buf.close() + require.NoError(t, err) + + checkDiskUsage(ctx, t, cm, 2, 0) + require.Equal(t, len(buf.all), 0) + + // releasing last reference + err = snap2.Release(ctx) + require.NoError(t, err) + checkDiskUsage(ctx, t, cm, 0, 2) + + buf = pruneResultBuffer() + err = cm.Prune(ctx, buf.C) + buf.close() + require.NoError(t, err) + + checkDiskUsage(ctx, t, cm, 0, 0) + require.Equal(t, len(buf.all), 2) + + dirs, err = ioutil.ReadDir(filepath.Join(tmpdir, "snapshots/snapshots")) + require.NoError(t, err) + require.Equal(t, 0, len(dirs)) } func TestLazyCommit(t *testing.T) { @@ -283,3 +394,25 @@ func checkDiskUsage(ctx context.Context, t *testing.T, cm Manager, inuse, unused require.Equal(t, inuse, inuseActual) require.Equal(t, unused, unusedActual) } + +func pruneResultBuffer() *buf { + b := &buf{C: make(chan client.UsageInfo), closed: make(chan struct{})} + go func() { + for c := range b.C { + b.all = append(b.all, c) + } + close(b.closed) + }() + return b +} + +type buf struct { + C chan client.UsageInfo + closed chan struct{} + all []client.UsageInfo +} + +func (b *buf) close() { + close(b.C) + <-b.closed +} diff --git a/cache/metadata.go b/cache/metadata.go index 3884ee55..2bbdb4f2 100644 --- a/cache/metadata.go +++ b/cache/metadata.go @@ -8,13 +8,6 @@ import ( "github.com/pkg/errors" ) -// Fields to be added: -// Size int64 -// AccessTime int64 -// Tags -// Descr -// CachePolicy - const sizeUnknown int64 = -1 const keySize = "snapshot.size" const keyEqualMutable = "cache.equalMutable" @@ -24,6 +17,31 @@ const keyCreatedAt = "cache.createdAt" const keyLastUsedAt = "cache.lastUsedAt" const keyUsageCount = "cache.usageCount" +const keyDeleted = "cache.deleted" + +func setDeleted(si *metadata.StorageItem) error { + v, err := metadata.NewValue(true) + if err != nil { + return errors.Wrap(err, "failed to create size value") + } + si.Update(func(b *bolt.Bucket) error { + return si.SetValue(b, keyDeleted, v) + }) + return nil +} + +func getDeleted(si *metadata.StorageItem) bool { + v := si.Get(keyDeleted) + if v == nil { + return false + } + var deleted bool + if err := v.Unmarshal(&deleted); err != nil { + return false + } + return deleted +} + func setSize(si *metadata.StorageItem, s int64) error { v, err := metadata.NewValue(s) if err != nil { diff --git a/cache/refs.go b/cache/refs.go index 7e983768..ed50f48d 100644 --- a/cache/refs.go +++ b/cache/refs.go @@ -83,10 +83,6 @@ func (cr *cacheRecord) Size(ctx context.Context) (int64, error) { // this expects that usage() is implemented lazily s, err := cr.sizeG.Do(ctx, cr.ID(), func(ctx context.Context) (interface{}, error) { cr.mu.Lock() - if cr.isDead() { - cr.mu.Unlock() - return 0, nil - } s := getSize(cr.md) if s != sizeUnknown { cr.mu.Unlock() @@ -99,6 +95,10 @@ func (cr *cacheRecord) Size(ctx context.Context) (int64, error) { cr.mu.Unlock() usage, err := cr.cm.ManagerOpt.Snapshotter.Usage(ctx, driverID) if err != nil { + if cr.isDead() { + cr.mu.Unlock() + return int64(0), nil + } return s, errors.Wrapf(err, "failed to get usage for %s", cr.ID()) } cr.mu.Lock() @@ -163,16 +163,22 @@ func (cr *cacheRecord) Mount(ctx context.Context, readonly bool) ([]mount.Mount, return cr.viewMount, nil } +// call when holding the manager lock func (cr *cacheRecord) remove(ctx context.Context, removeSnapshot bool) error { delete(cr.cm.records, cr.ID()) - if err := cr.cm.md.Clear(cr.ID()); err != nil { - return err + if cr.parent != nil { + if err := cr.parent.(*immutableRef).release(ctx); err != nil { + return err + } } if removeSnapshot { if err := cr.cm.Snapshotter.Remove(ctx, cr.ID()); err != nil { return err } } + if err := cr.cm.md.Clear(cr.ID()); err != nil { + return err + } return nil } @@ -268,7 +274,7 @@ func (sr *mutableRef) commit(ctx context.Context) (ImmutableRef, error) { rec := &cacheRecord{ mu: sr.mu, cm: sr.cm, - parent: sr.parent, + parent: sr.Parent(), equalMutable: sr, refs: make(map[Mountable]struct{}), md: md, diff --git a/client/diskusage.go b/client/diskusage.go index fe97f773..5ed50432 100644 --- a/client/diskusage.go +++ b/client/diskusage.go @@ -31,7 +31,7 @@ func (c *Client) DiskUsage(ctx context.Context, opts ...DiskUsageOption) ([]*Usa req := &controlapi.DiskUsageRequest{Filter: info.Filter} resp, err := c.controlClient().DiskUsage(ctx, req) if err != nil { - return nil, errors.Wrap(err, "failed to get diskusage") + return nil, errors.Wrap(err, "failed to call diskusage") } var du []*UsageInfo diff --git a/client/prune.go b/client/prune.go new file mode 100644 index 00000000..b3c1edcd --- /dev/null +++ b/client/prune.go @@ -0,0 +1,50 @@ +package client + +import ( + "context" + "io" + + controlapi "github.com/moby/buildkit/api/services/control" + "github.com/pkg/errors" +) + +func (c *Client) Prune(ctx context.Context, ch chan UsageInfo, opts ...PruneOption) error { + info := &PruneInfo{} + for _, o := range opts { + o(info) + } + + req := &controlapi.PruneRequest{} + cl, err := c.controlClient().Prune(ctx, req) + if err != nil { + return errors.Wrap(err, "failed to call prune") + } + + for { + d, err := cl.Recv() + if err != nil { + if err == io.EOF { + return nil + } + return err + } + if ch != nil { + ch <- UsageInfo{ + ID: d.ID, + Mutable: d.Mutable, + InUse: d.InUse, + Size: d.Size_, + Parent: d.Parent, + CreatedAt: d.CreatedAt, + Description: d.Description, + UsageCount: int(d.UsageCount), + LastUsedAt: d.LastUsedAt, + } + } + } +} + +type PruneOption func(*PruneInfo) + +type PruneInfo struct { +} diff --git a/cmd/buildctl/diskusage.go b/cmd/buildctl/diskusage.go index fec861bc..c08cf0cd 100644 --- a/cmd/buildctl/diskusage.go +++ b/cmd/buildctl/diskusage.go @@ -83,19 +83,27 @@ func printVerbose(tw *tabwriter.Writer, du []*client.UsageInfo) { } func printTable(tw *tabwriter.Writer, du []*client.UsageInfo) { - fmt.Fprintln(tw, "ID\tRECLAIMABLE\tSIZE\tLAST ACCESSED") + printTableHeader(tw) for _, di := range du { - id := di.ID - if di.Mutable { - id += "*" - } - fmt.Fprintf(tw, "%s\t%v\t%.2f\t\n", id, !di.InUse, units.Bytes(di.Size)) + printTableRow(tw, di) } tw.Flush() } +func printTableHeader(tw *tabwriter.Writer) { + fmt.Fprintln(tw, "ID\tRECLAIMABLE\tSIZE\tLAST ACCESSED") +} + +func printTableRow(tw *tabwriter.Writer, di *client.UsageInfo) { + id := di.ID + if di.Mutable { + id += "*" + } + fmt.Fprintf(tw, "%-71s\t%-11v\t%.2f\t\n", id, !di.InUse, units.Bytes(di.Size)) +} + func printSummary(tw *tabwriter.Writer, du []*client.UsageInfo) { total := int64(0) reclaimable := int64(0) diff --git a/cmd/buildctl/main.go b/cmd/buildctl/main.go index 98e72fd3..31fa954f 100644 --- a/cmd/buildctl/main.go +++ b/cmd/buildctl/main.go @@ -56,6 +56,7 @@ func main() { app.Commands = []cli.Command{ diskUsageCommand, + pruneCommand, buildCommand, debugCommand, } diff --git a/cmd/buildctl/prune.go b/cmd/buildctl/prune.go new file mode 100644 index 00000000..30ac8e73 --- /dev/null +++ b/cmd/buildctl/prune.go @@ -0,0 +1,68 @@ +package main + +import ( + "fmt" + "os" + "text/tabwriter" + + "github.com/moby/buildkit/client" + "github.com/moby/buildkit/util/appcontext" + "github.com/tonistiigi/units" + "github.com/urfave/cli" +) + +var pruneCommand = cli.Command{ + Name: "prune", + Usage: "clean up build cache", + Action: prune, + Flags: []cli.Flag{ + cli.BoolFlag{ + Name: "verbose, v", + Usage: "Verbose output", + }, + }, +} + +func prune(clicontext *cli.Context) error { + c, err := resolveClient(clicontext) + if err != nil { + return err + } + + ch := make(chan client.UsageInfo) + printed := make(chan struct{}) + + tw := tabwriter.NewWriter(os.Stdout, 1, 8, 1, '\t', 0) + first := true + total := int64(0) + + go func() { + defer close(printed) + for du := range ch { + total += du.Size + if clicontext.Bool("verbose") { + printVerbose(tw, []*client.UsageInfo{&du}) + } else { + if first { + printTableHeader(tw) + first = false + } + printTableRow(tw, &du) + tw.Flush() + } + } + }() + + err = c.Prune(appcontext.Context(), ch) + close(ch) + <-printed + if err != nil { + return err + } + + tw = tabwriter.NewWriter(os.Stdout, 1, 8, 1, '\t', 0) + fmt.Fprintf(tw, "Total:\t%.2f\n", units.Bytes(total)) + tw.Flush() + + return nil +} diff --git a/control/control.go b/control/control.go index 8b2c13e1..22fec87a 100644 --- a/control/control.go +++ b/control/control.go @@ -81,6 +81,53 @@ func (c *Controller) DiskUsage(ctx context.Context, r *controlapi.DiskUsageReque return resp, nil } +func (c *Controller) Prune(req *controlapi.PruneRequest, stream controlapi.Control_PruneServer) error { + ch := make(chan client.UsageInfo) + + eg, ctx := errgroup.WithContext(stream.Context()) + workers, err := c.opt.WorkerController.List() + if err != nil { + return errors.Wrap(err, "failed to list workers for prune") + } + + for _, w := range workers { + func(w worker.Worker) { + eg.Go(func() error { + return w.Prune(ctx, ch) + }) + }(w) + } + + eg2, ctx := errgroup.WithContext(stream.Context()) + + eg2.Go(func() error { + defer close(ch) + return eg.Wait() + }) + + eg2.Go(func() error { + for r := range ch { + if err := stream.Send(&controlapi.UsageRecord{ + // TODO: add worker info + ID: r.ID, + Mutable: r.Mutable, + InUse: r.InUse, + Size_: r.Size, + Parent: r.Parent, + UsageCount: int64(r.UsageCount), + Description: r.Description, + CreatedAt: r.CreatedAt, + LastUsedAt: r.LastUsedAt, + }); err != nil { + return err + } + } + return nil + }) + + return eg2.Wait() +} + func (c *Controller) Solve(ctx context.Context, req *controlapi.SolveRequest) (*controlapi.SolveResponse, error) { var frontend frontend.Frontend if req.Frontend != "" { diff --git a/worker/base/worker.go b/worker/base/worker.go index 3c27ed5d..d938fd0a 100644 --- a/worker/base/worker.go +++ b/worker/base/worker.go @@ -267,6 +267,10 @@ func (w *Worker) DiskUsage(ctx context.Context, opt client.DiskUsageInfo) ([]*cl return w.CacheManager.DiskUsage(ctx, opt) } +func (w *Worker) Prune(ctx context.Context, ch chan client.UsageInfo) error { + return w.CacheManager.Prune(ctx, ch) +} + func (w *Worker) Exporter(name string) (exporter.Exporter, error) { exp, ok := w.Exporters[name] if !ok { diff --git a/worker/runc/runc.go b/worker/runc/runc.go index a54be443..a915ca1c 100644 --- a/worker/runc/runc.go +++ b/worker/runc/runc.go @@ -4,6 +4,7 @@ import ( "context" "os" "path/filepath" + "time" "github.com/boltdb/bolt" "github.com/containerd/containerd/content" @@ -18,6 +19,7 @@ import ( "github.com/moby/buildkit/executor/runcexecutor" "github.com/moby/buildkit/worker/base" "github.com/opencontainers/go-digest" + "github.com/sirupsen/logrus" ) // NewWorkerOpt creates a WorkerOpt. @@ -60,7 +62,7 @@ func NewWorkerOpt(root string, labels map[string]string) (base.WorkerOpt, error) return opt, err } - c = &nsContent{mdb.ContentStore()} + c = &noGCContentStore{&nsContent{mdb.ContentStore(), mdb.GarbageCollect}} df, err := walking.NewWalkingDiff(c) if err != nil { return opt, err @@ -81,7 +83,7 @@ func NewWorkerOpt(root string, labels map[string]string) (base.WorkerOpt, error) Labels: xlabels, MetadataStore: md, Executor: exe, - BaseSnapshotter: &nsSnapshotter{mdb.Snapshotter("overlayfs")}, + BaseSnapshotter: &nsSnapshotter{mdb.Snapshotter("overlayfs"), mdb.GarbageCollect}, ContentStore: c, Applier: df, Differ: df, @@ -93,8 +95,11 @@ func NewWorkerOpt(root string, labels map[string]string) (base.WorkerOpt, error) // this should be supported by containerd. currently packages are unusable without wrapping const dummyNs = "buildkit" +type garbageCollect func(context.Context) (ctdmetadata.GCStats, error) + type nsContent struct { content.Store + gc garbageCollect } func (c *nsContent) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) { @@ -114,7 +119,12 @@ func (c *nsContent) Walk(ctx context.Context, fn content.WalkFunc, filters ...st func (c *nsContent) Delete(ctx context.Context, dgst digest.Digest) error { ctx = namespaces.WithNamespace(ctx, dummyNs) - return c.Store.Delete(ctx, dgst) + logrus.Debugf("delete-blob", dgst) + if err := c.Store.Delete(ctx, dgst); err != nil { + return err + } + _, err := c.gc(ctx) + return err } func (c *nsContent) Status(ctx context.Context, ref string) (content.Status, error) { @@ -144,6 +154,7 @@ func (c *nsContent) Writer(ctx context.Context, ref string, size int64, expected type nsSnapshotter struct { ctdsnapshot.Snapshotter + gc garbageCollect } func (s *nsSnapshotter) Stat(ctx context.Context, key string) (ctdsnapshot.Info, error) { @@ -178,9 +189,38 @@ func (s *nsSnapshotter) Commit(ctx context.Context, name, key string, opts ...ct } func (s *nsSnapshotter) Remove(ctx context.Context, key string) error { ctx = namespaces.WithNamespace(ctx, dummyNs) - return s.Snapshotter.Remove(ctx, key) + if _, err := s.Update(ctx, ctdsnapshot.Info{ + Name: key, + }, "labels.containerd.io/gc.root"); err != nil { + return err + } // calling snapshotter.Remove here causes a race in containerd + _, err := s.gc(ctx) + return err } func (s *nsSnapshotter) Walk(ctx context.Context, fn func(context.Context, ctdsnapshot.Info) error) error { ctx = namespaces.WithNamespace(ctx, dummyNs) return s.Snapshotter.Walk(ctx, fn) } + +type noGCContentStore struct { + content.Store +} +type noGCWriter struct { + content.Writer +} + +func (cs *noGCContentStore) Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (content.Writer, error) { + w, err := cs.Store.Writer(ctx, ref, size, expected) + return &noGCWriter{w}, err +} + +func (w *noGCWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error { + opts = append(opts, func(info *content.Info) error { + if info.Labels == nil { + info.Labels = map[string]string{} + } + info.Labels["containerd.io/gc.root"] = time.Now().UTC().Format(time.RFC3339Nano) + return nil + }) + return w.Writer.Commit(ctx, size, expected, opts...) +} diff --git a/worker/worker.go b/worker/worker.go index 71f85b8b..83c419d6 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -29,6 +29,7 @@ type Worker interface { Exec(ctx context.Context, meta executor.Meta, rootFS cache.ImmutableRef, stdin io.ReadCloser, stdout, stderr io.WriteCloser) error DiskUsage(ctx context.Context, opt client.DiskUsageInfo) ([]*client.UsageInfo, error) Exporter(name string) (exporter.Exporter, error) + Prune(ctx context.Context, ch chan client.UsageInfo) error } // Pre-defined label keys From 56353f99a837f48abe0db9162adef83c3edf7b0e Mon Sep 17 00:00:00 2001 From: Tonis Tiigi Date: Wed, 27 Dec 2017 22:29:50 -0800 Subject: [PATCH 3/8] snapshot: combine nogc snapshotting helpers Signed-off-by: Tonis Tiigi --- snapshot/nogc/content.go | 98 +++++++++++++++++++++ snapshot/nogc/snapshotter.go | 66 ++++++++++++++ worker/containerd/containerd.go | 49 ++++------- worker/runc/runc.go | 151 ++------------------------------ 4 files changed, 191 insertions(+), 173 deletions(-) create mode 100644 snapshot/nogc/content.go create mode 100644 snapshot/nogc/snapshotter.go diff --git a/snapshot/nogc/content.go b/snapshot/nogc/content.go new file mode 100644 index 00000000..f78a8a69 --- /dev/null +++ b/snapshot/nogc/content.go @@ -0,0 +1,98 @@ +package nogc + +import ( + "context" + "time" + + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/namespaces" + "github.com/opencontainers/go-digest" +) + +type garbageCollectFn func(context.Context) error + +func NewContentStore(store content.Store, ns string, gc func(context.Context) error) content.Store { + return &noGCContentStore{&nsContent{ns, store, gc}} +} + +type nsContent struct { + ns string + content.Store + gc garbageCollectFn +} + +func (c *nsContent) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) { + ctx = namespaces.WithNamespace(ctx, c.ns) + return c.Store.Info(ctx, dgst) +} + +func (c *nsContent) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) { + ctx = namespaces.WithNamespace(ctx, c.ns) + return c.Store.Update(ctx, info, fieldpaths...) +} + +func (c *nsContent) Walk(ctx context.Context, fn content.WalkFunc, filters ...string) error { + ctx = namespaces.WithNamespace(ctx, c.ns) + return c.Store.Walk(ctx, fn, filters...) +} + +func (c *nsContent) Delete(ctx context.Context, dgst digest.Digest) error { + ctx = namespaces.WithNamespace(ctx, c.ns) + if _, err := c.Update(ctx, content.Info{ + Digest: dgst, + }, "labels.containerd.io/gc.root"); err != nil { + return err + } // calling snapshotter.Remove here causes a race in containerd + if c.gc == nil { + return nil + } + return c.gc(ctx) +} + +func (c *nsContent) Status(ctx context.Context, ref string) (content.Status, error) { + ctx = namespaces.WithNamespace(ctx, c.ns) + return c.Store.Status(ctx, ref) +} + +func (c *nsContent) ListStatuses(ctx context.Context, filters ...string) ([]content.Status, error) { + ctx = namespaces.WithNamespace(ctx, c.ns) + return c.Store.ListStatuses(ctx, filters...) +} + +func (c *nsContent) Abort(ctx context.Context, ref string) error { + ctx = namespaces.WithNamespace(ctx, c.ns) + return c.Store.Abort(ctx, ref) +} + +func (c *nsContent) ReaderAt(ctx context.Context, dgst digest.Digest) (content.ReaderAt, error) { + ctx = namespaces.WithNamespace(ctx, c.ns) + return c.Store.ReaderAt(ctx, dgst) +} + +func (c *nsContent) Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (content.Writer, error) { + ctx = namespaces.WithNamespace(ctx, c.ns) + return c.Store.Writer(ctx, ref, size, expected) +} + +type noGCContentStore struct { + content.Store +} +type noGCWriter struct { + content.Writer +} + +func (cs *noGCContentStore) Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (content.Writer, error) { + w, err := cs.Store.Writer(ctx, ref, size, expected) + return &noGCWriter{w}, err +} + +func (w *noGCWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error { + opts = append(opts, func(info *content.Info) error { + if info.Labels == nil { + info.Labels = map[string]string{} + } + info.Labels["containerd.io/gc.root"] = time.Now().UTC().Format(time.RFC3339Nano) + return nil + }) + return w.Writer.Commit(ctx, size, expected, opts...) +} diff --git a/snapshot/nogc/snapshotter.go b/snapshot/nogc/snapshotter.go new file mode 100644 index 00000000..e1b5205d --- /dev/null +++ b/snapshot/nogc/snapshotter.go @@ -0,0 +1,66 @@ +package nogc + +import ( + "context" + + "github.com/containerd/containerd/mount" + "github.com/containerd/containerd/namespaces" + ctdsnapshot "github.com/containerd/containerd/snapshots" +) + +func NewSnapshotter(snapshotter ctdsnapshot.Snapshotter, ns string, gc func(context.Context) error) ctdsnapshot.Snapshotter { + return &nsSnapshotter{ns, snapshotter, gc} +} + +type nsSnapshotter struct { + ns string + ctdsnapshot.Snapshotter + gc garbageCollectFn +} + +func (s *nsSnapshotter) Stat(ctx context.Context, key string) (ctdsnapshot.Info, error) { + ctx = namespaces.WithNamespace(ctx, s.ns) + return s.Snapshotter.Stat(ctx, key) +} + +func (s *nsSnapshotter) Update(ctx context.Context, info ctdsnapshot.Info, fieldpaths ...string) (ctdsnapshot.Info, error) { + ctx = namespaces.WithNamespace(ctx, s.ns) + return s.Snapshotter.Update(ctx, info, fieldpaths...) +} + +func (s *nsSnapshotter) Usage(ctx context.Context, key string) (ctdsnapshot.Usage, error) { + ctx = namespaces.WithNamespace(ctx, s.ns) + return s.Snapshotter.Usage(ctx, key) +} +func (s *nsSnapshotter) Mounts(ctx context.Context, key string) ([]mount.Mount, error) { + ctx = namespaces.WithNamespace(ctx, s.ns) + return s.Snapshotter.Mounts(ctx, key) +} +func (s *nsSnapshotter) Prepare(ctx context.Context, key, parent string, opts ...ctdsnapshot.Opt) ([]mount.Mount, error) { + ctx = namespaces.WithNamespace(ctx, s.ns) + return s.Snapshotter.Prepare(ctx, key, parent, opts...) +} +func (s *nsSnapshotter) View(ctx context.Context, key, parent string, opts ...ctdsnapshot.Opt) ([]mount.Mount, error) { + ctx = namespaces.WithNamespace(ctx, s.ns) + return s.Snapshotter.View(ctx, key, parent, opts...) +} +func (s *nsSnapshotter) Commit(ctx context.Context, name, key string, opts ...ctdsnapshot.Opt) error { + ctx = namespaces.WithNamespace(ctx, s.ns) + return s.Snapshotter.Commit(ctx, name, key, opts...) +} +func (s *nsSnapshotter) Remove(ctx context.Context, key string) error { + ctx = namespaces.WithNamespace(ctx, s.ns) + if _, err := s.Update(ctx, ctdsnapshot.Info{ + Name: key, + }, "labels.containerd.io/gc.root"); err != nil { + return err + } // calling snapshotter.Remove here causes a race in containerd + if s.gc == nil { + return nil + } + return s.gc(ctx) +} +func (s *nsSnapshotter) Walk(ctx context.Context, fn func(context.Context, ctdsnapshot.Info) error) error { + ctx = namespaces.WithNamespace(ctx, s.ns) + return s.Snapshotter.Walk(ctx, fn) +} diff --git a/worker/containerd/containerd.go b/worker/containerd/containerd.go index d18b4385..86723e47 100644 --- a/worker/containerd/containerd.go +++ b/worker/containerd/containerd.go @@ -5,14 +5,14 @@ import ( "os" "path/filepath" "strings" - "time" "github.com/containerd/containerd" - "github.com/containerd/containerd/content" + "github.com/containerd/containerd/namespaces" "github.com/moby/buildkit/cache/metadata" "github.com/moby/buildkit/executor/containerdexecutor" + "github.com/moby/buildkit/identity" + "github.com/moby/buildkit/snapshot/nogc" "github.com/moby/buildkit/worker/base" - digest "github.com/opencontainers/go-digest" "github.com/pkg/errors" ) @@ -53,41 +53,30 @@ func newContainerd(root string, client *containerd.Client, snapshotterName strin for k, v := range labels { xlabels[k] = v } + + gc := func(ctx context.Context) error { + snapshotter := client.SnapshotService(snapshotterName) + ctx = namespaces.WithNamespace(ctx, "buildkit") + key := identity.NewID() + if _, err := snapshotter.Prepare(ctx, key, ""); err != nil { + return err + } + if err := snapshotter.Remove(ctx, key); err != nil { + return err + } + return nil + } + opt := base.WorkerOpt{ ID: id, Labels: xlabels, MetadataStore: md, Executor: containerdexecutor.New(client, root), - BaseSnapshotter: client.SnapshotService(snapshotterName), - ContentStore: &noGCContentStore{client.ContentStore()}, + BaseSnapshotter: nogc.NewSnapshotter(client.SnapshotService(snapshotterName), "buildkit", gc), + ContentStore: nogc.NewContentStore(client.ContentStore(), "buildkit", gc), Applier: df, Differ: df, ImageStore: client.ImageService(), } return opt, nil } - -// TODO: Replace this with leases - -type noGCContentStore struct { - content.Store -} -type noGCWriter struct { - content.Writer -} - -func (cs *noGCContentStore) Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (content.Writer, error) { - w, err := cs.Store.Writer(ctx, ref, size, expected) - return &noGCWriter{w}, err -} - -func (w *noGCWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error { - opts = append(opts, func(info *content.Info) error { - if info.Labels == nil { - info.Labels = map[string]string{} - } - info.Labels["containerd.io/gc.root"] = time.Now().UTC().Format(time.RFC3339Nano) - return nil - }) - return w.Writer.Commit(ctx, size, expected, opts...) -} diff --git a/worker/runc/runc.go b/worker/runc/runc.go index a915ca1c..983a7e78 100644 --- a/worker/runc/runc.go +++ b/worker/runc/runc.go @@ -4,22 +4,17 @@ import ( "context" "os" "path/filepath" - "time" "github.com/boltdb/bolt" - "github.com/containerd/containerd/content" "github.com/containerd/containerd/content/local" "github.com/containerd/containerd/diff/walking" ctdmetadata "github.com/containerd/containerd/metadata" - "github.com/containerd/containerd/mount" - "github.com/containerd/containerd/namespaces" ctdsnapshot "github.com/containerd/containerd/snapshots" "github.com/containerd/containerd/snapshots/overlay" "github.com/moby/buildkit/cache/metadata" "github.com/moby/buildkit/executor/runcexecutor" + "github.com/moby/buildkit/snapshot/nogc" "github.com/moby/buildkit/worker/base" - "github.com/opencontainers/go-digest" - "github.com/sirupsen/logrus" ) // NewWorkerOpt creates a WorkerOpt. @@ -62,14 +57,17 @@ func NewWorkerOpt(root string, labels map[string]string) (base.WorkerOpt, error) return opt, err } - c = &noGCContentStore{&nsContent{mdb.ContentStore(), mdb.GarbageCollect}} + gc := func(ctx context.Context) error { + _, err := mdb.GarbageCollect(ctx) + return err + } + + c = nogc.NewContentStore(mdb.ContentStore(), "buildkit", gc) df, err := walking.NewWalkingDiff(c) if err != nil { return opt, err } - // TODO: call mdb.GarbageCollect . maybe just inject it into nsSnapshotter.Remove and csContent.Delete - id, err := base.ID(root) if err != nil { return opt, err @@ -83,7 +81,7 @@ func NewWorkerOpt(root string, labels map[string]string) (base.WorkerOpt, error) Labels: xlabels, MetadataStore: md, Executor: exe, - BaseSnapshotter: &nsSnapshotter{mdb.Snapshotter("overlayfs"), mdb.GarbageCollect}, + BaseSnapshotter: nogc.NewSnapshotter(mdb.Snapshotter("overlayfs"), "buildkit", gc), ContentStore: c, Applier: df, Differ: df, @@ -91,136 +89,3 @@ func NewWorkerOpt(root string, labels map[string]string) (base.WorkerOpt, error) } return opt, nil } - -// this should be supported by containerd. currently packages are unusable without wrapping -const dummyNs = "buildkit" - -type garbageCollect func(context.Context) (ctdmetadata.GCStats, error) - -type nsContent struct { - content.Store - gc garbageCollect -} - -func (c *nsContent) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) { - ctx = namespaces.WithNamespace(ctx, dummyNs) - return c.Store.Info(ctx, dgst) -} - -func (c *nsContent) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) { - ctx = namespaces.WithNamespace(ctx, dummyNs) - return c.Store.Update(ctx, info, fieldpaths...) -} - -func (c *nsContent) Walk(ctx context.Context, fn content.WalkFunc, filters ...string) error { - ctx = namespaces.WithNamespace(ctx, dummyNs) - return c.Store.Walk(ctx, fn, filters...) -} - -func (c *nsContent) Delete(ctx context.Context, dgst digest.Digest) error { - ctx = namespaces.WithNamespace(ctx, dummyNs) - logrus.Debugf("delete-blob", dgst) - if err := c.Store.Delete(ctx, dgst); err != nil { - return err - } - _, err := c.gc(ctx) - return err -} - -func (c *nsContent) Status(ctx context.Context, ref string) (content.Status, error) { - ctx = namespaces.WithNamespace(ctx, dummyNs) - return c.Store.Status(ctx, ref) -} - -func (c *nsContent) ListStatuses(ctx context.Context, filters ...string) ([]content.Status, error) { - ctx = namespaces.WithNamespace(ctx, dummyNs) - return c.Store.ListStatuses(ctx, filters...) -} - -func (c *nsContent) Abort(ctx context.Context, ref string) error { - ctx = namespaces.WithNamespace(ctx, dummyNs) - return c.Store.Abort(ctx, ref) -} - -func (c *nsContent) ReaderAt(ctx context.Context, dgst digest.Digest) (content.ReaderAt, error) { - ctx = namespaces.WithNamespace(ctx, dummyNs) - return c.Store.ReaderAt(ctx, dgst) -} - -func (c *nsContent) Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (content.Writer, error) { - ctx = namespaces.WithNamespace(ctx, dummyNs) - return c.Store.Writer(ctx, ref, size, expected) -} - -type nsSnapshotter struct { - ctdsnapshot.Snapshotter - gc garbageCollect -} - -func (s *nsSnapshotter) Stat(ctx context.Context, key string) (ctdsnapshot.Info, error) { - ctx = namespaces.WithNamespace(ctx, dummyNs) - return s.Snapshotter.Stat(ctx, key) -} - -func (s *nsSnapshotter) Update(ctx context.Context, info ctdsnapshot.Info, fieldpaths ...string) (ctdsnapshot.Info, error) { - ctx = namespaces.WithNamespace(ctx, dummyNs) - return s.Snapshotter.Update(ctx, info, fieldpaths...) -} - -func (s *nsSnapshotter) Usage(ctx context.Context, key string) (ctdsnapshot.Usage, error) { - ctx = namespaces.WithNamespace(ctx, dummyNs) - return s.Snapshotter.Usage(ctx, key) -} -func (s *nsSnapshotter) Mounts(ctx context.Context, key string) ([]mount.Mount, error) { - ctx = namespaces.WithNamespace(ctx, dummyNs) - return s.Snapshotter.Mounts(ctx, key) -} -func (s *nsSnapshotter) Prepare(ctx context.Context, key, parent string, opts ...ctdsnapshot.Opt) ([]mount.Mount, error) { - ctx = namespaces.WithNamespace(ctx, dummyNs) - return s.Snapshotter.Prepare(ctx, key, parent, opts...) -} -func (s *nsSnapshotter) View(ctx context.Context, key, parent string, opts ...ctdsnapshot.Opt) ([]mount.Mount, error) { - ctx = namespaces.WithNamespace(ctx, dummyNs) - return s.Snapshotter.View(ctx, key, parent, opts...) -} -func (s *nsSnapshotter) Commit(ctx context.Context, name, key string, opts ...ctdsnapshot.Opt) error { - ctx = namespaces.WithNamespace(ctx, dummyNs) - return s.Snapshotter.Commit(ctx, name, key, opts...) -} -func (s *nsSnapshotter) Remove(ctx context.Context, key string) error { - ctx = namespaces.WithNamespace(ctx, dummyNs) - if _, err := s.Update(ctx, ctdsnapshot.Info{ - Name: key, - }, "labels.containerd.io/gc.root"); err != nil { - return err - } // calling snapshotter.Remove here causes a race in containerd - _, err := s.gc(ctx) - return err -} -func (s *nsSnapshotter) Walk(ctx context.Context, fn func(context.Context, ctdsnapshot.Info) error) error { - ctx = namespaces.WithNamespace(ctx, dummyNs) - return s.Snapshotter.Walk(ctx, fn) -} - -type noGCContentStore struct { - content.Store -} -type noGCWriter struct { - content.Writer -} - -func (cs *noGCContentStore) Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (content.Writer, error) { - w, err := cs.Store.Writer(ctx, ref, size, expected) - return &noGCWriter{w}, err -} - -func (w *noGCWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error { - opts = append(opts, func(info *content.Info) error { - if info.Labels == nil { - info.Labels = map[string]string{} - } - info.Labels["containerd.io/gc.root"] = time.Now().UTC().Format(time.RFC3339Nano) - return nil - }) - return w.Writer.Commit(ctx, size, expected, opts...) -} From b0679c66db9a94e6b96bf16e2972a0f16463f9f3 Mon Sep 17 00:00:00 2001 From: Tonis Tiigi Date: Wed, 27 Dec 2017 23:07:13 -0800 Subject: [PATCH 4/8] snapshot: clean up snapshot interface Signed-off-by: Tonis Tiigi --- cache/blobs/blobs.go | 14 +------ cache/cacheimport/export.go | 6 --- cache/cacheimport/import.go | 3 +- cache/contenthash/checksum_test.go | 3 +- cache/manager.go | 18 ++++---- cache/manager_test.go | 3 +- cache/refs.go | 12 +----- snapshot/blobmapping/snapshotter.go | 7 ++-- snapshot/{nogc => containerd}/content.go | 2 +- snapshot/{nogc => containerd}/snapshotter.go | 36 +++++++++++++--- snapshot/snapshotter.go | 9 ++++ source/containerimage/pull.go | 14 ++----- worker/base/worker.go | 44 +++++++------------- worker/containerd/containerd.go | 29 ++++++++----- worker/runc/runc.go | 22 +++++----- 15 files changed, 111 insertions(+), 111 deletions(-) rename snapshot/{nogc => containerd}/content.go (99%) rename snapshot/{nogc => containerd}/snapshotter.go (64%) diff --git a/cache/blobs/blobs.go b/cache/blobs/blobs.go index 8dd3c07c..04cb5130 100644 --- a/cache/blobs/blobs.go +++ b/cache/blobs/blobs.go @@ -1,7 +1,6 @@ package blobs import ( - gocontext "context" "time" "github.com/containerd/containerd/content" @@ -26,19 +25,10 @@ type DiffPair struct { Blobsum digest.Digest } -type blobmapper interface { - GetBlob(ctx gocontext.Context, key string) (digest.Digest, digest.Digest, error) - SetBlob(ctx gocontext.Context, key string, diffID, blob digest.Digest) error -} - func GetDiffPairs(ctx context.Context, contentStore content.Store, snapshotter snapshot.Snapshotter, differ diff.Differ, ref cache.ImmutableRef) ([]DiffPair, error) { if ref == nil { return nil, nil } - blobmap, ok := snapshotter.(blobmapper) - if !ok { - return nil, errors.Errorf("image exporter requires snapshotter with blobs mapping support") - } eg, ctx := errgroup.WithContext(ctx) var diffPairs []DiffPair @@ -57,7 +47,7 @@ func GetDiffPairs(ctx context.Context, contentStore content.Store, snapshotter s } eg.Go(func() error { dp, err := g.Do(ctx, ref.ID(), func(ctx context.Context) (interface{}, error) { - diffID, blob, err := blobmap.GetBlob(ctx, ref.ID()) + diffID, blob, err := snapshotter.GetBlob(ctx, ref.ID()) if err != nil { return nil, err } @@ -100,7 +90,7 @@ func GetDiffPairs(ctx context.Context, contentStore content.Store, snapshotter s if err != nil { return nil, err } - if err := blobmap.SetBlob(ctx, ref.ID(), diffIDDigest, descr.Digest); err != nil { + if err := snapshotter.SetBlob(ctx, ref.ID(), diffIDDigest, descr.Digest); err != nil { return nil, err } return DiffPair{DiffID: diffIDDigest, Blobsum: descr.Digest}, nil diff --git a/cache/cacheimport/export.go b/cache/cacheimport/export.go index b1a2e5d7..573525a5 100644 --- a/cache/cacheimport/export.go +++ b/cache/cacheimport/export.go @@ -2,7 +2,6 @@ package cacheimport import ( "bytes" - gocontext "context" "encoding/json" "time" @@ -25,11 +24,6 @@ import ( const mediaTypeConfig = "application/vnd.buildkit.cacheconfig.v0" -type blobmapper interface { - GetBlob(ctx gocontext.Context, key string) (digest.Digest, digest.Digest, error) - SetBlob(ctx gocontext.Context, key string, diffID, blob digest.Digest) error -} - type CacheRecord struct { CacheKey digest.Digest Reference cache.ImmutableRef diff --git a/cache/cacheimport/import.go b/cache/cacheimport/import.go index ba6bf96d..30c5b1da 100644 --- a/cache/cacheimport/import.go +++ b/cache/cacheimport/import.go @@ -269,7 +269,6 @@ func (ii *importInfo) unpack(ctx context.Context, dpairs []blobs.DiffPair) (stri var chain []digest.Digest for _, layer := range layers { labels := map[string]string{ - "containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339Nano), "containerd.io/uncompressed": layer.Diff.Digest.String(), } if _, err := rootfs.ApplyLayer(ctx, layer, chain, ii.opt.Snapshotter, ii.opt.Applier, cdsnapshot.WithLabels(labels)); err != nil { @@ -291,7 +290,7 @@ func (ii *importInfo) fillBlobMapping(ctx context.Context, layers []rootfs.Layer for _, l := range layers { chain = append(chain, l.Diff.Digest) chainID := identity.ChainID(chain) - if err := ii.opt.Snapshotter.(blobmapper).SetBlob(ctx, string(chainID), l.Diff.Digest, l.Blob.Digest); err != nil { + if err := ii.opt.Snapshotter.SetBlob(ctx, string(chainID), l.Diff.Digest, l.Blob.Digest); err != nil { return err } } diff --git a/cache/contenthash/checksum_test.go b/cache/contenthash/checksum_test.go index 1d43e405..467bf7c0 100644 --- a/cache/contenthash/checksum_test.go +++ b/cache/contenthash/checksum_test.go @@ -11,6 +11,7 @@ import ( "testing" "time" + "github.com/containerd/containerd/snapshots" "github.com/containerd/containerd/snapshots/naive" "github.com/moby/buildkit/cache" "github.com/moby/buildkit/cache/metadata" @@ -383,7 +384,7 @@ func createRef(t *testing.T, cm cache.Manager, files []string) cache.ImmutableRe return ref } -func setupCacheManager(t *testing.T, tmpdir string, snapshotter snapshot.Snapshotter) cache.Manager { +func setupCacheManager(t *testing.T, tmpdir string, snapshotter snapshots.Snapshotter) cache.Manager { md, err := metadata.NewStore(filepath.Join(tmpdir, "metadata.db")) require.NoError(t, err) diff --git a/cache/manager.go b/cache/manager.go index b1556a1a..eaf67067 100644 --- a/cache/manager.go +++ b/cache/manager.go @@ -6,11 +6,10 @@ import ( "sync" "time" - cdsnapshot "github.com/containerd/containerd/snapshots" + "github.com/containerd/containerd/snapshots" "github.com/moby/buildkit/cache/metadata" "github.com/moby/buildkit/client" "github.com/moby/buildkit/identity" - "github.com/moby/buildkit/snapshot" "github.com/pkg/errors" "github.com/sirupsen/logrus" "golang.org/x/sync/errgroup" @@ -23,7 +22,7 @@ var ( ) type ManagerOpt struct { - Snapshotter snapshot.Snapshotter + Snapshotter snapshots.Snapshotter GCPolicy GCPolicy MetadataStore *metadata.Store } @@ -175,7 +174,7 @@ func (cm *cacheManager) getRecord(ctx context.Context, id string, opts ...RefOpt rec := &cacheRecord{ mu: &sync.Mutex{}, - mutable: info.Kind != cdsnapshot.KindCommitted, + mutable: info.Kind != snapshots.KindCommitted, cm: cm, refs: make(map[Mountable]struct{}), parent: parent, @@ -218,10 +217,7 @@ func (cm *cacheManager) New(ctx context.Context, s ImmutableRef, opts ...RefOpti parentID = parent.ID() } - labels := map[string]string{ - "containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339Nano), - } - if _, err := cm.Snapshotter.Prepare(ctx, id, parentID, cdsnapshot.WithLabels(labels)); err != nil { + if _, err := cm.Snapshotter.Prepare(ctx, id, parentID); err != nil { if parent != nil { parent.Release(context.TODO()) } @@ -294,12 +290,18 @@ func (cm *cacheManager) Prune(ctx context.Context, ch chan client.UsageInfo) err for _, cr := range cm.records { cr.mu.Lock() + // ignore duplicates that share data if cr.equalImmutable != nil && len(cr.equalImmutable.refs) > 0 || cr.equalMutable != nil && len(cr.refs) == 0 { cr.mu.Unlock() continue } + if cr.isDead() { + cr.mu.Unlock() + continue + } + if len(cr.refs) == 0 { cr.dead = true toDelete = append(toDelete, cr) diff --git a/cache/manager_test.go b/cache/manager_test.go index be34a016..6ffa46a2 100644 --- a/cache/manager_test.go +++ b/cache/manager_test.go @@ -9,6 +9,7 @@ import ( "testing" "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/snapshots" "github.com/containerd/containerd/snapshots/naive" "github.com/moby/buildkit/cache/metadata" "github.com/moby/buildkit/client" @@ -368,7 +369,7 @@ func TestLazyCommit(t *testing.T) { require.Equal(t, errNotFound, errors.Cause(err)) } -func getCacheManager(t *testing.T, tmpdir string, snapshotter snapshot.Snapshotter) Manager { +func getCacheManager(t *testing.T, tmpdir string, snapshotter snapshots.Snapshotter) Manager { md, err := metadata.NewStore(filepath.Join(tmpdir, "metadata.db")) require.NoError(t, err) diff --git a/cache/refs.go b/cache/refs.go index ed50f48d..0cf429fd 100644 --- a/cache/refs.go +++ b/cache/refs.go @@ -2,10 +2,8 @@ package cache import ( "sync" - "time" "github.com/containerd/containerd/mount" - cdsnapshot "github.com/containerd/containerd/snapshots" "github.com/moby/buildkit/cache/metadata" "github.com/moby/buildkit/identity" "github.com/moby/buildkit/util/flightcontrol" @@ -150,10 +148,7 @@ func (cr *cacheRecord) Mount(ctx context.Context, readonly bool) ([]mount.Mount, } if cr.viewMount == nil { // TODO: handle this better cr.view = identity.NewID() - labels := map[string]string{ - "containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339Nano), - } - m, err := cr.cm.Snapshotter.View(ctx, cr.view, cr.ID(), cdsnapshot.WithLabels(labels)) + m, err := cr.cm.Snapshotter.View(ctx, cr.view, cr.ID()) if err != nil { cr.view = "" return nil, errors.Wrapf(err, "failed to mount %s", cr.ID()) @@ -243,10 +238,7 @@ func (cr *cacheRecord) finalize(ctx context.Context) error { if mutable == nil { return nil } - labels := map[string]string{ - "containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339Nano), - } - err := cr.cm.Snapshotter.Commit(ctx, cr.ID(), mutable.ID(), cdsnapshot.WithLabels(labels)) + err := cr.cm.Snapshotter.Commit(ctx, cr.ID(), mutable.ID()) if err != nil { return errors.Wrapf(err, "failed to commit %s", mutable.ID()) } diff --git a/snapshot/blobmapping/snapshotter.go b/snapshot/blobmapping/snapshotter.go index c759fdbf..646a1819 100644 --- a/snapshot/blobmapping/snapshotter.go +++ b/snapshot/blobmapping/snapshotter.go @@ -7,6 +7,7 @@ import ( "github.com/containerd/containerd/content" "github.com/containerd/containerd/snapshots" "github.com/moby/buildkit/cache/metadata" + "github.com/moby/buildkit/snapshot" digest "github.com/opencontainers/go-digest" "github.com/sirupsen/logrus" ) @@ -36,13 +37,13 @@ type Snapshotter struct { opt Opt } -func NewSnapshotter(opt Opt) (*Snapshotter, error) { +func NewSnapshotter(opt Opt) snapshot.Snapshotter { s := &Snapshotter{ Snapshotter: opt.Snapshotter, opt: opt, } - return s, nil + return s } // Remove also removes a reference to a blob. If it is a last reference then it deletes it the blob as well @@ -64,7 +65,7 @@ func (s *Snapshotter) Remove(ctx context.Context, key string) error { if len(blobs) == 1 && blobs[0].ID() == key { // last snapshot if err := s.opt.Content.Delete(ctx, blob); err != nil { - logrus.Errorf("failed to delete blob %v", blob) + logrus.Errorf("failed to delete blob %v: %+v", blob, err) } } return nil diff --git a/snapshot/nogc/content.go b/snapshot/containerd/content.go similarity index 99% rename from snapshot/nogc/content.go rename to snapshot/containerd/content.go index f78a8a69..f0617c11 100644 --- a/snapshot/nogc/content.go +++ b/snapshot/containerd/content.go @@ -1,4 +1,4 @@ -package nogc +package containerd import ( "context" diff --git a/snapshot/nogc/snapshotter.go b/snapshot/containerd/snapshotter.go similarity index 64% rename from snapshot/nogc/snapshotter.go rename to snapshot/containerd/snapshotter.go index e1b5205d..e288ece4 100644 --- a/snapshot/nogc/snapshotter.go +++ b/snapshot/containerd/snapshotter.go @@ -1,15 +1,24 @@ -package nogc +package containerd import ( "context" + "time" + "github.com/containerd/containerd/content" "github.com/containerd/containerd/mount" "github.com/containerd/containerd/namespaces" ctdsnapshot "github.com/containerd/containerd/snapshots" + "github.com/moby/buildkit/cache/metadata" + "github.com/moby/buildkit/snapshot" + "github.com/moby/buildkit/snapshot/blobmapping" ) -func NewSnapshotter(snapshotter ctdsnapshot.Snapshotter, ns string, gc func(context.Context) error) ctdsnapshot.Snapshotter { - return &nsSnapshotter{ns, snapshotter, gc} +func NewSnapshotter(snapshotter ctdsnapshot.Snapshotter, store content.Store, mdstore *metadata.Store, ns string, gc func(context.Context) error) snapshot.Snapshotter { + return blobmapping.NewSnapshotter(blobmapping.Opt{ + Content: store, + Snapshotter: &nsSnapshotter{ns, snapshotter, gc}, + MetadataStore: mdstore, + }) } type nsSnapshotter struct { @@ -38,15 +47,15 @@ func (s *nsSnapshotter) Mounts(ctx context.Context, key string) ([]mount.Mount, } func (s *nsSnapshotter) Prepare(ctx context.Context, key, parent string, opts ...ctdsnapshot.Opt) ([]mount.Mount, error) { ctx = namespaces.WithNamespace(ctx, s.ns) - return s.Snapshotter.Prepare(ctx, key, parent, opts...) + return s.Snapshotter.Prepare(ctx, key, parent, addRootLabel(opts...)) } func (s *nsSnapshotter) View(ctx context.Context, key, parent string, opts ...ctdsnapshot.Opt) ([]mount.Mount, error) { ctx = namespaces.WithNamespace(ctx, s.ns) - return s.Snapshotter.View(ctx, key, parent, opts...) + return s.Snapshotter.View(ctx, key, parent, addRootLabel(opts...)) } func (s *nsSnapshotter) Commit(ctx context.Context, name, key string, opts ...ctdsnapshot.Opt) error { ctx = namespaces.WithNamespace(ctx, s.ns) - return s.Snapshotter.Commit(ctx, name, key, opts...) + return s.Snapshotter.Commit(ctx, name, key, addRootLabel(opts...)) } func (s *nsSnapshotter) Remove(ctx context.Context, key string) error { ctx = namespaces.WithNamespace(ctx, s.ns) @@ -64,3 +73,18 @@ func (s *nsSnapshotter) Walk(ctx context.Context, fn func(context.Context, ctdsn ctx = namespaces.WithNamespace(ctx, s.ns) return s.Snapshotter.Walk(ctx, fn) } + +func addRootLabel(opts ...ctdsnapshot.Opt) ctdsnapshot.Opt { + return func(info *ctdsnapshot.Info) error { + for _, opt := range opts { + if err := opt(info); err != nil { + return err + } + } + if info.Labels == nil { + info.Labels = map[string]string{} + } + info.Labels["containerd.io/gc.root"] = time.Now().UTC().Format(time.RFC3339Nano) + return nil + } +} diff --git a/snapshot/snapshotter.go b/snapshot/snapshotter.go index 66ade92c..987f721d 100644 --- a/snapshot/snapshotter.go +++ b/snapshot/snapshotter.go @@ -1,10 +1,19 @@ package snapshot import ( + "context" + "github.com/containerd/containerd/snapshots" + digest "github.com/opencontainers/go-digest" ) // Snapshotter defines interface that any snapshot implementation should satisfy type Snapshotter interface { snapshots.Snapshotter + Blobmapper +} + +type Blobmapper interface { + GetBlob(ctx context.Context, key string) (digest.Digest, digest.Digest, error) + SetBlob(ctx context.Context, key string, diffID, blob digest.Digest) error } diff --git a/source/containerimage/pull.go b/source/containerimage/pull.go index b796781d..9f18771e 100644 --- a/source/containerimage/pull.go +++ b/source/containerimage/pull.go @@ -20,6 +20,7 @@ import ( "github.com/moby/buildkit/cache" "github.com/moby/buildkit/session" "github.com/moby/buildkit/session/auth" + "github.com/moby/buildkit/snapshot" "github.com/moby/buildkit/source" "github.com/moby/buildkit/util/flightcontrol" "github.com/moby/buildkit/util/imageutil" @@ -36,17 +37,12 @@ import ( type SourceOpt struct { SessionManager *session.Manager - Snapshotter snapshots.Snapshotter + Snapshotter snapshot.Snapshotter ContentStore content.Store Applier diff.Differ CacheAccessor cache.Accessor } -type blobmapper interface { - GetBlob(ctx gocontext.Context, key string) (digest.Digest, digest.Digest, error) - SetBlob(ctx gocontext.Context, key string, diffID, blob digest.Digest) error -} - type resolveRecord struct { desc ocispec.Descriptor ts time.Time @@ -62,10 +58,6 @@ func NewSource(opt SourceOpt) (source.Source, error) { SourceOpt: opt, } - if _, ok := opt.Snapshotter.(blobmapper); !ok { - return nil, errors.Errorf("imagesource requires snapshotter with blobs mapping support") - } - return is, nil } @@ -280,7 +272,7 @@ func (is *imageSource) fillBlobMapping(ctx context.Context, layers []rootfs.Laye for _, l := range layers { chain = append(chain, l.Diff.Digest) chainID := identity.ChainID(chain) - if err := is.SourceOpt.Snapshotter.(blobmapper).SetBlob(ctx, string(chainID), l.Diff.Digest, l.Blob.Digest); err != nil { + if err := is.SourceOpt.Snapshotter.SetBlob(ctx, string(chainID), l.Diff.Digest, l.Blob.Digest); err != nil { return err } } diff --git a/worker/base/worker.go b/worker/base/worker.go index d938fd0a..c0959a17 100644 --- a/worker/base/worker.go +++ b/worker/base/worker.go @@ -10,7 +10,6 @@ import ( "github.com/containerd/containerd/content" "github.com/containerd/containerd/diff" "github.com/containerd/containerd/images" - ctdsnapshot "github.com/containerd/containerd/snapshots" "github.com/moby/buildkit/cache" "github.com/moby/buildkit/cache/cacheimport" "github.com/moby/buildkit/cache/instructioncache" @@ -24,7 +23,7 @@ import ( ociexporter "github.com/moby/buildkit/exporter/oci" "github.com/moby/buildkit/identity" "github.com/moby/buildkit/session" - "github.com/moby/buildkit/snapshot/blobmapping" + "github.com/moby/buildkit/snapshot" "github.com/moby/buildkit/solver" "github.com/moby/buildkit/solver/llbop" "github.com/moby/buildkit/solver/pb" @@ -44,23 +43,22 @@ import ( // WorkerOpt is specific to a worker. // See also CommonOpt. type WorkerOpt struct { - ID string - Labels map[string]string - SessionManager *session.Manager - MetadataStore *metadata.Store - Executor executor.Executor - BaseSnapshotter ctdsnapshot.Snapshotter // not blobmapping one (FIXME: just require blobmapping snapshotter?) - ContentStore content.Store - Applier diff.Differ - Differ diff.Differ - ImageStore images.Store // optional + ID string + Labels map[string]string + SessionManager *session.Manager + MetadataStore *metadata.Store + Executor executor.Executor + Snapshotter snapshot.Snapshotter + ContentStore content.Store + Applier diff.Differ + Differ diff.Differ + ImageStore images.Store // optional } // Worker is a local worker instance with dedicated snapshotter, cache, and so on. // TODO: s/Worker/OpWorker/g ? type Worker struct { WorkerOpt - Snapshotter ctdsnapshot.Snapshotter // blobmapping snapshotter CacheManager cache.Manager SourceManager *source.Manager cache instructioncache.InstructionCache @@ -73,17 +71,8 @@ type Worker struct { // NewWorker instantiates a local worker func NewWorker(opt WorkerOpt) (*Worker, error) { - bmSnapshotter, err := blobmapping.NewSnapshotter(blobmapping.Opt{ - Content: opt.ContentStore, - Snapshotter: opt.BaseSnapshotter, - MetadataStore: opt.MetadataStore, - }) - if err != nil { - return nil, err - } - cm, err := cache.NewManager(cache.ManagerOpt{ - Snapshotter: bmSnapshotter, + Snapshotter: opt.Snapshotter, MetadataStore: opt.MetadataStore, }) if err != nil { @@ -101,7 +90,7 @@ func NewWorker(opt WorkerOpt) (*Worker, error) { } is, err := containerimage.NewSource(containerimage.SourceOpt{ - Snapshotter: bmSnapshotter, + Snapshotter: opt.Snapshotter, ContentStore: opt.ContentStore, SessionManager: opt.SessionManager, Applier: opt.Applier, @@ -146,7 +135,7 @@ func NewWorker(opt WorkerOpt) (*Worker, error) { exporters := map[string]exporter.Exporter{} iw, err := imageexporter.NewImageWriter(imageexporter.WriterOpt{ - Snapshotter: bmSnapshotter, + Snapshotter: opt.Snapshotter, ContentStore: opt.ContentStore, Differ: opt.Differ, }) @@ -193,14 +182,14 @@ func NewWorker(opt WorkerOpt) (*Worker, error) { exporters[client.ExporterDocker] = dockerExporter ce := cacheimport.NewCacheExporter(cacheimport.ExporterOpt{ - Snapshotter: bmSnapshotter, + Snapshotter: opt.Snapshotter, ContentStore: opt.ContentStore, SessionManager: opt.SessionManager, Differ: opt.Differ, }) ci := cacheimport.NewCacheImporter(cacheimport.ImportOpt{ - Snapshotter: bmSnapshotter, + Snapshotter: opt.Snapshotter, ContentStore: opt.ContentStore, Applier: opt.Applier, CacheAccessor: cm, @@ -209,7 +198,6 @@ func NewWorker(opt WorkerOpt) (*Worker, error) { return &Worker{ WorkerOpt: opt, - Snapshotter: bmSnapshotter, CacheManager: cm, SourceManager: sm, cache: ic, diff --git a/worker/containerd/containerd.go b/worker/containerd/containerd.go index 86723e47..e9208560 100644 --- a/worker/containerd/containerd.go +++ b/worker/containerd/containerd.go @@ -5,13 +5,15 @@ import ( "os" "path/filepath" "strings" + "time" "github.com/containerd/containerd" "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/snapshots" "github.com/moby/buildkit/cache/metadata" "github.com/moby/buildkit/executor/containerdexecutor" "github.com/moby/buildkit/identity" - "github.com/moby/buildkit/snapshot/nogc" + containerdsnapshot "github.com/moby/buildkit/snapshot/containerd" "github.com/moby/buildkit/worker/base" "github.com/pkg/errors" ) @@ -55,10 +57,13 @@ func newContainerd(root string, client *containerd.Client, snapshotterName strin } gc := func(ctx context.Context) error { + // TODO: how to avoid this? snapshotter := client.SnapshotService(snapshotterName) ctx = namespaces.WithNamespace(ctx, "buildkit") key := identity.NewID() - if _, err := snapshotter.Prepare(ctx, key, ""); err != nil { + if _, err := snapshotter.Prepare(ctx, key, "", snapshots.WithLabels(map[string]string{ + "containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339Nano), + })); err != nil { return err } if err := snapshotter.Remove(ctx, key); err != nil { @@ -67,16 +72,18 @@ func newContainerd(root string, client *containerd.Client, snapshotterName strin return nil } + cs := containerdsnapshot.NewContentStore(client.ContentStore(), "buildkit", gc) + opt := base.WorkerOpt{ - ID: id, - Labels: xlabels, - MetadataStore: md, - Executor: containerdexecutor.New(client, root), - BaseSnapshotter: nogc.NewSnapshotter(client.SnapshotService(snapshotterName), "buildkit", gc), - ContentStore: nogc.NewContentStore(client.ContentStore(), "buildkit", gc), - Applier: df, - Differ: df, - ImageStore: client.ImageService(), + ID: id, + Labels: xlabels, + MetadataStore: md, + Executor: containerdexecutor.New(client, root), + Snapshotter: containerdsnapshot.NewSnapshotter(client.SnapshotService(snapshotterName), cs, md, "buildkit", gc), + ContentStore: cs, + Applier: df, + Differ: df, + ImageStore: client.ImageService(), } return opt, nil } diff --git a/worker/runc/runc.go b/worker/runc/runc.go index 983a7e78..20dada32 100644 --- a/worker/runc/runc.go +++ b/worker/runc/runc.go @@ -13,7 +13,7 @@ import ( "github.com/containerd/containerd/snapshots/overlay" "github.com/moby/buildkit/cache/metadata" "github.com/moby/buildkit/executor/runcexecutor" - "github.com/moby/buildkit/snapshot/nogc" + containerdsnapshot "github.com/moby/buildkit/snapshot/containerd" "github.com/moby/buildkit/worker/base" ) @@ -62,7 +62,7 @@ func NewWorkerOpt(root string, labels map[string]string) (base.WorkerOpt, error) return err } - c = nogc.NewContentStore(mdb.ContentStore(), "buildkit", gc) + c = containerdsnapshot.NewContentStore(mdb.ContentStore(), "buildkit", gc) df, err := walking.NewWalkingDiff(c) if err != nil { return opt, err @@ -77,15 +77,15 @@ func NewWorkerOpt(root string, labels map[string]string) (base.WorkerOpt, error) xlabels[k] = v } opt = base.WorkerOpt{ - ID: id, - Labels: xlabels, - MetadataStore: md, - Executor: exe, - BaseSnapshotter: nogc.NewSnapshotter(mdb.Snapshotter("overlayfs"), "buildkit", gc), - ContentStore: c, - Applier: df, - Differ: df, - ImageStore: nil, // explicitly + ID: id, + Labels: xlabels, + MetadataStore: md, + Executor: exe, + Snapshotter: containerdsnapshot.NewSnapshotter(mdb.Snapshotter("overlayfs"), c, md, "buildkit", gc), + ContentStore: c, + Applier: df, + Differ: df, + ImageStore: nil, // explicitly } return opt, nil } From 00a5729229e3b698540575490f61a97a3301d8cd Mon Sep 17 00:00:00 2001 From: Tonis Tiigi Date: Thu, 28 Dec 2017 11:21:41 -0800 Subject: [PATCH 5/8] pull: fix content labels after pull Signed-off-by: Tonis Tiigi --- source/containerimage/pull.go | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/source/containerimage/pull.go b/source/containerimage/pull.go index 9f18771e..ac995dfb 100644 --- a/source/containerimage/pull.go +++ b/source/containerimage/pull.go @@ -228,6 +228,40 @@ func (p *puller) Snapshot(ctx context.Context) (cache.ImmutableRef, error) { } } + // split all pulled data to layers and rest. layers remain roots and are deleted with snapshots. rest will be linked to layers. + var notLayerBlobs []ocispec.Descriptor + var layerBlobs []ocispec.Descriptor + for _, j := range ongoing.added { + switch j.MediaType { + case ocispec.MediaTypeImageLayer, images.MediaTypeDockerSchema2Layer, ocispec.MediaTypeImageLayerGzip, images.MediaTypeDockerSchema2LayerGzip: + layerBlobs = append(layerBlobs, j.Descriptor) + default: + notLayerBlobs = append(notLayerBlobs, j.Descriptor) + } + } + + for _, l := range layerBlobs { + labels := map[string]string{} + var fields []string + for _, nl := range notLayerBlobs { + k := "containerd.io/gc.ref.content." + nl.Digest.Hex()[:12] + labels[k] = nl.Digest.String() + fields = append(fields, "labels."+k) + } + if _, err := p.is.ContentStore.Update(ctx, content.Info{ + Digest: l.Digest, + Labels: labels, + }, fields...); err != nil { + return nil, err + } + } + + for _, nl := range notLayerBlobs { + if err := p.is.ContentStore.Delete(ctx, nl.Digest); err != nil { + return nil, err + } + } + unpackProgressDone := oneOffProgress(ctx, "unpacking "+p.src.Reference.String()) chainid, err := p.is.unpack(ctx, p.desc) if err != nil { From ccedd87e63d50fed386e784cee1132148899c614 Mon Sep 17 00:00:00 2001 From: Tonis Tiigi Date: Thu, 28 Dec 2017 11:56:04 -0800 Subject: [PATCH 6/8] exporter: set proper labels on exported images Signed-off-by: Tonis Tiigi --- exporter/containerimage/export.go | 4 ++++ exporter/containerimage/writer.go | 34 +++++++++++++++++++------------ exporter/oci/export.go | 3 +++ 3 files changed, 28 insertions(+), 13 deletions(-) diff --git a/exporter/containerimage/export.go b/exporter/containerimage/export.go index cf4454a1..7e493c41 100644 --- a/exporter/containerimage/export.go +++ b/exporter/containerimage/export.go @@ -75,6 +75,10 @@ func (e *imageExporterInstance) Export(ctx context.Context, ref cache.ImmutableR return err } + defer func() { + e.opt.ImageWriter.ContentStore().Delete(context.TODO(), desc.Digest) + }() + if e.targetName != "" { if e.opt.Images != nil { tagDone := oneOffProgress(ctx, "naming to "+e.targetName) diff --git a/exporter/containerimage/writer.go b/exporter/containerimage/writer.go index 0f338d39..87c6530a 100644 --- a/exporter/containerimage/writer.go +++ b/exporter/containerimage/writer.go @@ -3,6 +3,7 @@ package containerimage import ( "bytes" "encoding/json" + "fmt" "runtime" "strings" "time" @@ -68,17 +69,7 @@ func (ic *ImageWriter) Commit(ctx context.Context, ref cache.ImmutableRef, confi return nil, err } - addAsRoot := content.WithLabels(map[string]string{ - "containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339Nano), - }) - configDigest := digest.FromBytes(config) - configDone := oneOffProgress(ctx, "exporting config "+configDigest.String()) - - if err := content.WriteBlob(ctx, ic.opt.ContentStore, configDigest.String(), bytes.NewReader(config), int64(len(config)), configDigest, addAsRoot); err != nil { - return nil, configDone(errors.Wrap(err, "error writing config blob")) - } - configDone(nil) mfst := schema2.Manifest{ Config: distribution.Descriptor{ @@ -90,16 +81,21 @@ func (ic *ImageWriter) Commit(ctx context.Context, ref cache.ImmutableRef, confi mfst.SchemaVersion = 2 mfst.MediaType = schema2.MediaTypeManifest - for _, dp := range diffPairs { + labels := map[string]string{ + "containerd.io/gc.ref.content.0": configDigest.String(), + } + + for i, dp := range diffPairs { info, err := ic.opt.ContentStore.Info(ctx, dp.Blobsum) if err != nil { - return nil, configDone(errors.Wrapf(err, "could not find blob %s from contentstore", dp.Blobsum)) + return nil, errors.Wrapf(err, "could not find blob %s from contentstore", dp.Blobsum) } mfst.Layers = append(mfst.Layers, distribution.Descriptor{ Digest: dp.Blobsum, Size: info.Size, MediaType: schema2.MediaTypeLayer, }) + labels[fmt.Sprintf("containerd.io/gc.ref.content.%d", i+1)] = dp.Blobsum.String() } mfstJSON, err := json.Marshal(mfst) @@ -110,11 +106,23 @@ func (ic *ImageWriter) Commit(ctx context.Context, ref cache.ImmutableRef, confi mfstDigest := digest.FromBytes(mfstJSON) mfstDone := oneOffProgress(ctx, "exporting manifest "+mfstDigest.String()) - if err := content.WriteBlob(ctx, ic.opt.ContentStore, mfstDigest.String(), bytes.NewReader(mfstJSON), int64(len(mfstJSON)), mfstDigest, addAsRoot); err != nil { + if err := content.WriteBlob(ctx, ic.opt.ContentStore, mfstDigest.String(), bytes.NewReader(mfstJSON), int64(len(mfstJSON)), mfstDigest, content.WithLabels(labels)); err != nil { return nil, mfstDone(errors.Wrapf(err, "error writing manifest blob %s", mfstDigest)) } mfstDone(nil) + configDone := oneOffProgress(ctx, "exporting config "+configDigest.String()) + + if err := content.WriteBlob(ctx, ic.opt.ContentStore, configDigest.String(), bytes.NewReader(config), int64(len(config)), configDigest); err != nil { + return nil, configDone(errors.Wrap(err, "error writing config blob")) + } + configDone(nil) + + // delete config root. config will remain linked to the manifest + if err := ic.opt.ContentStore.Delete(context.TODO(), configDigest); err != nil { + return nil, errors.Wrap(err, "error removing config root") + } + return &ocispec.Descriptor{ Digest: mfstDigest, Size: int64(len(mfstJSON)), diff --git a/exporter/oci/export.go b/exporter/oci/export.go index ac18170c..8da2b30e 100644 --- a/exporter/oci/export.go +++ b/exporter/oci/export.go @@ -93,6 +93,9 @@ func (e *imageExporterInstance) Export(ctx context.Context, ref cache.ImmutableR if err != nil { return err } + defer func() { + e.opt.ImageWriter.ContentStore().Delete(context.TODO(), desc.Digest) + }() if desc.Annotations == nil { desc.Annotations = map[string]string{} } From b88a98bcade4efbbd4eaa5ca493a7fcab0bc4111 Mon Sep 17 00:00:00 2001 From: Tonis Tiigi Date: Thu, 28 Dec 2017 15:03:49 -0800 Subject: [PATCH 7/8] client: add integration tests for prune Signed-off-by: Tonis Tiigi --- cache/refs.go | 2 +- client/client_test.go | 87 ++++++++++++++++++++++++++++++- cmd/buildctl/buildctl_test.go | 1 + cmd/buildctl/prune_test.go | 15 ++++++ exporter/containerimage/writer.go | 6 ++- 5 files changed, 108 insertions(+), 3 deletions(-) create mode 100644 cmd/buildctl/prune_test.go diff --git a/cache/refs.go b/cache/refs.go index 0cf429fd..37b04302 100644 --- a/cache/refs.go +++ b/cache/refs.go @@ -116,7 +116,7 @@ func (cr *cacheRecord) Parent() ImmutableRef { } p := cr.parent.(*immutableRef) p.mu.Lock() - p.mu.Unlock() + defer p.mu.Unlock() return p.ref() } diff --git a/client/client_test.go b/client/client_test.go index ad5ba779..bf1fad16 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -18,7 +18,9 @@ import ( "github.com/containerd/containerd" "github.com/containerd/containerd/content" + "github.com/containerd/containerd/images" "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/snapshots" "github.com/docker/distribution/manifest/schema2" "github.com/moby/buildkit/client/llb" "github.com/moby/buildkit/identity" @@ -68,6 +70,8 @@ func testBuildMultiMount(t *testing.T, sb integration.Sandbox) { err = c.Solve(context.TODO(), def, SolveOpt{}, nil) require.NoError(t, err) + + checkAllReleasable(t, c, sb, true) } func testBuildHTTPSource(t *testing.T, sb integration.Sandbox) { @@ -157,6 +161,8 @@ func testBuildHTTPSource(t *testing.T, sb integration.Sandbox) { require.Equal(t, fi.ModTime().Format(http.TimeFormat), modTime.Format(http.TimeFormat)) require.Equal(t, int(fi.Mode()&0777), 0741) + checkAllReleasable(t, c, sb, true) + // TODO: check that second request was marked as cached } @@ -253,6 +259,8 @@ func testUser(t *testing.T, sb integration.Sandbox) { dt, err = ioutil.ReadFile(filepath.Join(destDir, "userone")) require.NoError(t, err) require.Contains(t, string(dt), "1") + + checkAllReleasable(t, c, sb, true) } func testOCIExporter(t *testing.T, sb integration.Sandbox) { @@ -347,8 +355,9 @@ func testOCIExporter(t *testing.T, sb integration.Sandbox) { _, ok := m[l] require.True(t, ok) } - } + + checkAllReleasable(t, c, sb, true) } func testBuildPushAndValidate(t *testing.T, sb integration.Sandbox) { @@ -419,6 +428,8 @@ func testBuildPushAndValidate(t *testing.T, sb integration.Sandbox) { require.NoError(t, err) require.Equal(t, 0741, int(fi.Mode()&0777)) + checkAllReleasable(t, c, sb, false) + // examine contents of exported tars (requires containerd) var cdAddress string if cd, ok := sb.(interface { @@ -437,6 +448,16 @@ func testBuildPushAndValidate(t *testing.T, sb integration.Sandbox) { ctx := namespaces.WithNamespace(context.Background(), "buildkit") + // check image in containerd + _, err = client.ImageService().Get(ctx, target) + require.NoError(t, err) + + // deleting image should release all content + err = client.ImageService().Delete(ctx, target, images.SynchronousDelete()) + require.NoError(t, err) + + checkAllReleasable(t, c, sb, true) + img, err := client.Pull(ctx, target) require.NoError(t, err) @@ -563,3 +584,67 @@ func readTarToMap(dt []byte, compressed bool) (map[string]*tarItem, error) { m[h.Name] = &tarItem{header: h, data: dt} } } + +func checkAllReleasable(t *testing.T, c *Client, sb integration.Sandbox, checkContent bool) { + err := c.Prune(context.TODO(), nil) + require.NoError(t, err) + + du, err := c.DiskUsage(context.TODO()) + require.NoError(t, err) + require.Equal(t, 0, len(du)) + + // examine contents of exported tars (requires containerd) + var cdAddress string + if cd, ok := sb.(interface { + ContainerdAddress() string + }); !ok { + return + } else { + cdAddress = cd.ContainerdAddress() + } + + // TODO: make public pull helper function so this can be checked for standalone as well + + client, err := containerd.New(cdAddress) + require.NoError(t, err) + defer client.Close() + + ctx := namespaces.WithNamespace(context.Background(), "buildkit") + snapshotService := client.SnapshotService("overlayfs") + + retries := 0 + for { + count := 0 + err = snapshotService.Walk(ctx, func(context.Context, snapshots.Info) error { + count++ + return nil + }) + require.NoError(t, err) + if count == 0 { + break + } + require.True(t, 20 > retries) + retries++ + time.Sleep(500 * time.Millisecond) + } + + if !checkContent { + return + } + + retries = 0 + for { + count := 0 + err = client.ContentStore().Walk(ctx, func(content.Info) error { + count++ + return nil + }) + require.NoError(t, err) + if count == 0 { + break + } + require.True(t, 20 > retries) + retries++ + time.Sleep(500 * time.Millisecond) + } +} diff --git a/cmd/buildctl/buildctl_test.go b/cmd/buildctl/buildctl_test.go index c166ec88..f7073fec 100644 --- a/cmd/buildctl/buildctl_test.go +++ b/cmd/buildctl/buildctl_test.go @@ -12,5 +12,6 @@ func TestCLIIntegration(t *testing.T) { testBuildWithLocalFiles, testBuildLocalExporter, testBuildContainerdExporter, + testPrune, }) } diff --git a/cmd/buildctl/prune_test.go b/cmd/buildctl/prune_test.go new file mode 100644 index 00000000..9924f3c7 --- /dev/null +++ b/cmd/buildctl/prune_test.go @@ -0,0 +1,15 @@ +package main + +import ( + "testing" + + "github.com/moby/buildkit/util/testutil/integration" + "github.com/stretchr/testify/assert" +) + +func testPrune(t *testing.T, sb integration.Sandbox) { + t.Parallel() + cmd := sb.Cmd("prune") + err := cmd.Run() + assert.NoError(t, err) +} diff --git a/exporter/containerimage/writer.go b/exporter/containerimage/writer.go index 87c6530a..f156b824 100644 --- a/exporter/containerimage/writer.go +++ b/exporter/containerimage/writer.go @@ -248,7 +248,11 @@ func getRefDesciptions(ref cache.ImmutableRef, limit int) []string { if descr == "" { descr = defaultMsg } - return append(getRefDesciptions(ref.Parent(), limit-1), descr) + p := ref.Parent() + if p != nil { + defer p.Release(context.TODO()) + } + return append(getRefDesciptions(p, limit-1), descr) } func oneOffProgress(ctx context.Context, id string) func(err error) error { From 1c9b9c9959adaa9cb98dfe2aca6ce5a4242e073e Mon Sep 17 00:00:00 2001 From: Tonis Tiigi Date: Thu, 28 Dec 2017 15:09:07 -0800 Subject: [PATCH 8/8] cache: add global prune lock Signed-off-by: Tonis Tiigi --- cache/manager.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/cache/manager.go b/cache/manager.go index eaf67067..a6e4cc6c 100644 --- a/cache/manager.go +++ b/cache/manager.go @@ -50,6 +50,8 @@ type cacheManager struct { mu sync.Mutex ManagerOpt md *metadata.Store + + muPrune sync.Mutex // make sure parallel prune is not allowed so there will not be inconsistent results } func NewManager(opt ManagerOpt) (Manager, error) { @@ -283,8 +285,12 @@ func (cm *cacheManager) GetMutable(ctx context.Context, id string) (MutableRef, } func (cm *cacheManager) Prune(ctx context.Context, ch chan client.UsageInfo) error { - // TODO: global prune lock + cm.muPrune.Lock() + defer cm.muPrune.Unlock() + return cm.prune(ctx, ch) +} +func (cm *cacheManager) prune(ctx context.Context, ch chan client.UsageInfo) error { var toDelete []*cacheRecord cm.mu.Lock() @@ -375,7 +381,7 @@ func (cm *cacheManager) Prune(ctx context.Context, ch chan client.UsageInfo) err case <-ctx.Done(): return ctx.Err() default: - return cm.Prune(ctx, ch) + return cm.prune(ctx, ch) } }