commit
34676f9175
|
@ -28,7 +28,7 @@ type ManagerOpt struct {
|
|||
|
||||
type Accessor interface {
|
||||
Get(ctx context.Context, id string) (ImmutableRef, error)
|
||||
New(ctx context.Context, s ImmutableRef) (MutableRef, error)
|
||||
New(ctx context.Context, s ImmutableRef, opt ...RefOption) (MutableRef, error)
|
||||
GetMutable(ctx context.Context, id string) (MutableRef, error) // Rebase?
|
||||
}
|
||||
|
||||
|
@ -158,7 +158,7 @@ func (cm *cacheManager) load(ctx context.Context, id string) (*cacheRecord, erro
|
|||
return rec, nil
|
||||
}
|
||||
|
||||
func (cm *cacheManager) New(ctx context.Context, s ImmutableRef) (MutableRef, error) {
|
||||
func (cm *cacheManager) New(ctx context.Context, s ImmutableRef, opts ...RefOption) (MutableRef, error) {
|
||||
id := identity.NewID()
|
||||
|
||||
var parent ImmutableRef
|
||||
|
@ -192,6 +192,15 @@ func (cm *cacheManager) New(ctx context.Context, s ImmutableRef) (MutableRef, er
|
|||
md: &md,
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
if err := opt(rec); err != nil {
|
||||
if parent != nil {
|
||||
parent.Release(ctx)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
cm.mu.Lock()
|
||||
defer cm.mu.Unlock()
|
||||
|
||||
|
@ -219,6 +228,9 @@ func (cm *cacheManager) GetMutable(ctx context.Context, id string) (MutableRef,
|
|||
}
|
||||
|
||||
if rec.equalImmutable != nil {
|
||||
if len(rec.equalImmutable.refs) != 0 {
|
||||
return nil, errors.Wrapf(errLocked, "%s is locked", id)
|
||||
}
|
||||
delete(cm.records, rec.equalImmutable.ID())
|
||||
if err := rec.equalImmutable.remove(ctx, false); err != nil {
|
||||
return nil, err
|
||||
|
@ -297,20 +309,17 @@ func (cm *cacheManager) DiskUsage(ctx context.Context) ([]*client.UsageInfo, err
|
|||
if d.Size == sizeUnknown {
|
||||
func(d *client.UsageInfo) {
|
||||
eg.Go(func() error {
|
||||
if !d.Mutable {
|
||||
ref, err := cm.Get(ctx, d.ID)
|
||||
if err != nil {
|
||||
d.Size = 0
|
||||
return nil
|
||||
}
|
||||
s, err := ref.Size(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
d.Size = s
|
||||
return ref.Release(context.TODO())
|
||||
ref, err := cm.Get(ctx, d.ID)
|
||||
if err != nil {
|
||||
d.Size = 0
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
s, err := ref.Size(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
d.Size = s
|
||||
return ref.Release(context.TODO())
|
||||
})
|
||||
}(d)
|
||||
}
|
||||
|
@ -326,3 +335,16 @@ func (cm *cacheManager) DiskUsage(ctx context.Context) ([]*client.UsageInfo, err
|
|||
func IsLocked(err error) bool {
|
||||
return errors.Cause(err) == errLocked
|
||||
}
|
||||
|
||||
type RefOption func(*cacheRecord) error
|
||||
|
||||
type cachePolicy int
|
||||
|
||||
const (
|
||||
cachePolicyDefault cachePolicy = iota
|
||||
cachePolicyKeepMutable
|
||||
)
|
||||
|
||||
func CachePolicyKeepMutable(cr *cacheRecord) error {
|
||||
return setCachePolicy(cr.md, cachePolicyKeepMutable)
|
||||
}
|
||||
|
|
|
@ -30,7 +30,7 @@ func TestManager(t *testing.T) {
|
|||
|
||||
checkDiskUsage(t, ctx, cm, 0, 0)
|
||||
|
||||
active, err := cm.New(ctx, nil)
|
||||
active, err := cm.New(ctx, nil, CachePolicyKeepMutable)
|
||||
require.NoError(t, err)
|
||||
|
||||
m, err := active.Mount(ctx, false)
|
||||
|
@ -102,7 +102,7 @@ func TestManager(t *testing.T) {
|
|||
err = snap.Release(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
active2, err := cm.New(ctx, snap2)
|
||||
active2, err := cm.New(ctx, snap2, CachePolicyKeepMutable)
|
||||
require.NoError(t, err)
|
||||
|
||||
checkDiskUsage(t, ctx, cm, 2, 0)
|
||||
|
@ -133,7 +133,7 @@ func TestLazyCommit(t *testing.T) {
|
|||
|
||||
cm := getCacheManager(t, tmpdir)
|
||||
|
||||
active, err := cm.New(ctx, nil)
|
||||
active, err := cm.New(ctx, nil, CachePolicyKeepMutable)
|
||||
require.NoError(t, err)
|
||||
|
||||
// after commit mutable is locked
|
||||
|
@ -159,6 +159,12 @@ func TestLazyCommit(t *testing.T) {
|
|||
snap, err = cm.Get(ctx, snap.ID())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, snap.ID(), snap2.ID())
|
||||
|
||||
// active can't be get while immutable is held
|
||||
_, err = cm.GetMutable(ctx, active.ID())
|
||||
require.Error(t, err)
|
||||
require.Equal(t, errLocked, errors.Cause(err))
|
||||
|
||||
err = snap.Release(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -196,7 +202,7 @@ func TestLazyCommit(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
|
||||
// test restarting after commit
|
||||
active, err = cm.New(ctx, nil)
|
||||
active, err = cm.New(ctx, nil, CachePolicyKeepMutable)
|
||||
require.NoError(t, err)
|
||||
|
||||
// after commit mutable is locked
|
||||
|
|
|
@ -17,6 +17,7 @@ const sizeUnknown int64 = -1
|
|||
const keySize = "snapshot.size"
|
||||
const keyEqualMutable = "cache.equalMutable"
|
||||
const keyEqualImmutable = "cache.equalImmutable"
|
||||
const keyCachePolicy = "cache.cachePolicy"
|
||||
|
||||
func setSize(si *metadata.StorageItem, s int64) error {
|
||||
v, err := metadata.NewValue(s)
|
||||
|
@ -70,3 +71,25 @@ func clearEqualMutable(si *metadata.StorageItem) error {
|
|||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
func setCachePolicy(si *metadata.StorageItem, p cachePolicy) error {
|
||||
v, err := metadata.NewValue(p)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to create size value")
|
||||
}
|
||||
return si.Update(func(b *bolt.Bucket) error {
|
||||
return si.SetValue(b, keyCachePolicy, v)
|
||||
})
|
||||
}
|
||||
|
||||
func getCachePolicy(si *metadata.StorageItem) cachePolicy {
|
||||
v := si.Get(keyCachePolicy)
|
||||
if v == nil {
|
||||
return cachePolicyDefault
|
||||
}
|
||||
var p cachePolicy
|
||||
if err := v.Unmarshal(&p); err != nil {
|
||||
return cachePolicyDefault
|
||||
}
|
||||
return p
|
||||
}
|
||||
|
|
|
@ -140,6 +140,7 @@ func (cr *cacheRecord) Mount(ctx context.Context, readonly bool) ([]mount.Mount,
|
|||
}
|
||||
|
||||
func (cr *cacheRecord) remove(ctx context.Context, removeSnapshot bool) error {
|
||||
delete(cr.cm.records, cr.ID())
|
||||
if err := cr.cm.md.Clear(cr.ID()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -210,7 +211,6 @@ func (sr *cacheRecord) finalize(ctx context.Context) error {
|
|||
if err != nil {
|
||||
return errors.Wrapf(err, "failed to commit %s", sr.equalMutable.ID())
|
||||
}
|
||||
delete(sr.cm.records, sr.equalMutable.ID())
|
||||
if err := sr.equalMutable.remove(ctx, false); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -274,10 +274,19 @@ func (sr *mutableRef) Release(ctx context.Context) error {
|
|||
|
||||
func (sr *mutableRef) release(ctx context.Context) error {
|
||||
delete(sr.refs, sr)
|
||||
// delete(sr.cm.records, sr.ID())
|
||||
// if err := sr.remove(ctx, true); err != nil {
|
||||
// return err
|
||||
// }
|
||||
if getCachePolicy(sr.md) != cachePolicyKeepMutable {
|
||||
if sr.equalImmutable != nil {
|
||||
if err := sr.equalImmutable.remove(ctx, false); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if sr.parent != nil {
|
||||
if err := sr.parent.(*immutableRef).release(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return sr.remove(ctx, true)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -78,7 +78,7 @@ func (gs *gitSource) mountRemote(ctx context.Context, remote string) (target str
|
|||
|
||||
initializeRepo := false
|
||||
if remoteRef == nil {
|
||||
remoteRef, err = gs.cache.New(ctx, nil)
|
||||
remoteRef, err = gs.cache.New(ctx, nil, cache.CachePolicyKeepMutable)
|
||||
if err != nil {
|
||||
return "", nil, errors.Wrapf(err, "failed to create new mutable for %s", remote)
|
||||
}
|
||||
|
|
|
@ -104,7 +104,7 @@ func (ls *localSourceHandler) Snapshot(ctx context.Context) (out cache.Immutable
|
|||
}
|
||||
|
||||
if mutable == nil {
|
||||
m, err := ls.cm.New(ctx, nil)
|
||||
m, err := ls.cm.New(ctx, nil, cache.CachePolicyKeepMutable)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue