Merge pull request #2652 from ktock/sharemounts-cleanup

cache: Clean up temporary mount pool on restart

commit f5a831c550
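Before the diff: this change adds a MountPoolRoot option to the cache manager and, on startup, sweeps any temporary mount directories that a previous run left behind in that pool. Below is a minimal, self-contained sketch of that cleanup pattern, not the PR code itself: the function name cleanStaleMountPool and the example pool path are illustrative, while mount.Unmount (containerd) and mountinfo.Mounted (moby/sys) are the same helpers the PR imports.

package main

import (
    "fmt"
    "os"
    "path/filepath"

    "github.com/containerd/containerd/mount"
    "github.com/moby/sys/mountinfo"
)

// cleanStaleMountPool removes leftover temporary mount directories under poolRoot.
// Each entry is unmounted if still mounted, then removed; failures are skipped so
// startup is not blocked (mirroring the best-effort behavior in the PR).
func cleanStaleMountPool(poolRoot string) error {
    if poolRoot == "" {
        return nil // no pool configured, nothing to clean
    }
    if err := os.MkdirAll(poolRoot, 0700); err != nil {
        return fmt.Errorf("failed to prepare mount pool: %w", err)
    }
    entries, err := os.ReadDir(poolRoot)
    if err != nil {
        return fmt.Errorf("failed to read mount pool: %w", err)
    }
    for _, e := range entries {
        if !e.IsDir() {
            continue
        }
        dir := filepath.Join(poolRoot, e.Name())
        // Try to unmount; if the path is still mounted afterwards, skip removal
        // rather than failing the whole startup.
        if err := mount.Unmount(dir, 0); err != nil {
            if mounted, merr := mountinfo.Mounted(dir); merr != nil || mounted {
                continue
            }
        }
        if err := os.Remove(dir); err != nil {
            fmt.Fprintf(os.Stderr, "failed to remove %q: %v\n", dir, err)
        }
    }
    return nil
}

func main() {
    // Hypothetical root; the PR wires filepath.Join(root, "cachemounts") from the worker.
    if err := cleanStaleMountPool("/tmp/buildkit-cachemounts"); err != nil {
        fmt.Fprintln(os.Stderr, err)
    }
}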
@@ -1223,6 +1223,7 @@ func setupCacheManager(t *testing.T, tmpdir string, snapshotterName string, snap
         Applier:        applier,
         Differ:         differ,
         GarbageCollect: mdb.GarbageCollect,
+        MountPoolRoot:  filepath.Join(tmpdir, "cachemounts"),
     })
     require.NoError(t, err)
@@ -46,6 +46,7 @@ type ManagerOpt struct {
     Applier       diff.Applier
     Differ        diff.Comparer
     MetadataStore *metadata.Store
+    MountPoolRoot string
 }

 type Accessor interface {
@@ -90,6 +91,8 @@ type cacheManager struct {
     Differ        diff.Comparer
     MetadataStore *metadata.Store

+    mountPool sharableMountPool
+
     muPrune sync.Mutex // make sure parallel prune is not allowed so there will not be inconsistent results
     unlazyG flightcontrol.Group
 }
@@ -111,6 +114,12 @@ func NewManager(opt ManagerOpt) (Manager, error) {
         return nil, err
     }

+    p, err := newSharableMountPool(opt.MountPoolRoot)
+    if err != nil {
+        return nil, err
+    }
+    cm.mountPool = p
+
     // cm.scheduleGC(5 * time.Minute)

     return cm, nil
@@ -151,6 +151,7 @@ func newCacheManager(ctx context.Context, opt cmOpt) (co *cmOut, cleanup func()
         GarbageCollect: mdb.GarbageCollect,
         Applier:        applier,
         Differ:         differ,
+        MountPoolRoot:  filepath.Join(tmpdir, "cachemounts"),
     })
     if err != nil {
         return nil, nil, err
@@ -162,6 +163,32 @@ func newCacheManager(ctx context.Context, opt cmOpt) (co *cmOut, cleanup func()
     }, cleanup, nil
 }

+func TestSharableMountPoolCleanup(t *testing.T) {
+    t.Parallel()
+    ctx := namespaces.WithNamespace(context.Background(), "buildkit-test")
+
+    tmpdir, err := ioutil.TempDir("", "cachemanager")
+    require.NoError(t, err)
+    defer os.RemoveAll(tmpdir)
+
+    // Emulate the situation where the pool dir is dirty
+    mountPoolDir := filepath.Join(tmpdir, "cachemounts")
+    require.NoError(t, os.MkdirAll(mountPoolDir, 0700))
+    _, err = ioutil.TempDir(mountPoolDir, "buildkit")
+    require.NoError(t, err)
+
+    // Initialize cache manager and check if pool is cleaned up
+    _, cleanup, err := newCacheManager(ctx, cmOpt{
+        tmpdir: tmpdir,
+    })
+    require.NoError(t, err)
+    defer cleanup()
+
+    files, err := os.ReadDir(mountPoolDir)
+    require.NoError(t, err)
+    require.Equal(t, 0, len(files))
+}
+
 func TestManager(t *testing.T) {
     t.Parallel()

@@ -5,6 +5,7 @@ import (
     "fmt"
     "io/ioutil"
     "os"
+    "path/filepath"
     "strings"
     "sync"
     "time"
@@ -22,11 +23,13 @@ import (
     "github.com/moby/buildkit/session"
     "github.com/moby/buildkit/snapshot"
     "github.com/moby/buildkit/solver"
+    "github.com/moby/buildkit/util/bklog"
     "github.com/moby/buildkit/util/compression"
     "github.com/moby/buildkit/util/flightcontrol"
     "github.com/moby/buildkit/util/leaseutil"
     "github.com/moby/buildkit/util/progress"
     "github.com/moby/buildkit/util/winlayers"
+    "github.com/moby/sys/mountinfo"
     digest "github.com/opencontainers/go-digest"
     ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
     "github.com/pkg/errors"
@@ -1422,7 +1425,7 @@ func (sr *mutableRef) Mount(ctx context.Context, readonly bool, s session.Group)

     // Make the mounts sharable. We don't do this for immutableRef mounts because
     // it requires the raw []mount.Mount for computing diff on overlayfs.
-    mnt = setSharable(mnt)
+    mnt = sr.cm.mountPool.setSharable(mnt)
     sr.mountCache = mnt
     if readonly {
         mnt = setReadonly(mnt)
@@ -1524,8 +1527,42 @@ func readonlyOverlay(opt []string) []string {
     return out
 }

-func setSharable(mounts snapshot.Mountable) snapshot.Mountable {
-    return &sharableMountable{Mountable: mounts}
+func newSharableMountPool(tmpdirRoot string) (sharableMountPool, error) {
+    if tmpdirRoot != "" {
+        if err := os.MkdirAll(tmpdirRoot, 0700); err != nil {
+            return sharableMountPool{}, fmt.Errorf("failed to prepare mount pool: %w", err)
+        }
+        // If tmpdirRoot is specified, remove existing mounts to avoid conflict.
+        files, err := os.ReadDir(tmpdirRoot)
+        if err != nil {
+            return sharableMountPool{}, fmt.Errorf("failed to read mount pool: %w", err)
+        }
+        for _, file := range files {
+            if file.IsDir() {
+                dir := filepath.Join(tmpdirRoot, file.Name())
+                bklog.G(context.Background()).Debugf("cleaning up existing temporary mount %q", dir)
+                if err := mount.Unmount(dir, 0); err != nil {
+                    if mounted, merr := mountinfo.Mounted(dir); merr != nil || mounted {
+                        bklog.G(context.Background()).WithError(err).WithError(merr).
+                            WithField("mounted", mounted).Warnf("failed to unmount existing temporary mount %q", dir)
+                        continue
+                    }
+                }
+                if err := os.Remove(dir); err != nil {
+                    bklog.G(context.Background()).WithError(err).Warnf("failed to remove existing temporary mount %q", dir)
+                }
+            }
+        }
+    }
+    return sharableMountPool{tmpdirRoot}, nil
+}
+
+type sharableMountPool struct {
+    tmpdirRoot string
+}
+
+func (p sharableMountPool) setSharable(mounts snapshot.Mountable) snapshot.Mountable {
+    return &sharableMountable{Mountable: mounts, mountPoolRoot: p.tmpdirRoot}
 }

 // sharableMountable allows sharing underlying (possibly writable) mounts among callers.
@@ -1538,8 +1575,9 @@ func setSharable(mounts snapshot.Mountable) snapshot.Mountable {
 type sharableMountable struct {
     snapshot.Mountable

     count         int32
     mu            sync.Mutex
+    mountPoolRoot string

     curMounts     []mount.Mount
     curMountPoint string
@@ -1571,7 +1609,7 @@ func (sm *sharableMountable) Mount() (_ []mount.Mount, _ func() error, retErr er
         // Don't need temporary mount wrapper for non-overlayfs mounts
         return mounts, release, nil
     }
-    dir, err := ioutil.TempDir("", "buildkit")
+    dir, err := ioutil.TempDir(sm.mountPoolRoot, "buildkit")
     if err != nil {
         return nil, nil, err
     }
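The one-line change above is what makes the startup sweep possible: per-mount temporary directories are now created under the configured pool root instead of the system temp dir, so a restarted daemon knows where to look for leftovers. A small illustrative sketch of the difference, with a hypothetical pool path (not code from this PR):

package main

import (
    "fmt"
    "io/ioutil" // the PR still uses ioutil.TempDir; os.MkdirTemp is the newer equivalent
    "os"
    "path/filepath"
)

func main() {
    // Hypothetical pool root; BuildKit wires filepath.Join(root, "cachemounts").
    poolRoot := filepath.Join(os.TempDir(), "buildkit-example-cachemounts")
    if err := os.MkdirAll(poolRoot, 0700); err != nil {
        panic(err)
    }

    // Old behavior: an empty dir argument means os.TempDir(), so mount points
    // left behind by a crashed run are scattered and hard to find again.
    scattered, _ := ioutil.TempDir("", "buildkit")
    fmt.Println("scattered under os.TempDir():", scattered)

    // New behavior: everything lands under the pool root, which
    // newSharableMountPool can enumerate and clean on the next start.
    pooled, _ := ioutil.TempDir(poolRoot, "buildkit")
    fmt.Println("pooled under", poolRoot+":", pooled)
}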
go.mod (2 changes)
@@ -35,6 +35,7 @@ require (
     github.com/klauspost/compress v1.14.3
     github.com/mitchellh/hashstructure/v2 v2.0.2
     github.com/moby/locker v1.0.1
+    github.com/moby/sys/mountinfo v0.6.0
     github.com/moby/sys/signal v0.6.0
     github.com/morikuni/aec v1.0.0
     github.com/opencontainers/go-digest v1.0.0
@@ -104,7 +105,6 @@ require (
     github.com/ishidawataru/sctp v0.0.0-20210226210310-f2269e66cdee // indirect
     github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
     github.com/moby/sys/mount v0.3.0 // indirect
-    github.com/moby/sys/mountinfo v0.6.0 // indirect
     github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 // indirect
     github.com/pmezard/go-difflib v1.0.0 // indirect
     github.com/prometheus/client_golang v1.12.1 // indirect
@@ -128,6 +128,7 @@ func newCacheManager(ctx context.Context, opt cmOpt) (co *cmOut, cleanup func()
         Differ:         differ,
         LeaseManager:   lm,
         GarbageCollect: mdb.GarbageCollect,
+        MountPoolRoot:  filepath.Join(tmpdir, "cachemounts"),
     })
     if err != nil {
         return nil, nil, err
@@ -571,6 +571,7 @@ func setupGitSource(t *testing.T, tmpdir string) source.Source {
         Applier:        applier,
         Differ:         differ,
         GarbageCollect: mdb.GarbageCollect,
+        MountPoolRoot:  filepath.Join(tmpdir, "cachemounts"),
     })
     require.NoError(t, err)
@@ -373,6 +373,7 @@ func newHTTPSource(tmpdir string) (source.Source, error) {
         Applier:        applier,
         Differ:         differ,
         GarbageCollect: mdb.GarbageCollect,
+        MountPoolRoot:  filepath.Join(tmpdir, "cachemounts"),
     })
     if err != nil {
         return nil, err
@@ -75,6 +75,7 @@ type WorkerOpt struct {
     GarbageCollect func(context.Context) (gc.Stats, error)
     ParallelismSem *semaphore.Weighted
     MetadataStore  *metadata.Store
+    MountPoolRoot  string
 }

 // Worker is a local worker instance with dedicated snapshotter, cache, and so on.
@@ -103,6 +104,7 @@ func NewWorker(ctx context.Context, opt WorkerOpt) (*Worker, error) {
         ContentStore:  opt.ContentStore,
         Differ:        opt.Differ,
         MetadataStore: opt.MetadataStore,
+        MountPoolRoot: opt.MountPoolRoot,
     })
     if err != nil {
         return nil, err
@@ -132,6 +132,7 @@ func newContainerd(root string, client *containerd.Client, snapshotterName, ns s
         LeaseManager:   lm,
         GarbageCollect: gc,
         ParallelismSem: parallelismSem,
+        MountPoolRoot:  filepath.Join(root, "cachemounts"),
     }
     return opt, nil
 }
@@ -135,6 +135,7 @@ func NewWorkerOpt(root string, snFactory SnapshotterFactory, rootless bool, proc
         LeaseManager:   lm,
         GarbageCollect: mdb.GarbageCollect,
         ParallelismSem: parallelismSem,
+        MountPoolRoot:  filepath.Join(root, "cachemounts"),
     }
     return opt, nil
 }