Merge pull request #602 from tonistiigi/gc

automatic GC
docker-18.09
Akihiro Suda 2018-09-05 10:47:21 +09:00 committed by GitHub
commit 756ca6cc6d
19 changed files with 618 additions and 30 deletions


@ -9,6 +9,7 @@
It has these top-level messages:
WorkerRecord
GCPolicy
*/
package moby_buildkit_v1_types
@ -35,6 +36,7 @@ type WorkerRecord struct {
ID string `protobuf:"bytes,1,opt,name=ID,proto3" json:"ID,omitempty"`
Labels map[string]string `protobuf:"bytes,2,rep,name=Labels" json:"Labels,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
Platforms []pb.Platform `protobuf:"bytes,3,rep,name=platforms" json:"platforms"`
GCPolicy []*GCPolicy `protobuf:"bytes,4,rep,name=GCPolicy" json:"GCPolicy,omitempty"`
}
func (m *WorkerRecord) Reset() { *m = WorkerRecord{} }
@ -63,8 +65,56 @@ func (m *WorkerRecord) GetPlatforms() []pb.Platform {
return nil
}
func (m *WorkerRecord) GetGCPolicy() []*GCPolicy {
if m != nil {
return m.GCPolicy
}
return nil
}
type GCPolicy struct {
All bool `protobuf:"varint,1,opt,name=all,proto3" json:"all,omitempty"`
KeepDuration int64 `protobuf:"varint,2,opt,name=keepDuration,proto3" json:"keepDuration,omitempty"`
KeepBytes int64 `protobuf:"varint,3,opt,name=keepBytes,proto3" json:"keepBytes,omitempty"`
Filters []string `protobuf:"bytes,4,rep,name=filters" json:"filters,omitempty"`
}
func (m *GCPolicy) Reset() { *m = GCPolicy{} }
func (m *GCPolicy) String() string { return proto.CompactTextString(m) }
func (*GCPolicy) ProtoMessage() {}
func (*GCPolicy) Descriptor() ([]byte, []int) { return fileDescriptorWorker, []int{1} }
func (m *GCPolicy) GetAll() bool {
if m != nil {
return m.All
}
return false
}
func (m *GCPolicy) GetKeepDuration() int64 {
if m != nil {
return m.KeepDuration
}
return 0
}
func (m *GCPolicy) GetKeepBytes() int64 {
if m != nil {
return m.KeepBytes
}
return 0
}
func (m *GCPolicy) GetFilters() []string {
if m != nil {
return m.Filters
}
return nil
}
func init() {
proto.RegisterType((*WorkerRecord)(nil), "moby.buildkit.v1.types.WorkerRecord")
proto.RegisterType((*GCPolicy)(nil), "moby.buildkit.v1.types.GCPolicy")
}
func (m *WorkerRecord) Marshal() (dAtA []byte, err error) {
size := m.Size()
@ -116,6 +166,71 @@ func (m *WorkerRecord) MarshalTo(dAtA []byte) (int, error) {
i += n
}
}
if len(m.GCPolicy) > 0 {
for _, msg := range m.GCPolicy {
dAtA[i] = 0x22
i++
i = encodeVarintWorker(dAtA, i, uint64(msg.Size()))
n, err := msg.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n
}
}
return i, nil
}
func (m *GCPolicy) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *GCPolicy) MarshalTo(dAtA []byte) (int, error) {
var i int
_ = i
var l int
_ = l
if m.All {
dAtA[i] = 0x8
i++
if m.All {
dAtA[i] = 1
} else {
dAtA[i] = 0
}
i++
}
if m.KeepDuration != 0 {
dAtA[i] = 0x10
i++
i = encodeVarintWorker(dAtA, i, uint64(m.KeepDuration))
}
if m.KeepBytes != 0 {
dAtA[i] = 0x18
i++
i = encodeVarintWorker(dAtA, i, uint64(m.KeepBytes))
}
if len(m.Filters) > 0 {
for _, s := range m.Filters {
dAtA[i] = 0x22
i++
l = len(s)
for l >= 1<<7 {
dAtA[i] = uint8(uint64(l)&0x7f | 0x80)
l >>= 7
i++
}
dAtA[i] = uint8(l)
i++
i += copy(dAtA[i:], s)
}
}
return i, nil
}
@ -149,6 +264,33 @@ func (m *WorkerRecord) Size() (n int) {
n += 1 + l + sovWorker(uint64(l))
}
}
if len(m.GCPolicy) > 0 {
for _, e := range m.GCPolicy {
l = e.Size()
n += 1 + l + sovWorker(uint64(l))
}
}
return n
}
func (m *GCPolicy) Size() (n int) {
var l int
_ = l
if m.All {
n += 2
}
if m.KeepDuration != 0 {
n += 1 + sovWorker(uint64(m.KeepDuration))
}
if m.KeepBytes != 0 {
n += 1 + sovWorker(uint64(m.KeepBytes))
}
if len(m.Filters) > 0 {
for _, s := range m.Filters {
l = len(s)
n += 1 + l + sovWorker(uint64(l))
}
}
return n
}
@ -372,6 +514,174 @@ func (m *WorkerRecord) Unmarshal(dAtA []byte) error {
return err
}
iNdEx = postIndex
case 4:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field GCPolicy", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWorker
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthWorker
}
postIndex := iNdEx + msglen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.GCPolicy = append(m.GCPolicy, &GCPolicy{})
if err := m.GCPolicy[len(m.GCPolicy)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipWorker(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthWorker
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *GCPolicy) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWorker
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: GCPolicy: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: GCPolicy: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field All", wireType)
}
var v int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWorker
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
v |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
m.All = bool(v != 0)
case 2:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field KeepDuration", wireType)
}
m.KeepDuration = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWorker
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.KeepDuration |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 3:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field KeepBytes", wireType)
}
m.KeepBytes = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWorker
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.KeepBytes |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 4:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Filters", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWorker
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthWorker
}
postIndex := iNdEx + intStringLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Filters = append(m.Filters, string(dAtA[iNdEx:postIndex]))
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipWorker(dAtA[iNdEx:])
@ -501,23 +811,28 @@ var (
func init() { proto.RegisterFile("worker.proto", fileDescriptorWorker) }
var fileDescriptorWorker = []byte{
// 273 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x8f, 0x41, 0x4b, 0xf3, 0x40,
0x10, 0x86, 0xbf, 0x4d, 0x3e, 0x0b, 0xdd, 0x06, 0x91, 0x45, 0x24, 0xe4, 0x10, 0x8b, 0xa7, 0x1e,
0x74, 0xb6, 0xea, 0x45, 0x3d, 0x96, 0x0a, 0x16, 0x3c, 0x48, 0x2e, 0x9e, 0xb3, 0xed, 0x36, 0x86,
0x24, 0xce, 0xb2, 0xd9, 0x44, 0xf2, 0x0f, 0x7b, 0xf4, 0xe2, 0x55, 0x24, 0xbf, 0x44, 0xba, 0x89,
0x98, 0x83, 0xb7, 0x79, 0x87, 0x67, 0x1e, 0xde, 0xa1, 0xde, 0x1b, 0xea, 0x4c, 0x6a, 0x50, 0x1a,
0x0d, 0xb2, 0x93, 0x02, 0x45, 0x03, 0xa2, 0x4a, 0xf3, 0x4d, 0x96, 0x1a, 0xa8, 0x2f, 0xc1, 0x34,
0x4a, 0x96, 0xc1, 0x45, 0x92, 0x9a, 0x97, 0x4a, 0xc0, 0x1a, 0x0b, 0x9e, 0x60, 0x82, 0xdc, 0xe2,
0xa2, 0xda, 0xda, 0x64, 0x83, 0x9d, 0x3a, 0x4d, 0x70, 0x3e, 0xc0, 0xf7, 0x46, 0xfe, 0x63, 0xe4,
0x25, 0xe6, 0xb5, 0xd4, 0x5c, 0x09, 0x8e, 0xaa, 0xec, 0xe8, 0xb3, 0x0f, 0x42, 0xbd, 0x67, 0xdb,
0x22, 0x92, 0x6b, 0xd4, 0x1b, 0x76, 0x48, 0x9d, 0xd5, 0xd2, 0x27, 0x53, 0x32, 0x1b, 0x47, 0xce,
0x6a, 0xc9, 0x1e, 0xe8, 0xe8, 0x31, 0x16, 0x32, 0x2f, 0x7d, 0x67, 0xea, 0xce, 0x26, 0x57, 0x73,
0xf8, 0xbb, 0x26, 0x0c, 0x2d, 0xd0, 0x9d, 0xdc, 0xbf, 0x1a, 0xdd, 0x44, 0xfd, 0x3d, 0x9b, 0xd3,
0xb1, 0xca, 0x63, 0xb3, 0x45, 0x5d, 0x94, 0xbe, 0x6b, 0x65, 0x1e, 0x28, 0x01, 0x4f, 0xfd, 0x72,
0xf1, 0x7f, 0xf7, 0x79, 0xfa, 0x2f, 0xfa, 0x85, 0x82, 0x5b, 0x3a, 0x19, 0x88, 0xd8, 0x11, 0x75,
0x33, 0xd9, 0xf4, 0xdd, 0xf6, 0x23, 0x3b, 0xa6, 0x07, 0x75, 0x9c, 0x57, 0xd2, 0x77, 0xec, 0xae,
0x0b, 0x77, 0xce, 0x0d, 0x59, 0x78, 0xbb, 0x36, 0x24, 0xef, 0x6d, 0x48, 0xbe, 0xda, 0x90, 0x88,
0x91, 0x7d, 0xf6, 0xfa, 0x3b, 0x00, 0x00, 0xff, 0xff, 0xa9, 0x5c, 0x8f, 0x26, 0x71, 0x01, 0x00,
0x00,
// 355 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x91, 0xc1, 0x4e, 0xea, 0x40,
0x14, 0x86, 0x6f, 0x5b, 0x2e, 0x97, 0x0e, 0xcd, 0x8d, 0x99, 0x18, 0xd3, 0x10, 0x83, 0x84, 0x15,
0x0b, 0x9d, 0xa2, 0x6e, 0xd4, 0xb8, 0x42, 0x8c, 0x92, 0xb8, 0x20, 0xb3, 0x71, 0xdd, 0x81, 0x01,
0x9b, 0x0e, 0x9c, 0xc9, 0x74, 0x8a, 0xf6, 0x39, 0x7c, 0x29, 0x96, 0x3e, 0x81, 0x31, 0x3c, 0x89,
0x99, 0x29, 0x08, 0x26, 0xba, 0x3b, 0xff, 0x9f, 0xff, 0xfb, 0xe7, 0x9c, 0x0c, 0x0a, 0x9e, 0x41,
0xa5, 0x5c, 0x11, 0xa9, 0x40, 0x03, 0x3e, 0x98, 0x01, 0x2b, 0x08, 0xcb, 0x13, 0x31, 0x4e, 0x13,
0x4d, 0x16, 0xa7, 0x44, 0x17, 0x92, 0x67, 0x8d, 0x93, 0x69, 0xa2, 0x9f, 0x72, 0x46, 0x46, 0x30,
0x8b, 0xa6, 0x30, 0x85, 0xc8, 0xc6, 0x59, 0x3e, 0xb1, 0xca, 0x0a, 0x3b, 0x95, 0x35, 0x8d, 0xe3,
0x9d, 0xb8, 0x69, 0x8c, 0x36, 0x8d, 0x51, 0x06, 0x62, 0xc1, 0x55, 0x24, 0x59, 0x04, 0x32, 0x2b,
0xd3, 0xed, 0x57, 0x17, 0x05, 0x8f, 0x76, 0x0b, 0xca, 0x47, 0xa0, 0xc6, 0xf8, 0x3f, 0x72, 0x07,
0xfd, 0xd0, 0x69, 0x39, 0x1d, 0x9f, 0xba, 0x83, 0x3e, 0xbe, 0x47, 0xd5, 0x87, 0x98, 0x71, 0x91,
0x85, 0x6e, 0xcb, 0xeb, 0xd4, 0xcf, 0xba, 0xe4, 0xe7, 0x35, 0xc9, 0x6e, 0x0b, 0x29, 0x91, 0xdb,
0xb9, 0x56, 0x05, 0x5d, 0xf3, 0xb8, 0x8b, 0x7c, 0x29, 0x62, 0x3d, 0x01, 0x35, 0xcb, 0x42, 0xcf,
0x96, 0x05, 0x44, 0x32, 0x32, 0x5c, 0x9b, 0xbd, 0xca, 0xf2, 0xfd, 0xe8, 0x0f, 0xdd, 0x86, 0xf0,
0x35, 0xaa, 0xdd, 0xdd, 0x0c, 0x41, 0x24, 0xa3, 0x22, 0xac, 0x58, 0xa0, 0xf5, 0xdb, 0xeb, 0x9b,
0x1c, 0xfd, 0x22, 0x1a, 0x97, 0xa8, 0xbe, 0xb3, 0x06, 0xde, 0x43, 0x5e, 0xca, 0x8b, 0xf5, 0x65,
0x66, 0xc4, 0xfb, 0xe8, 0xef, 0x22, 0x16, 0x39, 0x0f, 0x5d, 0xeb, 0x95, 0xe2, 0xca, 0xbd, 0x70,
0xda, 0x2f, 0xdb, 0x87, 0x0d, 0x17, 0x0b, 0x61, 0xb9, 0x1a, 0x35, 0x23, 0x6e, 0xa3, 0x20, 0xe5,
0x5c, 0xf6, 0x73, 0x15, 0xeb, 0x04, 0xe6, 0x16, 0xf7, 0xe8, 0x37, 0x0f, 0x1f, 0x22, 0xdf, 0xe8,
0x5e, 0xa1, 0xb9, 0x39, 0xd6, 0x04, 0xb6, 0x06, 0x0e, 0xd1, 0xbf, 0x49, 0x22, 0x34, 0x57, 0x99,
0xbd, 0xcb, 0xa7, 0x1b, 0xd9, 0x0b, 0x96, 0xab, 0xa6, 0xf3, 0xb6, 0x6a, 0x3a, 0x1f, 0xab, 0xa6,
0xc3, 0xaa, 0xf6, 0x93, 0xce, 0x3f, 0x03, 0x00, 0x00, 0xff, 0xff, 0xfc, 0x79, 0x52, 0x6a, 0x29,
0x02, 0x00, 0x00,
}


@ -13,4 +13,12 @@ message WorkerRecord {
string ID = 1;
map<string, string> Labels = 2;
repeated pb.Platform platforms = 3 [(gogoproto.nullable) = false];
repeated GCPolicy GCPolicy = 4;
}
message GCPolicy {
bool all = 1;
int64 keepDuration = 2;
int64 keepBytes = 3;
repeated string filters = 4;
}

cache/manager.go

@ -39,7 +39,7 @@ type Accessor interface {
type Controller interface {
DiskUsage(ctx context.Context, info client.DiskUsageInfo) ([]*client.UsageInfo, error)
Prune(ctx context.Context, ch chan client.UsageInfo, info client.PruneInfo) error
Prune(ctx context.Context, ch chan client.UsageInfo, info ...client.PruneInfo) error
GC(ctx context.Context) error
}
@ -304,10 +304,19 @@ func (cm *cacheManager) GetMutable(ctx context.Context, id string) (MutableRef,
return rec.mref(), nil
}
func (cm *cacheManager) Prune(ctx context.Context, ch chan client.UsageInfo, opt client.PruneInfo) error {
func (cm *cacheManager) Prune(ctx context.Context, ch chan client.UsageInfo, opts ...client.PruneInfo) error {
cm.muPrune.Lock()
defer cm.muPrune.Unlock()
for _, opt := range opts {
if err := cm.pruneOnce(ctx, ch, opt); err != nil {
return err
}
}
return nil
}
func (cm *cacheManager) pruneOnce(ctx context.Context, ch chan client.UsageInfo, opt client.PruneInfo) error {
filter, err := filters.ParseAll(opt.Filter...)
if err != nil {
return err


@ -2,8 +2,10 @@ package client
import (
"context"
"time"
controlapi "github.com/moby/buildkit/api/services/control"
apitypes "github.com/moby/buildkit/api/types"
"github.com/moby/buildkit/solver/pb"
specs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
@ -13,6 +15,7 @@ type WorkerInfo struct {
ID string
Labels map[string]string
Platforms []specs.Platform
GCPolicy []PruneInfo
}
func (c *Client) ListWorkers(ctx context.Context, opts ...ListWorkersOption) ([]*WorkerInfo, error) {
@ -34,6 +37,7 @@ func (c *Client) ListWorkers(ctx context.Context, opts ...ListWorkersOption) ([]
ID: w.ID,
Labels: w.Labels,
Platforms: pb.ToSpecPlatforms(w.Platforms),
GCPolicy: fromAPIGCPolicy(w.GCPolicy),
})
}
@ -47,3 +51,16 @@ type ListWorkersOption interface {
type ListWorkersInfo struct {
Filter []string
}
func fromAPIGCPolicy(in []*apitypes.GCPolicy) []PruneInfo {
out := make([]PruneInfo, 0, len(in))
for _, p := range in {
out = append(out, PruneInfo{
All: p.All,
Filter: p.Filters,
KeepDuration: time.Duration(p.KeepDuration),
KeepBytes: p.KeepBytes,
})
}
return out
}


@ -11,6 +11,7 @@ import (
"github.com/containerd/containerd/platforms"
"github.com/moby/buildkit/client"
specs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/tonistiigi/units"
"github.com/urfave/cli"
)
@ -63,6 +64,19 @@ func printWorkersVerbose(tw *tabwriter.Writer, winfo []*client.WorkerInfo) {
v := wi.Labels[k]
fmt.Fprintf(tw, "\t%s:\t%s\n", k, v)
}
for i, rule := range wi.GCPolicy {
fmt.Fprintf(tw, "GC Policy rule#%d:\n", i)
fmt.Fprintf(tw, "\tAll:\t%v\n", rule.All)
if len(rule.Filter) > 0 {
fmt.Fprintf(tw, "\tFilters:\t%s\n", strings.Join(rule.Filter, " "))
}
if rule.KeepDuration > 0 {
fmt.Fprintf(tw, "\tKeep Duration:\t%v\n", rule.KeepDuration.String())
}
if rule.KeepBytes > 0 {
fmt.Fprintf(tw, "\tKeep Bytes:\t%g\n", units.Bytes(rule.KeepBytes))
}
}
fmt.Fprintf(tw, "\n")
}


@ -47,6 +47,7 @@ type OCIConfig struct {
Platforms []string `toml:"platforms"`
Snapshotter string `toml:"snapshotter"`
Rootless bool `toml:"rootless"`
GCPolicy []GCPolicy `toml:"gcpolicy"`
}
type ContainerdConfig struct {
@ -54,6 +55,14 @@ type ContainerdConfig struct {
Enabled *bool `toml:"enabled"`
Labels map[string]string `toml:"labels"`
Platforms []string `toml:"platforms"`
GCPolicy []GCPolicy `toml:"gcpolicy"`
}
type GCPolicy struct {
All bool `toml:"all"`
KeepBytes int64 `toml:"keepBytes"`
KeepDuration int64 `toml:"keepDuration"`
Filters []string `toml:"filters"`
}
func Load(r io.Reader) (Config, *toml.MetaData, error) {


@ -31,6 +31,14 @@ foo="bar"
[worker.containerd]
platforms=["linux/amd64"]
address="containerd.sock"
[[worker.containerd.gcpolicy]]
all=true
filters=["foo==bar"]
keepBytes=20
keepDuration=3600
[[worker.containerd.gcpolicy]]
keepBytes=40
keepDuration=7200
`
cfg, md, err := Load(bytes.NewBuffer([]byte(testConfig)))
@ -58,4 +66,16 @@ address="containerd.sock"
require.Nil(t, cfg.Workers.Containerd.Enabled)
require.Equal(t, 1, len(cfg.Workers.Containerd.Platforms))
require.Equal(t, "containerd.sock", cfg.Workers.Containerd.Address)
require.Equal(t, 0, len(cfg.Workers.OCI.GCPolicy))
require.Equal(t, 2, len(cfg.Workers.Containerd.GCPolicy))
require.Equal(t, true, cfg.Workers.Containerd.GCPolicy[0].All)
require.Equal(t, false, cfg.Workers.Containerd.GCPolicy[1].All)
require.Equal(t, int64(20), cfg.Workers.Containerd.GCPolicy[0].KeepBytes)
require.Equal(t, int64(40), cfg.Workers.Containerd.GCPolicy[1].KeepBytes)
require.Equal(t, int64(3600), cfg.Workers.Containerd.GCPolicy[0].KeepDuration)
require.Equal(t, int64(7200), cfg.Workers.Containerd.GCPolicy[1].KeepDuration)
require.Equal(t, 1, len(cfg.Workers.Containerd.GCPolicy[0].Filters))
require.Equal(t, 0, len(cfg.Workers.Containerd.GCPolicy[1].Filters))
}


@ -0,0 +1,29 @@
package config
const defaultCap int64 = 2e9 // 2GB
func DefaultGCPolicy(p string) []GCPolicy {
keep := detectDefaultGCCap(p)
return []GCPolicy{
// if the build cache uses more than 512MB, delete the most easily reproducible data after it has not been used for 2 days
{
Filters: []string{"type==source.local,type==exec.cachemount,type==source.git.checkout"},
KeepDuration: 48 * 3600, // 48h
KeepBytes: 512 * 1e6, // 512MB
},
// remove any data not used for 60 days
{
KeepDuration: 60 * 24 * 3600, // 60d
KeepBytes: keep,
},
// keep the unshared build cache under cap
{
KeepBytes: keep,
},
// if the previous policies were insufficient, start deleting internal data to keep the build cache under cap
{
All: true,
KeepBytes: keep,
},
}
}
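
For reference, a minimal standalone sketch of inspecting these defaults (it assumes the package is importable as github.com/moby/buildkit/cmd/buildkitd/config, matching the buildkitd import later in this diff; the root path is a hypothetical placeholder):

package main

import (
	"fmt"

	"github.com/moby/buildkit/cmd/buildkitd/config"
)

func main() {
	// Hypothetical root path; on a real daemon this is the buildkitd root directory.
	for i, rule := range config.DefaultGCPolicy("/var/lib/buildkit") {
		fmt.Printf("rule#%d all=%v keepDuration=%ds keepBytes=%d filters=%v\n",
			i, rule.All, rule.KeepDuration, rule.KeepBytes, rule.Filters)
	}
}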


@ -0,0 +1,17 @@
// +build !windows
package config
import (
"syscall"
)
func detectDefaultGCCap(root string) int64 {
var st syscall.Statfs_t
if err := syscall.Statfs(root, &st); err != nil {
return defaultCap
}
diskSize := st.Bsize * int64(st.Blocks)
avail := diskSize / 10
return (avail/(1<<30) + 1) * 1e9 // round up
}
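
To make the rounding concrete, a hedged standalone example (the 256 GiB filesystem size is hypothetical) that mirrors the arithmetic above: the cap works out to roughly 10% of the filesystem, rounded up to the next whole GB.

package main

import "fmt"

func main() {
	// Hypothetical 256 GiB filesystem, mirroring detectDefaultGCCap's arithmetic.
	diskSize := int64(256) << 30      // 274,877,906,944 bytes (Bsize * Blocks)
	avail := diskSize / 10            // ~27.5 GB budget: 10% of the disk
	keep := (avail/(1<<30) + 1) * 1e9 // floor to whole GiB, add one, express in GB
	fmt.Println(keep)                 // 26000000000 -> a default KeepBytes cap of 26 GB
}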


@ -0,0 +1,7 @@
// +build windows
package config
func detectDefaultGCCap(root string) int64 {
return defaultCap
}


@ -13,6 +13,7 @@ import (
"sort"
"strconv"
"strings"
"time"
"github.com/BurntSushi/toml"
"github.com/containerd/containerd/pkg/seed"
@ -21,6 +22,7 @@ import (
"github.com/docker/go-connections/sockets"
"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
registryremotecache "github.com/moby/buildkit/cache/remotecache/registry"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/cmd/buildkitd/config"
"github.com/moby/buildkit/control"
"github.com/moby/buildkit/frontend"
@ -572,3 +574,19 @@ func parsePlatforms(platformsStr []string) ([]specs.Platform, error) {
}
return out, nil
}
func getGCPolicy(rules []config.GCPolicy, root string) []client.PruneInfo {
if len(rules) == 0 {
rules = config.DefaultGCPolicy(root)
}
out := make([]client.PruneInfo, 0, len(rules))
for _, rule := range rules {
out = append(out, client.PruneInfo{
Filter: rule.Filters,
All: rule.All,
KeepBytes: rule.KeepBytes,
KeepDuration: time.Duration(rule.KeepDuration) * time.Second,
})
}
return out
}
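
One unit detail worth noting, illustrated by the standalone sketch below (not part of the diff): the TOML keepDuration is given in seconds, getGCPolicy converts it to a time.Duration, and toPBGCPolicy/fromAPIGCPolicy in this change then carry that value across the API as int64 nanoseconds, so a configured 3600 round-trips as 1h.

package main

import (
	"fmt"
	"time"
)

func main() {
	cfgSeconds := int64(3600)                    // keepDuration from buildkitd.toml, in seconds
	d := time.Duration(cfgSeconds) * time.Second // 1h0m0s, as in getGCPolicy
	wire := int64(d)                             // 3600000000000 ns, as in toPBGCPolicy
	back := time.Duration(wire)                  // 1h0m0s again, as in fromAPIGCPolicy
	fmt.Println(d, wire, back)
}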


@ -124,6 +124,7 @@ func containerdWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([
return nil, err
}
opt.SessionManager = common.sessionManager
opt.GCPolicy = getGCPolicy(cfg.GCPolicy, common.config.Root)
if platformsStr := cfg.Platforms; len(platformsStr) != 0 {
platforms, err := parsePlatforms(platformsStr)


@ -141,6 +141,7 @@ func ociWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([]worker
return nil, err
}
opt.SessionManager = common.sessionManager
opt.GCPolicy = getGCPolicy(cfg.GCPolicy, common.config.Root)
if platformsStr := cfg.Platforms; len(platformsStr) != 0 {
platforms, err := parsePlatforms(platformsStr)


@ -2,6 +2,7 @@ package control
import (
"context"
"sync"
"time"
"github.com/docker/distribution/reference"
@ -17,6 +18,7 @@ import (
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/solver/llbsolver"
"github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/util/throttle"
"github.com/moby/buildkit/worker"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
@ -40,6 +42,8 @@ type Controller struct { // TODO: ControlService
solver *llbsolver.Solver
cache solver.CacheManager
gatewayForwarder *controlgateway.GatewayForwarder
throttledGC func()
gcmu sync.Mutex
}
func NewController(opt Opt) (*Controller, error) {
@ -58,6 +62,12 @@ func NewController(opt Opt) (*Controller, error) {
cache: cache,
gatewayForwarder: gatewayForwarder,
}
c.throttledGC = throttle.ThrottleAfter(time.Minute, c.gc)
defer func() {
time.AfterFunc(time.Second, c.throttledGC)
}()
return c, nil
}
@ -172,6 +182,10 @@ func (c *Controller) Prune(req *controlapi.PruneRequest, stream controlapi.Contr
func (c *Controller) Solve(ctx context.Context, req *controlapi.SolveRequest) (*controlapi.SolveResponse, error) {
ctx = session.NewContext(ctx, req.Session)
defer func() {
time.AfterFunc(time.Second, c.throttledGC)
}()
var expi exporter.ExporterInstance
// TODO: multiworker
// This is actually tricky, as the exporter should come from the worker that has the returned reference. We may need to delay this so that the solver loads this.
@ -313,11 +327,55 @@ func (c *Controller) ListWorkers(ctx context.Context, r *controlapi.ListWorkersR
ID: w.ID(),
Labels: w.Labels(),
Platforms: pb.PlatformsFromSpec(w.Platforms()),
GCPolicy: toPBGCPolicy(w.GCPolicy()),
})
}
return resp, nil
}
func (c *Controller) gc() {
c.gcmu.Lock()
defer c.gcmu.Unlock()
workers, err := c.opt.WorkerController.List()
if err != nil {
return
}
eg, ctx := errgroup.WithContext(context.TODO())
var size int64
ch := make(chan client.UsageInfo)
done := make(chan struct{})
go func() {
for ui := range ch {
size += ui.Size
}
close(done)
}()
for _, w := range workers {
func(w worker.Worker) {
eg.Go(func() error {
if policy := w.GCPolicy(); len(policy) > 0 {
return w.Prune(ctx, ch, policy...)
}
return nil
})
}(w)
}
err = eg.Wait()
close(ch)
if err != nil {
logrus.Errorf("gc error: %+v", err)
}
<-done
if size > 0 {
logrus.Debugf("gc cleaned up %d bytes", size)
}
}
func parseCacheExporterOpt(opt map[string]string) solver.CacheExportMode {
for k, v := range opt {
switch k {
@ -336,3 +394,16 @@ func parseCacheExporterOpt(opt map[string]string) solver.CacheExportMode {
}
return solver.CacheExportModeMin
}
func toPBGCPolicy(in []client.PruneInfo) []*apitypes.GCPolicy {
policy := make([]*apitypes.GCPolicy, 0, len(in))
for _, p := range in {
policy = append(policy, &apitypes.GCPolicy{
All: p.All,
KeepBytes: p.KeepBytes,
KeepDuration: int64(p.KeepDuration),
Filters: p.Filter,
})
}
return policy
}


@ -67,7 +67,11 @@ func (c *bridgeClient) Solve(ctx context.Context, req client.SolveRequest) (*cli
func (c *bridgeClient) BuildOpts() client.BuildOpts {
workers := make([]client.WorkerInfo, 0, len(c.workerInfos))
for _, w := range c.workerInfos {
workers = append(workers, client.WorkerInfo(w))
workers = append(workers, client.WorkerInfo{
ID: w.ID,
Labels: w.Labels,
Platforms: w.Platforms,
})
}
return client.BuildOpts{


@ -8,6 +8,17 @@ import (
// Throttle wraps a function so that the internal function does not get called
// more frequently than the specified duration.
func Throttle(d time.Duration, f func()) func() {
return throttle(d, f, true)
}
// ThrottleAfter wraps a function so that the internal function does not get called
// more frequently than the specified duration. The delay is added after the
// function has been called.
func ThrottleAfter(d time.Duration, f func()) func() {
return throttle(d, f, false)
}
func throttle(d time.Duration, f func(), wait bool) func() {
var next, running bool
var mu sync.Mutex
return func() {
@ -25,12 +36,21 @@ func Throttle(d time.Duration, f func()) func() {
mu.Unlock()
return
}
if !wait {
next = false
}
mu.Unlock()
time.Sleep(d)
mu.Lock()
next = false
mu.Unlock()
f()
if wait {
time.Sleep(d)
mu.Lock()
next = false
mu.Unlock()
f()
} else {
f()
time.Sleep(d)
}
}
}()
}
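
A minimal usage sketch of ThrottleAfter (the callback and interval are hypothetical; the controller above wires it the same way with c.gc and time.Minute): the first call runs promptly, and calls made while a run or its trailing delay is still pending coalesce into at most one follow-up run.

package main

import (
	"fmt"
	"time"

	"github.com/moby/buildkit/util/throttle"
)

func main() {
	// Hypothetical callback standing in for the controller's gc function.
	gc := func() { fmt.Println("gc run at", time.Now().Format("15:04:05.000")) }

	throttledGC := throttle.ThrottleAfter(500*time.Millisecond, gc)

	throttledGC()                     // first call: gc runs promptly in a background goroutine
	time.Sleep(50 * time.Millisecond) // give the first run time to start

	for i := 0; i < 5; i++ {
		throttledGC() // these coalesce into at most one follow-up run after the delay
	}
	time.Sleep(1500 * time.Millisecond) // let the background goroutine finish its runs
}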


@ -54,3 +54,25 @@ func TestThrottle(t *testing.T) {
}
}
func TestThrottleAfter(t *testing.T) {
t.Parallel()
var i int64
f := func() {
atomic.AddInt64(&i, 1)
}
f = ThrottleAfter(100*time.Millisecond, f)
f()
time.Sleep(10 * time.Millisecond)
require.Equal(t, int64(1), atomic.LoadInt64(&i))
f()
time.Sleep(10 * time.Millisecond)
require.Equal(t, int64(1), atomic.LoadInt64(&i))
time.Sleep(200 * time.Millisecond)
require.Equal(t, int64(2), atomic.LoadInt64(&i))
}


@ -58,6 +58,7 @@ type WorkerOpt struct {
ID string
Labels map[string]string
Platforms []specs.Platform
GCPolicy []client.PruneInfo
SessionManager *session.Manager
MetadataStore *metadata.Store
Executor executor.Executor
@ -214,6 +215,10 @@ func (w *Worker) Platforms() []specs.Platform {
return w.WorkerOpt.Platforms
}
func (w *Worker) GCPolicy() []client.PruneInfo {
return w.WorkerOpt.GCPolicy
}
func (w *Worker) LoadRef(id string) (cache.ImmutableRef, error) {
return w.CacheManager.Get(context.TODO(), id)
}
@ -258,8 +263,8 @@ func (w *Worker) DiskUsage(ctx context.Context, opt client.DiskUsageInfo) ([]*cl
return w.CacheManager.DiskUsage(ctx, opt)
}
func (w *Worker) Prune(ctx context.Context, ch chan client.UsageInfo, opt client.PruneInfo) error {
return w.CacheManager.Prune(ctx, ch, opt)
func (w *Worker) Prune(ctx context.Context, ch chan client.UsageInfo, opt ...client.PruneInfo) error {
return w.CacheManager.Prune(ctx, ch, opt...)
}
func (w *Worker) Exporter(name string) (exporter.Exporter, error) {


@ -20,6 +20,7 @@ type Worker interface {
ID() string
Labels() map[string]string
Platforms() []specs.Platform
GCPolicy() []client.PruneInfo
LoadRef(id string) (cache.ImmutableRef, error)
// ResolveOp resolves Vertex.Sys() to Op implementation.
ResolveOp(v solver.Vertex, s frontend.FrontendLLBBridge) (solver.Op, error)
@ -28,7 +29,7 @@ type Worker interface {
Exec(ctx context.Context, meta executor.Meta, rootFS cache.ImmutableRef, stdin io.ReadCloser, stdout, stderr io.WriteCloser) error
DiskUsage(ctx context.Context, opt client.DiskUsageInfo) ([]*client.UsageInfo, error)
Exporter(name string) (exporter.Exporter, error)
Prune(ctx context.Context, ch chan client.UsageInfo, opt client.PruneInfo) error
Prune(ctx context.Context, ch chan client.UsageInfo, opt ...client.PruneInfo) error
GetRemote(ctx context.Context, ref cache.ImmutableRef, createIfNeeded bool) (*solver.Remote, error)
FromRemote(ctx context.Context, remote *solver.Remote) (cache.ImmutableRef, error)
}