solver: add boltdb cache store and test suite
Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>docker-18.09
parent
cd6e788437
commit
e8af448b3a
|
@ -0,0 +1,200 @@
|
|||
package boltdbcachestorage
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
solver "github.com/moby/buildkit/solver-next"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
const (
|
||||
mainBucket = "_main"
|
||||
resultBucket = "_result"
|
||||
linksBucket = "_links"
|
||||
)
|
||||
|
||||
type Store struct {
|
||||
db *bolt.DB
|
||||
}
|
||||
|
||||
func NewStore(dbPath string) (*Store, error) {
|
||||
db, err := bolt.Open(dbPath, 0600, nil)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to open database file %s", dbPath)
|
||||
}
|
||||
if err := db.Update(func(tx *bolt.Tx) error {
|
||||
for _, b := range []string{mainBucket, resultBucket, linksBucket} {
|
||||
if _, err := tx.CreateBucketIfNotExists([]byte(b)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Store{db: db}, nil
|
||||
}
|
||||
|
||||
func (s *Store) Get(id string) (solver.CacheKeyInfo, error) {
|
||||
var cki solver.CacheKeyInfo
|
||||
err := s.db.View(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket([]byte(mainBucket))
|
||||
if b == nil {
|
||||
return errors.WithStack(solver.ErrNotFound)
|
||||
}
|
||||
v := b.Get([]byte(id))
|
||||
if v == nil {
|
||||
return errors.WithStack(solver.ErrNotFound)
|
||||
}
|
||||
return json.Unmarshal(v, &cki)
|
||||
})
|
||||
if err != nil {
|
||||
return solver.CacheKeyInfo{}, err
|
||||
}
|
||||
return cki, nil
|
||||
}
|
||||
|
||||
func (s *Store) Set(info solver.CacheKeyInfo) error {
|
||||
return s.db.Update(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket([]byte(mainBucket))
|
||||
if b == nil {
|
||||
return errors.WithStack(solver.ErrNotFound)
|
||||
}
|
||||
dt, err := json.Marshal(info)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return b.Put([]byte(info.ID), dt)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Store) WalkResults(id string, fn func(solver.CacheResult) error) error {
|
||||
return s.db.View(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket([]byte(resultBucket))
|
||||
if b == nil {
|
||||
return nil
|
||||
}
|
||||
b = b.Bucket([]byte(id))
|
||||
if b == nil {
|
||||
return nil
|
||||
}
|
||||
if err := b.ForEach(func(k, v []byte) error {
|
||||
var res solver.CacheResult
|
||||
if err := json.Unmarshal(v, &res); err != nil {
|
||||
return err
|
||||
}
|
||||
return fn(res)
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Store) Load(id string, resultID string) (solver.CacheResult, error) {
|
||||
var res solver.CacheResult
|
||||
if err := s.db.View(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket([]byte(resultBucket))
|
||||
if b == nil {
|
||||
return errors.WithStack(solver.ErrNotFound)
|
||||
}
|
||||
b = b.Bucket([]byte(id))
|
||||
if b == nil {
|
||||
return errors.WithStack(solver.ErrNotFound)
|
||||
}
|
||||
|
||||
v := b.Get([]byte(resultID))
|
||||
if v == nil {
|
||||
return errors.WithStack(solver.ErrNotFound)
|
||||
}
|
||||
|
||||
return json.Unmarshal(v, &res)
|
||||
}); err != nil {
|
||||
return solver.CacheResult{}, err
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (s *Store) AddResult(id string, res solver.CacheResult) error {
|
||||
return s.db.Update(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket([]byte(resultBucket))
|
||||
if b == nil {
|
||||
return errors.WithStack(solver.ErrNotFound)
|
||||
}
|
||||
b, err := b.CreateBucketIfNotExists([]byte(id))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
dt, err := json.Marshal(res)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return b.Put([]byte(res.ID), dt)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Store) Release(resultID string) error {
|
||||
return errors.Errorf("not-implemented")
|
||||
}
|
||||
|
||||
func (s *Store) AddLink(id string, link solver.CacheInfoLink, target string) error {
|
||||
return s.db.Update(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket([]byte(linksBucket))
|
||||
if b == nil {
|
||||
return errors.WithStack(solver.ErrNotFound)
|
||||
}
|
||||
b, err := b.CreateBucketIfNotExists([]byte(id))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
dt, err := json.Marshal(link)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return b.Put(bytes.Join([][]byte{dt, []byte(target)}, []byte("@")), []byte{})
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Store) WalkLinks(id string, link solver.CacheInfoLink, fn func(id string) error) error {
|
||||
if err := s.db.View(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket([]byte(linksBucket))
|
||||
if b == nil {
|
||||
return nil
|
||||
}
|
||||
b = b.Bucket([]byte(id))
|
||||
if b == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
dt, err := json.Marshal(link)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
index := bytes.Join([][]byte{dt, {}}, []byte("@"))
|
||||
c := b.Cursor()
|
||||
k, _ := c.Seek([]byte(index))
|
||||
for {
|
||||
if k != nil && bytes.HasPrefix(k, index) {
|
||||
target := bytes.TrimPrefix(k, index)
|
||||
|
||||
if err := fn(string(target)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
k, _ = c.Next()
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,30 @@
|
|||
package boltdbcachestorage
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
solver "github.com/moby/buildkit/solver-next"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestBoltCacheStorage(t *testing.T) {
|
||||
solver.RunCacheStorageTests(t, func() (solver.CacheKeyStorage, func()) {
|
||||
tmpDir, err := ioutil.TempDir("", "storage")
|
||||
require.NoError(t, err)
|
||||
|
||||
cleanup := func() {
|
||||
os.RemoveAll(tmpDir)
|
||||
}
|
||||
|
||||
st, err := NewStore(filepath.Join(tmpDir, "cache.db"))
|
||||
if err != nil {
|
||||
cleanup()
|
||||
}
|
||||
require.NoError(t, err)
|
||||
|
||||
return st, cleanup
|
||||
})
|
||||
}
|
|
@ -364,7 +364,7 @@ func (c *inMemoryCacheManager) getInternalKey(k CacheKey, createIfNotExist bool)
|
|||
Deps: inputs,
|
||||
}
|
||||
|
||||
if err := c.backend.Set(cki.ID, cki); err != nil {
|
||||
if err := c.backend.Set(cki); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
|
|
@ -13,7 +13,7 @@ var ErrNotFound = errors.Errorf("not found")
|
|||
// CacheKeyStorage is interface for persisting cache metadata
|
||||
type CacheKeyStorage interface {
|
||||
Get(id string) (CacheKeyInfo, error)
|
||||
Set(id string, info CacheKeyInfo) error
|
||||
Set(info CacheKeyInfo) error
|
||||
|
||||
WalkResults(id string, fn func(CacheResult) error) error
|
||||
Load(id string, resultID string) (CacheResult, error)
|
||||
|
@ -47,9 +47,10 @@ type CacheResult struct {
|
|||
|
||||
// CacheInfoLink is a link between two cache keys
|
||||
type CacheInfoLink struct {
|
||||
Input, Output Index
|
||||
Digest digest.Digest
|
||||
Selector digest.Digest
|
||||
Input Index `json:"Input,omitempty"`
|
||||
Output Index `json:"Output,omitempty"`
|
||||
Digest digest.Digest `json:"Digest,omitempty"`
|
||||
Selector digest.Digest `json:"Selector,omitempty"`
|
||||
}
|
||||
|
||||
// CacheResultStorage is interface for converting cache metadata result to
|
||||
|
|
|
@ -0,0 +1,212 @@
|
|||
package solver
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"runtime"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func RunCacheStorageTests(t *testing.T, st func() (CacheKeyStorage, func())) {
|
||||
for _, tc := range []func(*testing.T, CacheKeyStorage){
|
||||
testGetSet,
|
||||
testResults,
|
||||
testLinks,
|
||||
} {
|
||||
runStorageTest(t, tc, st)
|
||||
}
|
||||
}
|
||||
|
||||
func runStorageTest(t *testing.T, fn func(t *testing.T, st CacheKeyStorage), st func() (CacheKeyStorage, func())) {
|
||||
require.True(t, t.Run(getFunctionName(fn), func(t *testing.T) {
|
||||
s, cleanup := st()
|
||||
defer cleanup()
|
||||
fn(t, s)
|
||||
}))
|
||||
}
|
||||
|
||||
func testGetSet(t *testing.T, st CacheKeyStorage) {
|
||||
t.Parallel()
|
||||
cki := CacheKeyInfo{
|
||||
ID: "foo",
|
||||
Base: digest.FromBytes([]byte("foo")),
|
||||
}
|
||||
err := st.Set(cki)
|
||||
require.NoError(t, err)
|
||||
|
||||
cki2, err := st.Get(cki.ID)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, cki, cki2)
|
||||
|
||||
_, err = st.Get("bar")
|
||||
require.Error(t, err)
|
||||
require.Equal(t, errors.Cause(err), ErrNotFound)
|
||||
}
|
||||
|
||||
func testResults(t *testing.T, st CacheKeyStorage) {
|
||||
t.Parallel()
|
||||
cki := CacheKeyInfo{
|
||||
ID: "foo",
|
||||
Base: digest.FromBytes([]byte("foo")),
|
||||
}
|
||||
err := st.Set(cki)
|
||||
require.NoError(t, err)
|
||||
|
||||
cki2 := CacheKeyInfo{
|
||||
ID: "bar",
|
||||
Base: digest.FromBytes([]byte("bar")),
|
||||
}
|
||||
err = st.Set(cki2)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = st.AddResult(cki.ID, CacheResult{
|
||||
ID: "foo0",
|
||||
CreatedAt: time.Now(),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = st.AddResult(cki.ID, CacheResult{
|
||||
ID: "foo1",
|
||||
CreatedAt: time.Now(),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = st.AddResult(cki2.ID, CacheResult{
|
||||
ID: "bar0",
|
||||
CreatedAt: time.Now(),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
m := map[string]CacheResult{}
|
||||
err = st.WalkResults("foo", func(r CacheResult) error {
|
||||
m[r.ID] = r
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, len(m), 2)
|
||||
f0, ok := m["foo0"]
|
||||
require.True(t, ok)
|
||||
f1, ok := m["foo1"]
|
||||
require.True(t, ok)
|
||||
require.True(t, f0.CreatedAt.Before(f1.CreatedAt))
|
||||
|
||||
m = map[string]CacheResult{}
|
||||
err = st.WalkResults("bar", func(r CacheResult) error {
|
||||
m[r.ID] = r
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, len(m), 1)
|
||||
_, ok = m["bar0"]
|
||||
require.True(t, ok)
|
||||
|
||||
// empty result
|
||||
err = st.WalkResults("baz", func(r CacheResult) error {
|
||||
require.Fail(t, "unreachable")
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
res, err := st.Load("foo", "foo1")
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, res.ID, "foo1")
|
||||
|
||||
_, err = st.Load("foo1", "foo1")
|
||||
require.Error(t, err)
|
||||
require.Equal(t, errors.Cause(err), ErrNotFound)
|
||||
|
||||
_, err = st.Load("foo", "foo2")
|
||||
require.Error(t, err)
|
||||
require.Equal(t, errors.Cause(err), ErrNotFound)
|
||||
}
|
||||
|
||||
func testLinks(t *testing.T, st CacheKeyStorage) {
|
||||
t.Parallel()
|
||||
cki := CacheKeyInfo{
|
||||
ID: "foo",
|
||||
Base: digest.FromBytes([]byte("foo")),
|
||||
}
|
||||
err := st.Set(cki)
|
||||
require.NoError(t, err)
|
||||
|
||||
cki2 := CacheKeyInfo{
|
||||
ID: "bar",
|
||||
Base: digest.FromBytes([]byte("bar")),
|
||||
}
|
||||
err = st.Set(cki2)
|
||||
require.NoError(t, err)
|
||||
|
||||
l0 := CacheInfoLink{
|
||||
Input: 0, Output: 1, Digest: digest.FromBytes([]byte(">target0")),
|
||||
}
|
||||
err = st.AddLink(cki.ID, l0, "target0")
|
||||
require.NoError(t, err)
|
||||
|
||||
err = st.AddLink(cki2.ID, l0, "target0-bar")
|
||||
require.NoError(t, err)
|
||||
|
||||
m := map[string]struct{}{}
|
||||
err = st.WalkLinks(cki.ID, l0, func(id string) error {
|
||||
m[id] = struct{}{}
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, len(m), 1)
|
||||
_, ok := m["target0"]
|
||||
require.True(t, ok)
|
||||
|
||||
l1 := CacheInfoLink{
|
||||
Input: 0, Output: 1, Digest: digest.FromBytes([]byte(">target1")),
|
||||
}
|
||||
m = map[string]struct{}{}
|
||||
err = st.WalkLinks(cki.ID, l1, func(id string) error {
|
||||
m[id] = struct{}{}
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, len(m), 0)
|
||||
|
||||
err = st.AddLink(cki.ID, l1, "target1")
|
||||
require.NoError(t, err)
|
||||
|
||||
m = map[string]struct{}{}
|
||||
err = st.WalkLinks(cki.ID, l1, func(id string) error {
|
||||
m[id] = struct{}{}
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, len(m), 1)
|
||||
|
||||
_, ok = m["target1"]
|
||||
require.True(t, ok)
|
||||
|
||||
err = st.AddLink(cki.ID, l1, "target1-second")
|
||||
require.NoError(t, err)
|
||||
|
||||
m = map[string]struct{}{}
|
||||
err = st.WalkLinks(cki.ID, l1, func(id string) error {
|
||||
m[id] = struct{}{}
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, len(m), 2)
|
||||
_, ok = m["target1"]
|
||||
require.True(t, ok)
|
||||
_, ok = m["target1-second"]
|
||||
require.True(t, ok)
|
||||
}
|
||||
|
||||
func getFunctionName(i interface{}) string {
|
||||
fullname := runtime.FuncForPC(reflect.ValueOf(i).Pointer()).Name()
|
||||
dot := strings.LastIndex(fullname, ".") + 1
|
||||
return strings.Title(fullname[dot:])
|
||||
}
|
|
@ -34,16 +34,16 @@ func (s *inMemoryStore) Get(id string) (CacheKeyInfo, error) {
|
|||
return k.CacheKeyInfo, nil
|
||||
}
|
||||
|
||||
func (s *inMemoryStore) Set(id string, info CacheKeyInfo) error {
|
||||
func (s *inMemoryStore) Set(info CacheKeyInfo) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
k, ok := s.byID[id]
|
||||
k, ok := s.byID[info.ID]
|
||||
if !ok {
|
||||
k = &inMemoryKey{
|
||||
results: map[string]CacheResult{},
|
||||
links: map[CacheInfoLink]map[string]struct{}{},
|
||||
}
|
||||
s.byID[id] = k
|
||||
s.byID[info.ID] = k
|
||||
}
|
||||
k.CacheKeyInfo = info
|
||||
return nil
|
||||
|
|
|
@ -0,0 +1,9 @@
|
|||
package solver
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestMemoryCacheStorage(t *testing.T) {
|
||||
RunCacheStorageTests(t, func() (CacheKeyStorage, func()) {
|
||||
return NewInMemoryCacheStorage(), func() {}
|
||||
})
|
||||
}
|
Loading…
Reference in New Issue