make blobmapping use metadata package
Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>docker-18.09
parent
336cfe07fa
commit
44415841c9
|
@ -2,8 +2,6 @@ package cache
|
|||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
|
@ -16,8 +14,6 @@ import (
|
|||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
const dbFile = "cache.db"
|
||||
|
||||
var (
|
||||
errLocked = errors.New("locked")
|
||||
errNotFound = errors.New("not found")
|
||||
|
@ -26,8 +22,8 @@ var (
|
|||
|
||||
type ManagerOpt struct {
|
||||
Snapshotter snapshot.Snapshotter
|
||||
Root string
|
||||
GCPolicy GCPolicy
|
||||
MetadataStore *metadata.Store
|
||||
}
|
||||
|
||||
type Accessor interface {
|
||||
|
@ -56,18 +52,9 @@ type cacheManager struct {
|
|||
}
|
||||
|
||||
func NewManager(opt ManagerOpt) (Manager, error) {
|
||||
if err := os.MkdirAll(opt.Root, 0700); err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to create %s", opt.Root)
|
||||
}
|
||||
|
||||
md, err := metadata.NewStore(filepath.Join(opt.Root, dbFile))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cm := &cacheManager{
|
||||
ManagerOpt: opt,
|
||||
md: md,
|
||||
md: opt.MetadataStore,
|
||||
records: make(map[string]*cacheRecord),
|
||||
}
|
||||
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
|
||||
"github.com/containerd/containerd/namespaces"
|
||||
"github.com/containerd/containerd/snapshot/naive"
|
||||
"github.com/moby/buildkit/cache/metadata"
|
||||
"github.com/moby/buildkit/snapshot"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
@ -24,9 +25,12 @@ func TestManager(t *testing.T) {
|
|||
snapshotter, err := naive.NewSnapshotter(filepath.Join(tmpdir, "snapshots"))
|
||||
assert.NoError(t, err)
|
||||
|
||||
md, err := metadata.NewStore(filepath.Join(tmpdir, "metadata.db"))
|
||||
assert.NoError(t, err)
|
||||
|
||||
cm, err := NewManager(ManagerOpt{
|
||||
Root: tmpdir,
|
||||
Snapshotter: snapshotter,
|
||||
MetadataStore: md,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
|
||||
|
|
|
@ -21,7 +21,7 @@ func TestGetSetSearch(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
defer s.Close()
|
||||
|
||||
si, ok := s.GetItem("foo")
|
||||
si, ok := s.Get("foo")
|
||||
require.False(t, ok)
|
||||
|
||||
v := si.Get("bar")
|
||||
|
@ -52,7 +52,7 @@ func TestGetSetSearch(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
defer s.Close()
|
||||
|
||||
si, ok = s.GetItem("foo")
|
||||
si, ok = s.Get("foo")
|
||||
require.True(t, ok)
|
||||
|
||||
v = si.Get("bar")
|
||||
|
@ -65,7 +65,7 @@ func TestGetSetSearch(t *testing.T) {
|
|||
|
||||
// add second item to test Search
|
||||
|
||||
si, ok = s.GetItem("foo2")
|
||||
si, ok = s.Get("foo2")
|
||||
require.False(t, ok)
|
||||
|
||||
v, err = NewValue("foobar2")
|
||||
|
@ -103,7 +103,7 @@ func TestGetSetSearch(t *testing.T) {
|
|||
|
||||
require.Equal(t, "foo2", sis[0].ID())
|
||||
|
||||
_, ok = s.GetItem("foo")
|
||||
_, ok = s.Get("foo")
|
||||
require.False(t, ok)
|
||||
}
|
||||
|
||||
|
@ -127,7 +127,7 @@ func TestIndexes(t *testing.T) {
|
|||
}
|
||||
|
||||
for _, tcase := range tcases {
|
||||
si, ok := s.GetItem(tcase.key)
|
||||
si, ok := s.Get(tcase.key)
|
||||
require.False(t, ok)
|
||||
|
||||
v, err := NewValue(tcase.valueKey)
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"github.com/containerd/containerd/rootfs"
|
||||
ctdsnapshot "github.com/containerd/containerd/snapshot"
|
||||
"github.com/moby/buildkit/cache"
|
||||
"github.com/moby/buildkit/cache/metadata"
|
||||
"github.com/moby/buildkit/snapshot/blobmapping"
|
||||
"github.com/moby/buildkit/source"
|
||||
"github.com/moby/buildkit/source/containerimage"
|
||||
|
@ -21,10 +22,15 @@ type pullDeps struct {
|
|||
}
|
||||
|
||||
func defaultControllerOpts(root string, pd pullDeps) (*Opt, error) {
|
||||
md, err := metadata.NewStore(filepath.Join(root, "metadata.db"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
snapshotter, err := blobmapping.NewSnapshotter(blobmapping.Opt{
|
||||
Root: filepath.Join(root, "blobmap"),
|
||||
Content: pd.ContentStore,
|
||||
Snapshotter: pd.Snapshotter,
|
||||
MetadataStore: md,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -32,7 +38,7 @@ func defaultControllerOpts(root string, pd pullDeps) (*Opt, error) {
|
|||
|
||||
cm, err := cache.NewManager(cache.ManagerOpt{
|
||||
Snapshotter: snapshotter,
|
||||
Root: filepath.Join(root, "cachemanager"),
|
||||
MetadataStore: md,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
|
||||
"github.com/containerd/containerd/namespaces"
|
||||
"github.com/moby/buildkit/cache"
|
||||
"github.com/moby/buildkit/cache/metadata"
|
||||
"github.com/moby/buildkit/snapshot"
|
||||
"github.com/moby/buildkit/snapshot/blobmapping"
|
||||
"github.com/moby/buildkit/source"
|
||||
|
@ -32,16 +33,19 @@ func TestControl(t *testing.T) {
|
|||
cd, err := newStandalonePullDeps(tmpdir)
|
||||
assert.NoError(t, err)
|
||||
|
||||
md, err := metadata.NewStore(filepath.Join(tmpdir, "metadata.db"))
|
||||
assert.NoError(t, err)
|
||||
|
||||
snapshotter, err := blobmapping.NewSnapshotter(blobmapping.Opt{
|
||||
Root: filepath.Join(tmpdir, "blobmap"),
|
||||
Content: cd.ContentStore,
|
||||
Snapshotter: cd.Snapshotter,
|
||||
MetadataStore: md,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
|
||||
cm, err := cache.NewManager(cache.ManagerOpt{
|
||||
Snapshotter: snapshotter,
|
||||
Root: filepath.Join(tmpdir, "cachemanager"),
|
||||
MetadataStore: md,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
|
||||
|
|
|
@ -5,4 +5,4 @@ set -eu -o pipefail -x
|
|||
# update this to iidfile after 17.06
|
||||
docker build -t buildkit:test --target unit-tests -f ./hack/dockerfiles/test.Dockerfile --force-rm .
|
||||
docker run --rm -v /tmp --privileged buildkit:test go test ${TESTFLAGS:--v} ${TESTPKGS:-./...}
|
||||
|
||||
docker run --rm -v /tmp --privileged buildkit:test go test -tags standalone -v ./control
|
||||
|
|
|
@ -1,29 +1,22 @@
|
|||
package blobmapping
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/boltdb/bolt"
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/snapshot"
|
||||
"github.com/moby/buildkit/cache/metadata"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
const dbFile = "blobmap.db"
|
||||
|
||||
var (
|
||||
bucketBySnapshot = []byte("by_snapshot")
|
||||
bucketByBlob = []byte("by_blob")
|
||||
)
|
||||
const blobKey = "blobmapping.blob"
|
||||
|
||||
type Opt struct {
|
||||
Content content.Store
|
||||
Snapshotter snapshot.Snapshotter
|
||||
Root string
|
||||
MetadataStore *metadata.Store
|
||||
}
|
||||
|
||||
type Info struct {
|
||||
|
@ -35,36 +28,18 @@ type Info struct {
|
|||
|
||||
type Snapshotter struct {
|
||||
snapshot.Snapshotter
|
||||
db *bolt.DB
|
||||
opt Opt
|
||||
}
|
||||
|
||||
func NewSnapshotter(opt Opt) (*Snapshotter, error) {
|
||||
if err := os.MkdirAll(opt.Root, 0700); err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to create %s", opt.Root)
|
||||
}
|
||||
|
||||
p := filepath.Join(opt.Root, dbFile)
|
||||
db, err := bolt.Open(p, 0600, nil)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to open database file %s", p)
|
||||
}
|
||||
|
||||
s := &Snapshotter{
|
||||
Snapshotter: opt.Snapshotter,
|
||||
db: db,
|
||||
opt: opt,
|
||||
}
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (s *Snapshotter) init() error {
|
||||
// this should do a walk from the DB and remove any records that are not
|
||||
// in snapshotter any more
|
||||
return nil
|
||||
}
|
||||
|
||||
// Remove also removes a refrence to a blob. If it is a last reference then it deletes it the blob as well
|
||||
// Remove is not safe to be called concurrently
|
||||
func (s *Snapshotter) Remove(ctx context.Context, key string) error {
|
||||
|
@ -73,26 +48,21 @@ func (s *Snapshotter) Remove(ctx context.Context, key string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
blobs, err := s.opt.MetadataStore.Search(index(blob))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := s.Snapshotter.Remove(ctx, key); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return s.db.Update(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(bucketBySnapshot)
|
||||
if b == nil {
|
||||
return nil
|
||||
}
|
||||
b.Delete([]byte(key))
|
||||
|
||||
if blob != "" {
|
||||
b = tx.Bucket(bucketByBlob)
|
||||
b.Delete(blobKey(blob, key))
|
||||
if len(keyRange(tx, blobKey(blob, ""))) == 0 { // last snapshot
|
||||
s.opt.Content.Delete(ctx, blob) // log error
|
||||
if len(blobs) == 1 && blobs[0].ID() == key { // last snapshot
|
||||
if err := s.opt.Content.Delete(ctx, blob); err != nil {
|
||||
logrus.Errorf("failed to delete blob %v", blob)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Snapshotter) Usage(ctx context.Context, key string) (snapshot.Usage, error) {
|
||||
|
@ -114,23 +84,17 @@ func (s *Snapshotter) Usage(ctx context.Context, key string) (snapshot.Usage, er
|
|||
return u, nil
|
||||
}
|
||||
|
||||
// TODO: make Blob/SetBlob part of generic metadata wrapper that can detect
|
||||
// blob key for deletion logic
|
||||
|
||||
func (s *Snapshotter) GetBlob(ctx context.Context, key string) (digest.Digest, error) {
|
||||
md, _ := s.opt.MetadataStore.Get(key)
|
||||
v := md.Get(blobKey)
|
||||
if v == nil {
|
||||
return "", nil
|
||||
}
|
||||
var blob digest.Digest
|
||||
err := s.db.View(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(bucketBySnapshot)
|
||||
if b == nil {
|
||||
return nil
|
||||
if err := v.Unmarshal(&blob); err != nil {
|
||||
return "", err
|
||||
}
|
||||
v := b.Get([]byte(key))
|
||||
if v != nil {
|
||||
blob = digest.Digest(v)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return blob, err
|
||||
return blob, nil
|
||||
}
|
||||
|
||||
// Validates that there is no blob associated with the snapshot.
|
||||
|
@ -141,47 +105,19 @@ func (s *Snapshotter) SetBlob(ctx context.Context, key string, blob digest.Diges
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return s.db.Update(func(tx *bolt.Tx) error {
|
||||
b, err := tx.CreateBucketIfNotExists(bucketBySnapshot)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
v := b.Get([]byte(key))
|
||||
if v != nil {
|
||||
if string(v) != string(blob) {
|
||||
return errors.Errorf("different blob already set for %s", key)
|
||||
} else {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
md, _ := s.opt.MetadataStore.Get(key)
|
||||
|
||||
if err := b.Put([]byte(key), []byte(blob)); err != nil {
|
||||
return err
|
||||
}
|
||||
b, err = tx.CreateBucketIfNotExists(bucketByBlob)
|
||||
v, err := metadata.NewValue(blob)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return b.Put(blobKey(blob, key), []byte{})
|
||||
v.Index = index(blob)
|
||||
|
||||
return md.Update(func(b *bolt.Bucket) error {
|
||||
return md.SetValue(b, blobKey, *v)
|
||||
})
|
||||
}
|
||||
|
||||
func blobKey(blob digest.Digest, snapshot string) []byte {
|
||||
return []byte(string(blob) + "-" + snapshot)
|
||||
}
|
||||
|
||||
// results are only valid for the lifetime of the transaction
|
||||
func keyRange(tx *bolt.Tx, key []byte) (out [][]byte) {
|
||||
c := tx.Cursor()
|
||||
lastKey := append([]byte{}, key...)
|
||||
lastKey = append(lastKey, ^byte(0))
|
||||
k, _ := c.Seek([]byte(key))
|
||||
for {
|
||||
if k != nil && bytes.Compare(k, lastKey) <= 0 {
|
||||
out = append(out, k)
|
||||
continue
|
||||
}
|
||||
break
|
||||
}
|
||||
return
|
||||
func index(blob digest.Digest) string {
|
||||
return "blobmap::" + blob.String()
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue