Renames and refactoring
Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>docker-18.09
parent
35e661faf3
commit
ae05d02379
|
@ -0,0 +1,31 @@
|
|||
package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"time"
|
||||
)
|
||||
|
||||
// GCPolicy defines policy for garbage collection
|
||||
type GCPolicy struct {
|
||||
MaxSize uint64
|
||||
MaxKeepDuration time.Duration
|
||||
}
|
||||
|
||||
// // CachePolicy defines policy for keeping a resource in cache
|
||||
// type CachePolicy struct {
|
||||
// Priority int
|
||||
// LastUsed time.Time
|
||||
// }
|
||||
//
|
||||
// func defaultCachePolicy() CachePolicy {
|
||||
// return CachePolicy{Priority: 10, LastUsed: time.Now()}
|
||||
// }
|
||||
|
||||
func (cm *cacheManager) Prune(ctx context.Context) (map[string]int64, error) {
|
||||
return nil, errors.New("Prune not implemented")
|
||||
}
|
||||
|
||||
func (cm *cacheManager) GC(ctx context.Context) error {
|
||||
return errors.New("GC not implemented")
|
||||
}
|
|
@ -0,0 +1,200 @@
|
|||
package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/tonistiigi/buildkit_poc/snapshot"
|
||||
)
|
||||
|
||||
const dbFile = "cache.db"
|
||||
|
||||
var (
|
||||
errLocked = errors.New("locked")
|
||||
errNotFound = errors.New("not found")
|
||||
errInvalid = errors.New("invalid")
|
||||
)
|
||||
|
||||
type ManagerOpt struct {
|
||||
Snapshotter snapshot.Snapshotter
|
||||
Root string
|
||||
GCPolicy GCPolicy
|
||||
}
|
||||
|
||||
type Accessor interface {
|
||||
Get(id string) (ImmutableRef, error)
|
||||
New(s ImmutableRef) (MutableRef, error)
|
||||
GetMutable(id string) (MutableRef, error) // Rebase?
|
||||
}
|
||||
|
||||
type Controller interface {
|
||||
DiskUsage(ctx context.Context) ([]UsageInfo, error)
|
||||
Prune(ctx context.Context) (map[string]int64, error)
|
||||
GC(ctx context.Context) error
|
||||
}
|
||||
|
||||
type Manager interface {
|
||||
Accessor
|
||||
Controller
|
||||
Close() error
|
||||
}
|
||||
|
||||
type UsageInfo struct {
|
||||
ID string
|
||||
Active bool
|
||||
InUse bool
|
||||
Size int64
|
||||
// Meta string
|
||||
// LastUsed time.Time
|
||||
}
|
||||
|
||||
type cacheManager struct {
|
||||
db *bolt.DB // note: no particual reason for bolt
|
||||
records map[string]*cacheRecord
|
||||
mu sync.Mutex
|
||||
ManagerOpt
|
||||
}
|
||||
|
||||
func NewManager(opt ManagerOpt) (Manager, error) {
|
||||
if err := os.MkdirAll(opt.Root, 0700); err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to create %s", opt.Root)
|
||||
}
|
||||
|
||||
p := filepath.Join(opt.Root, dbFile)
|
||||
db, err := bolt.Open(p, 0600, nil)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to open database file %s", p)
|
||||
}
|
||||
|
||||
cm := &cacheManager{
|
||||
ManagerOpt: opt,
|
||||
db: db,
|
||||
records: make(map[string]*cacheRecord),
|
||||
}
|
||||
|
||||
if err := cm.init(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// cm.scheduleGC(5 * time.Minute)
|
||||
|
||||
return cm, nil
|
||||
}
|
||||
|
||||
func (cm *cacheManager) init() error {
|
||||
// load all refs from db
|
||||
// compare with the walk from Snapshotter
|
||||
// delete items that are not in db (or implement broken transaction detection)
|
||||
// keep all refs in memory(maybe in future work on disk only or with lru)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cm *cacheManager) Close() error {
|
||||
// TODO: allocate internal context and cancel it here
|
||||
return cm.db.Close()
|
||||
}
|
||||
|
||||
func (cm *cacheManager) Get(id string) (ImmutableRef, error) {
|
||||
cm.mu.Lock()
|
||||
defer cm.mu.Unlock()
|
||||
rec, ok := cm.records[id]
|
||||
if !ok {
|
||||
// TODO: lazy-load from Snapshotter
|
||||
return nil, errors.Wrapf(errNotFound, "%s not found", id)
|
||||
}
|
||||
|
||||
rec.mu.Lock()
|
||||
defer rec.mu.Unlock()
|
||||
|
||||
if rec.mutable && !rec.frozen {
|
||||
if len(rec.refs) != 0 {
|
||||
return nil, errors.Wrapf(errLocked, "%s is locked", id)
|
||||
} else {
|
||||
rec.frozen = true
|
||||
}
|
||||
}
|
||||
|
||||
return rec.ref(), nil
|
||||
}
|
||||
func (cm *cacheManager) New(s ImmutableRef) (MutableRef, error) {
|
||||
id := generateID()
|
||||
|
||||
var parent ImmutableRef
|
||||
var parentID string
|
||||
if s != nil {
|
||||
var err error
|
||||
parent, err = cm.Get(s.ID())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
parentID = parent.ID()
|
||||
}
|
||||
|
||||
if _, err := cm.Snapshotter.Prepare(context.TODO(), id, parentID); err != nil {
|
||||
if parent != nil {
|
||||
parent.Release()
|
||||
}
|
||||
return nil, errors.Wrapf(err, "failed to prepare %s", id)
|
||||
}
|
||||
|
||||
rec := &cacheRecord{
|
||||
mutable: true,
|
||||
id: id,
|
||||
cm: cm,
|
||||
refs: make(map[*cacheRef]struct{}),
|
||||
parent: parent,
|
||||
}
|
||||
|
||||
cm.mu.Lock()
|
||||
defer cm.mu.Unlock()
|
||||
|
||||
cm.records[id] = rec // TODO: save to db
|
||||
|
||||
return rec.ref(), nil
|
||||
}
|
||||
func (cm *cacheManager) GetMutable(id string) (MutableRef, error) { // Rebase?
|
||||
cm.mu.Lock()
|
||||
defer cm.mu.Unlock()
|
||||
|
||||
rec, ok := cm.records[id]
|
||||
if !ok {
|
||||
return nil, errors.Wrapf(errNotFound, "%s not found", id)
|
||||
}
|
||||
|
||||
rec.mu.Lock()
|
||||
defer rec.mu.Unlock()
|
||||
if !rec.mutable {
|
||||
return nil, errors.Wrapf(errInvalid, "%s is not mutable", id)
|
||||
}
|
||||
|
||||
if rec.frozen || len(rec.refs) != 0 {
|
||||
return nil, errors.Wrapf(errLocked, "%s is locked", id)
|
||||
}
|
||||
|
||||
return rec.ref(), nil
|
||||
}
|
||||
|
||||
func (cm *cacheManager) DiskUsage(ctx context.Context) ([]UsageInfo, error) {
|
||||
cm.mu.Lock()
|
||||
defer cm.mu.Unlock()
|
||||
|
||||
var du []UsageInfo
|
||||
|
||||
for id, cr := range cm.records {
|
||||
cr.mu.Lock()
|
||||
c := UsageInfo{
|
||||
ID: id,
|
||||
Active: cr.mutable,
|
||||
InUse: len(cr.refs) > 0,
|
||||
Size: -1, // TODO:
|
||||
}
|
||||
cr.mu.Unlock()
|
||||
du = append(du, c)
|
||||
}
|
||||
|
||||
return du, nil
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package cachemanager
|
||||
package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@ -13,7 +13,7 @@ import (
|
|||
"github.com/tonistiigi/buildkit_poc/snapshot"
|
||||
)
|
||||
|
||||
func TestCacheManager(t *testing.T) {
|
||||
func TestManager(t *testing.T) {
|
||||
tmpdir, err := ioutil.TempDir("", "cachemanager")
|
||||
assert.NoError(t, err)
|
||||
defer os.RemoveAll(tmpdir)
|
||||
|
@ -21,7 +21,7 @@ func TestCacheManager(t *testing.T) {
|
|||
snapshotter, err := naive.NewSnapshotter(filepath.Join(tmpdir, "snapshots"))
|
||||
assert.NoError(t, err)
|
||||
|
||||
cm, err := NewCacheManager(CacheManagerOpt{
|
||||
cm, err := NewManager(ManagerOpt{
|
||||
Root: tmpdir,
|
||||
Snapshotter: snapshotter,
|
||||
})
|
||||
|
@ -49,18 +49,18 @@ func TestCacheManager(t *testing.T) {
|
|||
err = lm.Unmount()
|
||||
assert.NoError(t, err)
|
||||
|
||||
_, err = cm.GetActive(active.ID())
|
||||
_, err = cm.GetMutable(active.ID())
|
||||
assert.Error(t, err)
|
||||
assert.Equal(t, errLocked, errors.Cause(err))
|
||||
|
||||
checkDiskUsage(t, cm, 1, 0)
|
||||
|
||||
snap, err := active.ReleaseActive()
|
||||
snap, err := active.Freeze()
|
||||
assert.NoError(t, err)
|
||||
|
||||
checkDiskUsage(t, cm, 1, 0)
|
||||
|
||||
_, err = cm.GetActive(active.ID())
|
||||
_, err = cm.GetMutable(active.ID())
|
||||
assert.Error(t, err)
|
||||
assert.Equal(t, errLocked, errors.Cause(err))
|
||||
|
||||
|
@ -69,7 +69,7 @@ func TestCacheManager(t *testing.T) {
|
|||
|
||||
checkDiskUsage(t, cm, 0, 1)
|
||||
|
||||
active, err = cm.GetActive(active.ID())
|
||||
active, err = cm.GetMutable(active.ID())
|
||||
assert.NoError(t, err)
|
||||
|
||||
checkDiskUsage(t, cm, 1, 0)
|
||||
|
@ -82,11 +82,11 @@ func TestCacheManager(t *testing.T) {
|
|||
err = snap.Release()
|
||||
assert.NoError(t, err)
|
||||
|
||||
_, err = cm.GetActive(active.ID())
|
||||
_, err = cm.GetMutable(active.ID())
|
||||
assert.Error(t, err)
|
||||
assert.Equal(t, errNotFound, errors.Cause(err))
|
||||
|
||||
_, err = cm.GetActive(snap.ID())
|
||||
_, err = cm.GetMutable(snap.ID())
|
||||
assert.Error(t, err)
|
||||
assert.Equal(t, errInvalid, errors.Cause(err))
|
||||
|
||||
|
@ -106,7 +106,7 @@ func TestCacheManager(t *testing.T) {
|
|||
|
||||
checkDiskUsage(t, cm, 2, 0)
|
||||
|
||||
snap3, err := active2.ReleaseActive()
|
||||
snap3, err := active2.Freeze()
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = snap2.Release()
|
||||
|
@ -123,7 +123,7 @@ func TestCacheManager(t *testing.T) {
|
|||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
func checkDiskUsage(t *testing.T, cm CacheManager, inuse, unused int) {
|
||||
func checkDiskUsage(t *testing.T, cm Manager, inuse, unused int) {
|
||||
du, err := cm.DiskUsage(context.TODO())
|
||||
assert.NoError(t, err)
|
||||
var inuseActual, unusedActual int
|
|
@ -0,0 +1,166 @@
|
|||
package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"encoding/hex"
|
||||
"sync"
|
||||
|
||||
"github.com/containerd/containerd/mount"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type ImmutableRef interface {
|
||||
Mountable
|
||||
ID() string
|
||||
Release() error
|
||||
Size() (int64, error)
|
||||
// Prepare() / ChainID() / Meta()
|
||||
}
|
||||
|
||||
type MutableRef interface {
|
||||
Mountable
|
||||
ID() string
|
||||
Freeze() (ImmutableRef, error)
|
||||
ReleaseAndCommit(ctx context.Context) (ImmutableRef, error)
|
||||
Size() (int64, error)
|
||||
}
|
||||
|
||||
type Mountable interface {
|
||||
Mount() ([]mount.Mount, error)
|
||||
}
|
||||
|
||||
type cacheRecord struct {
|
||||
mu sync.Mutex
|
||||
mutable bool
|
||||
frozen bool
|
||||
// meta SnapMeta
|
||||
refs map[*cacheRef]struct{}
|
||||
id string
|
||||
cm *cacheManager
|
||||
parent ImmutableRef
|
||||
}
|
||||
|
||||
// hold manager lock before calling
|
||||
func (cr *cacheRecord) ref() *cacheRef {
|
||||
ref := &cacheRef{cacheRecord: cr}
|
||||
cr.refs[ref] = struct{}{}
|
||||
return ref
|
||||
}
|
||||
|
||||
type cacheRef struct {
|
||||
*cacheRecord
|
||||
}
|
||||
|
||||
func (sr *cacheRef) Mount() ([]mount.Mount, error) {
|
||||
sr.mu.Lock()
|
||||
defer sr.mu.Unlock()
|
||||
|
||||
if sr.mutable {
|
||||
m, err := sr.cm.Snapshotter.Mounts(context.TODO(), sr.id)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to mount %s", sr.id)
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
return nil, errors.New("snapshot mount not implemented")
|
||||
}
|
||||
|
||||
func (sr *cacheRef) Release() error {
|
||||
sr.cm.mu.Lock()
|
||||
defer sr.cm.mu.Unlock()
|
||||
|
||||
sr.mu.Lock()
|
||||
defer sr.mu.Unlock()
|
||||
|
||||
return sr.release()
|
||||
}
|
||||
|
||||
func (sr *cacheRef) release() error {
|
||||
if sr.parent != nil {
|
||||
if err := sr.parent.(*cacheRef).release(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
delete(sr.refs, sr)
|
||||
sr.frozen = false
|
||||
|
||||
if len(sr.refs) == 0 {
|
||||
//go sr.cm.GC()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sr *cacheRef) Freeze() (ImmutableRef, error) {
|
||||
sr.cm.mu.Lock()
|
||||
defer sr.cm.mu.Unlock()
|
||||
|
||||
sr.mu.Lock()
|
||||
defer sr.mu.Unlock()
|
||||
|
||||
if !sr.mutable || sr.frozen || len(sr.refs) != 1 {
|
||||
return nil, errors.Wrapf(errInvalid, "invalid mutable")
|
||||
}
|
||||
|
||||
if _, ok := sr.refs[sr]; !ok {
|
||||
return nil, errors.Wrapf(errInvalid, "invalid mutable")
|
||||
}
|
||||
|
||||
sr.frozen = true
|
||||
|
||||
return sr, nil
|
||||
}
|
||||
|
||||
func (sr *cacheRef) ReleaseAndCommit(ctx context.Context) (ImmutableRef, error) {
|
||||
sr.cm.mu.Lock()
|
||||
defer sr.cm.mu.Unlock()
|
||||
|
||||
sr.mu.Lock()
|
||||
|
||||
if !sr.mutable || sr.frozen {
|
||||
sr.mu.Unlock()
|
||||
return nil, errors.Wrapf(errInvalid, "invalid mutable")
|
||||
}
|
||||
if len(sr.refs) != 1 {
|
||||
sr.mu.Unlock()
|
||||
return nil, errors.Wrapf(errInvalid, "multiple mutable references")
|
||||
}
|
||||
|
||||
sr.mu.Unlock()
|
||||
|
||||
id := generateID() // TODO: no need to actually switch the key here
|
||||
|
||||
err := sr.cm.Snapshotter.Commit(ctx, id, sr.id)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to commit %s", sr.id)
|
||||
}
|
||||
|
||||
delete(sr.cm.records, sr.id)
|
||||
|
||||
rec := &cacheRecord{
|
||||
id: id,
|
||||
cm: sr.cm,
|
||||
refs: make(map[*cacheRef]struct{}),
|
||||
}
|
||||
sr.cm.records[id] = rec // TODO: save to db
|
||||
|
||||
return rec.ref(), nil
|
||||
}
|
||||
|
||||
func (sr *cacheRef) Size() (int64, error) {
|
||||
return -1, errors.New("Size not implemented")
|
||||
}
|
||||
|
||||
func (sr *cacheRef) ID() string {
|
||||
return sr.id
|
||||
}
|
||||
|
||||
func generateID() string {
|
||||
b := make([]byte, 32)
|
||||
if _, err := rand.Read(b); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return hex.EncodeToString(b)
|
||||
}
|
|
@ -1,383 +0,0 @@
|
|||
package cachemanager
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"encoding/hex"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
"github.com/containerd/containerd/mount"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/tonistiigi/buildkit_poc/snapshot"
|
||||
)
|
||||
|
||||
const dbFile = "cache.db"
|
||||
|
||||
var (
|
||||
errLocked = errors.New("locked")
|
||||
errNotFound = errors.New("not found")
|
||||
errInvalid = errors.New("invalid")
|
||||
)
|
||||
|
||||
type CacheManagerOpt struct {
|
||||
Snapshotter snapshot.Snapshotter
|
||||
Root string
|
||||
GCPolicy GCPolicy
|
||||
}
|
||||
|
||||
// GCPolicy defines policy for garbage collection
|
||||
type GCPolicy struct {
|
||||
MaxSize uint64
|
||||
MaxKeepDuration time.Duration
|
||||
}
|
||||
|
||||
// // CachePolicy defines policy for keeping a resource in cache
|
||||
// type CachePolicy struct {
|
||||
// Priority int
|
||||
// LastUsed time.Time
|
||||
// }
|
||||
//
|
||||
// func defaultCachePolicy() CachePolicy {
|
||||
// return CachePolicy{Priority: 10, LastUsed: time.Now()}
|
||||
// }
|
||||
|
||||
type SnapshotRef interface {
|
||||
Mountable
|
||||
ID() string
|
||||
Release() error
|
||||
Size() (int64, error)
|
||||
// Prepare() / ChainID() / Meta()
|
||||
}
|
||||
|
||||
type ActiveRef interface {
|
||||
Mountable
|
||||
ID() string
|
||||
ReleaseActive() (SnapshotRef, error)
|
||||
ReleaseAndCommit(ctx context.Context) (SnapshotRef, error)
|
||||
Size() (int64, error)
|
||||
}
|
||||
|
||||
type Mountable interface {
|
||||
Mount() ([]mount.Mount, error)
|
||||
}
|
||||
|
||||
type CacheAccessor interface {
|
||||
Get(id string) (SnapshotRef, error)
|
||||
New(s SnapshotRef) (ActiveRef, error)
|
||||
GetActive(id string) (ActiveRef, error) // Rebase?
|
||||
}
|
||||
|
||||
type CacheController interface {
|
||||
DiskUsage(ctx context.Context) ([]CacheRecord, error)
|
||||
Prune(ctx context.Context) (map[string]int64, error)
|
||||
GC(ctx context.Context) error
|
||||
}
|
||||
|
||||
type CacheManager interface {
|
||||
CacheAccessor
|
||||
CacheController
|
||||
Close() error
|
||||
}
|
||||
|
||||
type CacheRecord struct {
|
||||
ID string
|
||||
Active bool
|
||||
InUse bool
|
||||
Size int64
|
||||
// Meta string
|
||||
// LastUsed time.Time
|
||||
}
|
||||
|
||||
type cacheManager struct {
|
||||
db *bolt.DB // note: no particual reason for bolt
|
||||
records map[string]*cacheRecord
|
||||
mu sync.Mutex
|
||||
CacheManagerOpt
|
||||
}
|
||||
|
||||
func NewCacheManager(opt CacheManagerOpt) (CacheManager, error) {
|
||||
if err := os.MkdirAll(opt.Root, 0700); err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to create %s", opt.Root)
|
||||
}
|
||||
|
||||
p := filepath.Join(opt.Root, dbFile)
|
||||
db, err := bolt.Open(p, 0600, nil)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to open database file %s", p)
|
||||
}
|
||||
|
||||
cm := &cacheManager{
|
||||
CacheManagerOpt: opt,
|
||||
db: db,
|
||||
records: make(map[string]*cacheRecord),
|
||||
}
|
||||
|
||||
if err := cm.init(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// cm.scheduleGC(5 * time.Minute)
|
||||
|
||||
return cm, nil
|
||||
}
|
||||
|
||||
func (cm *cacheManager) init() error {
|
||||
// load all refs from db
|
||||
// compare with the walk from Snapshotter
|
||||
// delete items that are not in db (or implement broken transaction detection)
|
||||
// keep all refs in memory(maybe in future work on disk only or with lru)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cm *cacheManager) Close() error {
|
||||
// TODO: allocate internal context and cancel it here
|
||||
return cm.db.Close()
|
||||
}
|
||||
|
||||
func (cm *cacheManager) Get(id string) (SnapshotRef, error) {
|
||||
cm.mu.Lock()
|
||||
defer cm.mu.Unlock()
|
||||
rec, ok := cm.records[id]
|
||||
if !ok {
|
||||
// TODO: lazy-load from Snapshotter
|
||||
return nil, errors.Wrapf(errNotFound, "%s not found", id)
|
||||
}
|
||||
|
||||
rec.mu.Lock()
|
||||
defer rec.mu.Unlock()
|
||||
|
||||
if rec.active && !rec.sharedActive {
|
||||
if len(rec.refs) != 0 {
|
||||
return nil, errors.Wrapf(errLocked, "%s is locked", id)
|
||||
} else {
|
||||
rec.sharedActive = true
|
||||
}
|
||||
}
|
||||
|
||||
return rec.ref(), nil
|
||||
}
|
||||
func (cm *cacheManager) New(s SnapshotRef) (ActiveRef, error) {
|
||||
id := generateID()
|
||||
|
||||
var parent SnapshotRef
|
||||
var parentID string
|
||||
if s != nil {
|
||||
var err error
|
||||
parent, err = cm.Get(s.ID())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
parentID = parent.ID()
|
||||
}
|
||||
|
||||
if _, err := cm.Snapshotter.Prepare(context.TODO(), id, parentID); err != nil {
|
||||
if parent != nil {
|
||||
parent.Release()
|
||||
}
|
||||
return nil, errors.Wrapf(err, "failed to prepare %s", id)
|
||||
}
|
||||
|
||||
rec := &cacheRecord{
|
||||
active: true,
|
||||
id: id,
|
||||
cm: cm,
|
||||
refs: make(map[*snapshotRef]struct{}),
|
||||
parent: parent,
|
||||
}
|
||||
|
||||
cm.mu.Lock()
|
||||
defer cm.mu.Unlock()
|
||||
|
||||
cm.records[id] = rec // TODO: save to db
|
||||
|
||||
return rec.ref(), nil
|
||||
}
|
||||
func (cm *cacheManager) GetActive(id string) (ActiveRef, error) { // Rebase?
|
||||
cm.mu.Lock()
|
||||
defer cm.mu.Unlock()
|
||||
|
||||
rec, ok := cm.records[id]
|
||||
if !ok {
|
||||
return nil, errors.Wrapf(errNotFound, "%s not found", id)
|
||||
}
|
||||
|
||||
rec.mu.Lock()
|
||||
defer rec.mu.Unlock()
|
||||
if !rec.active {
|
||||
return nil, errors.Wrapf(errInvalid, "%s is not active", id)
|
||||
}
|
||||
|
||||
if rec.sharedActive || len(rec.refs) != 0 {
|
||||
return nil, errors.Wrapf(errLocked, "%s is locked", id)
|
||||
}
|
||||
|
||||
return rec.ref(), nil
|
||||
}
|
||||
|
||||
func (cm *cacheManager) DiskUsage(ctx context.Context) ([]CacheRecord, error) {
|
||||
cm.mu.Lock()
|
||||
defer cm.mu.Unlock()
|
||||
|
||||
var du []CacheRecord
|
||||
|
||||
for id, cr := range cm.records {
|
||||
cr.mu.Lock()
|
||||
c := CacheRecord{
|
||||
ID: id,
|
||||
Active: cr.active,
|
||||
InUse: len(cr.refs) > 0,
|
||||
Size: -1, // TODO
|
||||
}
|
||||
cr.mu.Unlock()
|
||||
du = append(du, c)
|
||||
}
|
||||
|
||||
return du, nil
|
||||
}
|
||||
|
||||
func (cm *cacheManager) Prune(ctx context.Context) (map[string]int64, error) {
|
||||
return nil, errors.New("Prune not implemented")
|
||||
}
|
||||
|
||||
func (cm *cacheManager) GC(ctx context.Context) error {
|
||||
return errors.New("GC not implemented")
|
||||
}
|
||||
|
||||
type cacheRecord struct {
|
||||
mu sync.Mutex
|
||||
active bool
|
||||
sharedActive bool
|
||||
// meta SnapMeta
|
||||
refs map[*snapshotRef]struct{}
|
||||
id string
|
||||
cm *cacheManager
|
||||
parent SnapshotRef
|
||||
}
|
||||
|
||||
// hold manager lock before calling
|
||||
func (cr *cacheRecord) ref() *snapshotRef {
|
||||
ref := &snapshotRef{cacheRecord: cr}
|
||||
cr.refs[ref] = struct{}{}
|
||||
return ref
|
||||
}
|
||||
|
||||
type snapshotRef struct {
|
||||
*cacheRecord
|
||||
}
|
||||
|
||||
func (sr *snapshotRef) Mount() ([]mount.Mount, error) {
|
||||
sr.mu.Lock()
|
||||
defer sr.mu.Unlock()
|
||||
|
||||
if sr.active {
|
||||
m, err := sr.cm.Snapshotter.Mounts(context.TODO(), sr.id)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to mount %s", sr.id)
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
return nil, errors.New("snapshot mount not implemented")
|
||||
}
|
||||
|
||||
func (sr *snapshotRef) Release() error {
|
||||
sr.cm.mu.Lock()
|
||||
defer sr.cm.mu.Unlock()
|
||||
|
||||
sr.mu.Lock()
|
||||
defer sr.mu.Unlock()
|
||||
|
||||
return sr.release()
|
||||
}
|
||||
|
||||
func (sr *snapshotRef) release() error {
|
||||
if sr.parent != nil {
|
||||
if err := sr.parent.(*snapshotRef).release(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
delete(sr.refs, sr)
|
||||
sr.sharedActive = false
|
||||
|
||||
if len(sr.refs) == 0 {
|
||||
//go sr.cm.GC()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sr *snapshotRef) ReleaseActive() (SnapshotRef, error) {
|
||||
sr.cm.mu.Lock()
|
||||
defer sr.cm.mu.Unlock()
|
||||
|
||||
sr.mu.Lock()
|
||||
defer sr.mu.Unlock()
|
||||
|
||||
if !sr.active || sr.sharedActive || len(sr.refs) != 1 {
|
||||
return nil, errors.Wrapf(errInvalid, "invalid active")
|
||||
}
|
||||
|
||||
if _, ok := sr.refs[sr]; !ok {
|
||||
return nil, errors.Wrapf(errInvalid, "invalid active")
|
||||
}
|
||||
|
||||
sr.sharedActive = true
|
||||
|
||||
return sr, nil
|
||||
}
|
||||
|
||||
func (sr *snapshotRef) ReleaseAndCommit(ctx context.Context) (SnapshotRef, error) {
|
||||
sr.cm.mu.Lock()
|
||||
defer sr.cm.mu.Unlock()
|
||||
|
||||
sr.mu.Lock()
|
||||
|
||||
if !sr.active || sr.sharedActive {
|
||||
sr.mu.Unlock()
|
||||
return nil, errors.Wrapf(errInvalid, "invalid active")
|
||||
}
|
||||
if len(sr.refs) != 1 {
|
||||
sr.mu.Unlock()
|
||||
return nil, errors.Wrapf(errInvalid, "multiple active references")
|
||||
}
|
||||
|
||||
sr.mu.Unlock()
|
||||
|
||||
id := generateID() // TODO: no need to actually switch the key here
|
||||
|
||||
err := sr.cm.Snapshotter.Commit(ctx, id, sr.id)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to commit %s", sr.id)
|
||||
}
|
||||
|
||||
delete(sr.cm.records, sr.id)
|
||||
|
||||
rec := &cacheRecord{
|
||||
id: id,
|
||||
cm: sr.cm,
|
||||
refs: make(map[*snapshotRef]struct{}),
|
||||
}
|
||||
sr.cm.records[id] = rec // TODO: save to db
|
||||
|
||||
return rec.ref(), nil
|
||||
}
|
||||
|
||||
func (sr *snapshotRef) Size() (int64, error) {
|
||||
return -1, errors.New("Size not implemented")
|
||||
}
|
||||
|
||||
func (sr *snapshotRef) ID() string {
|
||||
return sr.id
|
||||
}
|
||||
|
||||
func generateID() string {
|
||||
b := make([]byte, 32)
|
||||
if _, err := rand.Read(b); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return hex.EncodeToString(b)
|
||||
}
|
|
@ -21,10 +21,9 @@ import (
|
|||
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tonistiigi/buildkit_poc/cachemanager"
|
||||
"github.com/tonistiigi/buildkit_poc/sources"
|
||||
"github.com/tonistiigi/buildkit_poc/sources/containerimage"
|
||||
"github.com/tonistiigi/buildkit_poc/sources/identifier"
|
||||
"github.com/tonistiigi/buildkit_poc/cache"
|
||||
"github.com/tonistiigi/buildkit_poc/source"
|
||||
"github.com/tonistiigi/buildkit_poc/source/containerimage"
|
||||
)
|
||||
|
||||
func TestControl(t *testing.T) {
|
||||
|
@ -35,25 +34,25 @@ func TestControl(t *testing.T) {
|
|||
cd, err := localContainerd(tmpdir)
|
||||
assert.NoError(t, err)
|
||||
|
||||
cm, err := cachemanager.NewCacheManager(cachemanager.CacheManagerOpt{
|
||||
cm, err := cache.NewManager(cache.ManagerOpt{
|
||||
Snapshotter: cd.Snapshotter,
|
||||
Root: filepath.Join(tmpdir, "cachemanager"),
|
||||
})
|
||||
|
||||
sm, err := sources.NewSourceManager()
|
||||
sm, err := source.NewManager()
|
||||
assert.NoError(t, err)
|
||||
|
||||
is, err := containerimage.NewContainerImageSource(containerimage.ContainerImageSourceOpt{
|
||||
Snapshotter: cd.Snapshotter,
|
||||
ContentStore: cd.ContentStore,
|
||||
Applier: cd.Applier,
|
||||
CacheAccessor: cm,
|
||||
is, err := containerimage.NewSource(containerimage.SourceOpt{
|
||||
Snapshotter: cd.Snapshotter,
|
||||
ContentStore: cd.ContentStore,
|
||||
Applier: cd.Applier,
|
||||
Accessor: cm,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
|
||||
sm.Register(is)
|
||||
|
||||
img, err := identifier.NewImageIdentifier("docker.io/library/redis:latest")
|
||||
img, err := source.NewImageIdentifier("docker.io/library/redis:latest")
|
||||
assert.NoError(t, err)
|
||||
|
||||
snap, err := sm.Pull(context.TODO(), img)
|
||||
|
|
|
@ -13,29 +13,28 @@ import (
|
|||
"github.com/containerd/containerd/snapshot"
|
||||
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/tonistiigi/buildkit_poc/cachemanager"
|
||||
"github.com/tonistiigi/buildkit_poc/sources"
|
||||
"github.com/tonistiigi/buildkit_poc/sources/identifier"
|
||||
"github.com/tonistiigi/buildkit_poc/cache"
|
||||
"github.com/tonistiigi/buildkit_poc/source"
|
||||
)
|
||||
|
||||
// TODO: break apart containerd specifics like contentstore so the resolver
|
||||
// code can be used with any implementation
|
||||
|
||||
type ContainerImageSourceOpt struct {
|
||||
Snapshotter snapshot.Snapshotter
|
||||
ContentStore content.Store
|
||||
Applier rootfs.Applier
|
||||
CacheAccessor cachemanager.CacheAccessor
|
||||
type SourceOpt struct {
|
||||
Snapshotter snapshot.Snapshotter
|
||||
ContentStore content.Store
|
||||
Applier rootfs.Applier
|
||||
Accessor cache.Accessor
|
||||
}
|
||||
|
||||
type imageSource struct {
|
||||
ContainerImageSourceOpt
|
||||
SourceOpt
|
||||
resolver remotes.Resolver
|
||||
}
|
||||
|
||||
func NewContainerImageSource(opt ContainerImageSourceOpt) (sources.Source, error) {
|
||||
func NewSource(opt SourceOpt) (source.Source, error) {
|
||||
is := &imageSource{
|
||||
ContainerImageSourceOpt: opt,
|
||||
SourceOpt: opt,
|
||||
resolver: docker.NewResolver(docker.ResolverOptions{
|
||||
Client: http.DefaultClient,
|
||||
}),
|
||||
|
@ -44,14 +43,14 @@ func NewContainerImageSource(opt ContainerImageSourceOpt) (sources.Source, error
|
|||
}
|
||||
|
||||
func (is *imageSource) ID() string {
|
||||
return identifier.DockerImageScheme
|
||||
return source.DockerImageScheme
|
||||
}
|
||||
|
||||
func (is *imageSource) Pull(ctx context.Context, id identifier.Identifier) (cachemanager.SnapshotRef, error) {
|
||||
func (is *imageSource) Pull(ctx context.Context, id source.Identifier) (cache.ImmutableRef, error) {
|
||||
// TODO: update this to always centralize layer downloads/unpacks
|
||||
// TODO: progress status
|
||||
|
||||
imageIdentifier, ok := id.(*identifier.ImageIdentifier)
|
||||
imageIdentifier, ok := id.(*source.ImageIdentifier)
|
||||
if !ok {
|
||||
return nil, errors.New("invalid identifier")
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package identifier
|
||||
package source
|
||||
|
||||
import (
|
||||
"strings"
|
|
@ -0,0 +1,43 @@
|
|||
package source
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/tonistiigi/buildkit_poc/cache"
|
||||
)
|
||||
|
||||
type Source interface {
|
||||
ID() string
|
||||
Pull(ctx context.Context, id Identifier) (cache.ImmutableRef, error)
|
||||
}
|
||||
|
||||
type Manager struct {
|
||||
mu sync.Mutex
|
||||
sources map[string]Source
|
||||
}
|
||||
|
||||
func NewManager() (*Manager, error) {
|
||||
return &Manager{
|
||||
sources: make(map[string]Source),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (sm *Manager) Register(src Source) {
|
||||
sm.mu.Lock()
|
||||
sm.sources[src.ID()] = src
|
||||
sm.mu.Unlock()
|
||||
}
|
||||
|
||||
func (sm *Manager) Pull(ctx context.Context, id Identifier) (cache.ImmutableRef, error) {
|
||||
sm.mu.Lock()
|
||||
src, ok := sm.sources[id.ID()]
|
||||
sm.mu.Unlock()
|
||||
|
||||
if !ok {
|
||||
return nil, errors.Errorf("no handler fro %s", id.ID())
|
||||
}
|
||||
|
||||
return src.Pull(ctx, id)
|
||||
}
|
|
@ -1,54 +0,0 @@
|
|||
package sources
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/rootfs"
|
||||
"github.com/containerd/containerd/snapshot"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/tonistiigi/buildkit_poc/cachemanager"
|
||||
"github.com/tonistiigi/buildkit_poc/sources/identifier"
|
||||
)
|
||||
|
||||
type ContainerImageSourceOpt struct {
|
||||
Snapshotter snapshot.Snapshotter
|
||||
ContentStore content.Store
|
||||
Applier rootfs.Applier
|
||||
CacheAccessor cachemanager.CacheAccessor
|
||||
}
|
||||
|
||||
type Source interface {
|
||||
ID() string
|
||||
Pull(ctx context.Context, id identifier.Identifier) (cachemanager.SnapshotRef, error)
|
||||
}
|
||||
|
||||
type SourceManager struct {
|
||||
mu sync.Mutex
|
||||
sources map[string]Source
|
||||
}
|
||||
|
||||
func NewSourceManager() (*SourceManager, error) {
|
||||
return &SourceManager{
|
||||
sources: make(map[string]Source),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (sm *SourceManager) Register(src Source) {
|
||||
sm.mu.Lock()
|
||||
sm.sources[src.ID()] = src
|
||||
sm.mu.Unlock()
|
||||
}
|
||||
|
||||
func (sm *SourceManager) Pull(ctx context.Context, id identifier.Identifier) (cachemanager.SnapshotRef, error) {
|
||||
sm.mu.Lock()
|
||||
src, ok := sm.sources[id.ID()]
|
||||
sm.mu.Unlock()
|
||||
|
||||
if !ok {
|
||||
return nil, errors.Errorf("no handler fro %s", id.ID())
|
||||
}
|
||||
|
||||
return src.Pull(ctx, id)
|
||||
}
|
Loading…
Reference in New Issue