solver: add release support for bolt backend

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
docker-18.09
Tonis Tiigi 2018-02-27 14:48:08 -08:00
parent a9db799188
commit 832c140d4a
2 changed files with 163 additions and 22 deletions

View File

@ -10,9 +10,11 @@ import (
) )
const ( const (
mainBucket = "_main" mainBucket = "_main"
resultBucket = "_result" resultBucket = "_result"
linksBucket = "_links" linksBucket = "_links"
byResultBucket = "_byresult"
backlinksBucket = "_backlinks"
) )
type Store struct { type Store struct {
@ -25,7 +27,7 @@ func NewStore(dbPath string) (*Store, error) {
return nil, errors.Wrapf(err, "failed to open database file %s", dbPath) return nil, errors.Wrapf(err, "failed to open database file %s", dbPath)
} }
if err := db.Update(func(tx *bolt.Tx) error { if err := db.Update(func(tx *bolt.Tx) error {
for _, b := range []string{mainBucket, resultBucket, linksBucket} { for _, b := range []string{mainBucket, resultBucket, linksBucket, byResultBucket, backlinksBucket} {
if _, err := tx.CreateBucketIfNotExists([]byte(b)); err != nil { if _, err := tx.CreateBucketIfNotExists([]byte(b)); err != nil {
return err return err
} }
@ -70,6 +72,25 @@ func (s *Store) Set(info solver.CacheKeyInfo) error {
}) })
} }
func (s *Store) Walk(fn func(id string) error) error {
ids := make([]string, 0)
if err := s.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(mainBucket))
return b.ForEach(func(k, v []byte) error {
ids = append(ids, string(k))
return nil
})
}); err != nil {
return err
}
for _, id := range ids {
if err := fn(id); err != nil {
return err
}
}
return nil
}
func (s *Store) WalkResults(id string, fn func(solver.CacheResult) error) error { func (s *Store) WalkResults(id string, fn func(solver.CacheResult) error) error {
return s.db.View(func(tx *bolt.Tx) error { return s.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(resultBucket)) b := tx.Bucket([]byte(resultBucket))
@ -119,11 +140,7 @@ func (s *Store) Load(id string, resultID string) (solver.CacheResult, error) {
func (s *Store) AddResult(id string, res solver.CacheResult) error { func (s *Store) AddResult(id string, res solver.CacheResult) error {
return s.db.Update(func(tx *bolt.Tx) error { return s.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(resultBucket)) b, err := tx.Bucket([]byte(resultBucket)).CreateBucketIfNotExists([]byte(id))
if b == nil {
return errors.WithStack(solver.ErrNotFound)
}
b, err := b.CreateBucketIfNotExists([]byte(id))
if err != nil { if err != nil {
return err return err
} }
@ -131,21 +148,129 @@ func (s *Store) AddResult(id string, res solver.CacheResult) error {
if err != nil { if err != nil {
return err return err
} }
return b.Put([]byte(res.ID), dt) if err := b.Put([]byte(res.ID), dt); err != nil {
return err
}
b, err = tx.Bucket([]byte(byResultBucket)).CreateBucketIfNotExists([]byte(res.ID))
if err != nil {
return err
}
if err := b.Put([]byte(id), []byte{}); err != nil {
return err
}
return nil
}) })
} }
func (s *Store) Release(id, resultID string) error { func (s *Store) Release(resultID string) error {
return errors.Errorf("not-implemented") return s.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(byResultBucket))
if b == nil {
return errors.WithStack(solver.ErrNotFound)
}
b = b.Bucket([]byte(resultID))
if b == nil {
return errors.WithStack(solver.ErrNotFound)
}
if err := b.ForEach(func(k, v []byte) error {
return s.releaseHelper(tx, string(k), resultID)
}); err != nil {
return err
}
return nil
})
}
func (s *Store) releaseHelper(tx *bolt.Tx, id, resultID string) error {
results := tx.Bucket([]byte(resultBucket)).Bucket([]byte(id))
if results == nil {
return nil
}
if err := results.Delete([]byte(resultID)); err != nil {
return err
}
ids := tx.Bucket([]byte(byResultBucket))
ids = ids.Bucket([]byte(resultID))
if ids == nil {
return nil
}
if err := ids.Delete([]byte(resultID)); err != nil {
return err
}
if isEmptyBucket(ids) {
if err := tx.Bucket([]byte(byResultBucket)).DeleteBucket([]byte(resultID)); err != nil {
return err
}
}
links := tx.Bucket([]byte(resultBucket))
if results == nil {
return nil
}
links = links.Bucket([]byte(id))
return s.emptyBranchWithParents(tx, []byte(id))
}
func (s *Store) emptyBranchWithParents(tx *bolt.Tx, id []byte) error {
results := tx.Bucket([]byte(resultBucket)).Bucket(id)
if results == nil {
return nil
}
isEmptyLinks := true
links := tx.Bucket([]byte(linksBucket)).Bucket(id)
if links != nil {
isEmptyLinks = isEmptyBucket(links)
}
if !isEmptyBucket(results) || !isEmptyLinks {
return nil
}
if backlinks := tx.Bucket([]byte(backlinksBucket)).Bucket(id); backlinks != nil {
if err := backlinks.ForEach(func(k, v []byte) error {
if subLinks := tx.Bucket([]byte(linksBucket)).Bucket(k); subLinks != nil {
if err := subLinks.ForEach(func(k, v []byte) error {
parts := bytes.Split(k, []byte("@"))
if len(parts) != 2 {
return errors.Errorf("invalid key %s", k)
}
if bytes.Equal(id, parts[1]) {
return subLinks.Delete(k)
}
return nil
}); err != nil {
return err
}
if isEmptyBucket(subLinks) {
if err := tx.Bucket([]byte(linksBucket)).DeleteBucket(k); err != nil {
return err
}
}
}
return s.emptyBranchWithParents(tx, k)
}); err != nil {
return err
}
if err := tx.Bucket([]byte(backlinksBucket)).DeleteBucket(id); err != nil {
return err
}
}
return tx.Bucket([]byte(mainBucket)).Delete(id)
} }
func (s *Store) AddLink(id string, link solver.CacheInfoLink, target string) error { func (s *Store) AddLink(id string, link solver.CacheInfoLink, target string) error {
return s.db.Update(func(tx *bolt.Tx) error { return s.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(linksBucket)) b, err := tx.Bucket([]byte(linksBucket)).CreateBucketIfNotExists([]byte(id))
if b == nil {
return errors.WithStack(solver.ErrNotFound)
}
b, err := b.CreateBucketIfNotExists([]byte(id))
if err != nil { if err != nil {
return err return err
} }
@ -155,7 +280,20 @@ func (s *Store) AddLink(id string, link solver.CacheInfoLink, target string) err
return err return err
} }
return b.Put(bytes.Join([][]byte{dt, []byte(target)}, []byte("@")), []byte{}) if err := b.Put(bytes.Join([][]byte{dt, []byte(target)}, []byte("@")), []byte{}); err != nil {
return err
}
b, err = tx.Bucket([]byte(backlinksBucket)).CreateBucketIfNotExists([]byte(target))
if err != nil {
return err
}
if err := b.Put([]byte(id), []byte{}); err != nil {
return err
}
return nil
}) })
} }
@ -198,3 +336,8 @@ func (s *Store) WalkLinks(id string, link solver.CacheInfoLink, fn func(id strin
} }
return nil return nil
} }
func isEmptyBucket(b *bolt.Bucket) bool {
k, _ := b.Cursor().First()
return k == nil
}

View File

@ -145,16 +145,14 @@ func (s *inMemoryStore) Release(resultID string) error {
delete(s.byResult, resultID) delete(s.byResult, resultID)
} }
if len(k.results) == 0 && len(k.links) == 0 { s.emptyBranchWithParents(k)
s.emptyBranchWithParents(k)
}
} }
return nil return nil
} }
func (s *inMemoryStore) emptyBranchWithParents(k *inMemoryKey) { func (s *inMemoryStore) emptyBranchWithParents(k *inMemoryKey) {
if len(k.results) != 0 && len(k.links) != 0 { if len(k.results) != 0 || len(k.links) != 0 {
return return
} }
for id := range k.backlinks { for id := range k.backlinks {