383 lines
7.6 KiB
Go
383 lines
7.6 KiB
Go
package metadata
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/json"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/pkg/errors"
|
|
"github.com/sirupsen/logrus"
|
|
bolt "go.etcd.io/bbolt"
|
|
)
|
|
|
|
const (
|
|
mainBucket = "_main"
|
|
indexBucket = "_index"
|
|
externalBucket = "_external"
|
|
)
|
|
|
|
var errNotFound = errors.Errorf("not found")
|
|
|
|
type Store struct {
|
|
db *bolt.DB
|
|
}
|
|
|
|
func NewStore(dbPath string) (*Store, error) {
|
|
db, err := bolt.Open(dbPath, 0600, nil)
|
|
if err != nil {
|
|
return nil, errors.Wrapf(err, "failed to open database file %s", dbPath)
|
|
}
|
|
return &Store{db: db}, nil
|
|
}
|
|
|
|
func (s *Store) DB() *bolt.DB {
|
|
return s.db
|
|
}
|
|
|
|
func (s *Store) All() ([]*StorageItem, error) {
|
|
var out []*StorageItem
|
|
err := s.db.View(func(tx *bolt.Tx) error {
|
|
b := tx.Bucket([]byte(mainBucket))
|
|
if b == nil {
|
|
return nil
|
|
}
|
|
return b.ForEach(func(key, _ []byte) error {
|
|
b := b.Bucket(key)
|
|
if b == nil {
|
|
return nil
|
|
}
|
|
si, err := newStorageItem(string(key), b, s)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
out = append(out, si)
|
|
return nil
|
|
})
|
|
})
|
|
return out, err
|
|
}
|
|
|
|
func (s *Store) Probe(index string) (bool, error) {
|
|
var exists bool
|
|
err := s.db.View(func(tx *bolt.Tx) error {
|
|
b := tx.Bucket([]byte(indexBucket))
|
|
if b == nil {
|
|
return nil
|
|
}
|
|
main := tx.Bucket([]byte(mainBucket))
|
|
if main == nil {
|
|
return nil
|
|
}
|
|
search := []byte(indexKey(index, ""))
|
|
c := b.Cursor()
|
|
k, _ := c.Seek(search)
|
|
if k != nil && bytes.HasPrefix(k, search) {
|
|
exists = true
|
|
}
|
|
return nil
|
|
})
|
|
return exists, err
|
|
}
|
|
|
|
func (s *Store) Search(index string) ([]*StorageItem, error) {
|
|
var out []*StorageItem
|
|
err := s.db.View(func(tx *bolt.Tx) error {
|
|
b := tx.Bucket([]byte(indexBucket))
|
|
if b == nil {
|
|
return nil
|
|
}
|
|
main := tx.Bucket([]byte(mainBucket))
|
|
if main == nil {
|
|
return nil
|
|
}
|
|
index = indexKey(index, "")
|
|
c := b.Cursor()
|
|
k, _ := c.Seek([]byte(index))
|
|
for {
|
|
if k != nil && strings.HasPrefix(string(k), index) {
|
|
itemID := strings.TrimPrefix(string(k), index)
|
|
k, _ = c.Next()
|
|
b := main.Bucket([]byte(itemID))
|
|
if b == nil {
|
|
logrus.Errorf("index pointing to missing record %s", itemID)
|
|
continue
|
|
}
|
|
si, err := newStorageItem(itemID, b, s)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
out = append(out, si)
|
|
} else {
|
|
break
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
return out, err
|
|
}
|
|
|
|
func (s *Store) View(id string, fn func(b *bolt.Bucket) error) error {
|
|
return s.db.View(func(tx *bolt.Tx) error {
|
|
b := tx.Bucket([]byte(mainBucket))
|
|
if b == nil {
|
|
return errors.WithStack(errNotFound)
|
|
}
|
|
b = b.Bucket([]byte(id))
|
|
if b == nil {
|
|
return errors.WithStack(errNotFound)
|
|
}
|
|
return fn(b)
|
|
})
|
|
}
|
|
|
|
func (s *Store) Clear(id string) error {
|
|
return s.db.Update(func(tx *bolt.Tx) error {
|
|
external := tx.Bucket([]byte(externalBucket))
|
|
if external != nil {
|
|
external.DeleteBucket([]byte(id))
|
|
}
|
|
main := tx.Bucket([]byte(mainBucket))
|
|
if main == nil {
|
|
return nil
|
|
}
|
|
b := main.Bucket([]byte(id))
|
|
if b == nil {
|
|
return nil
|
|
}
|
|
si, err := newStorageItem(id, b, s)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if indexes := si.Indexes(); len(indexes) > 0 {
|
|
b := tx.Bucket([]byte(indexBucket))
|
|
if b != nil {
|
|
for _, index := range indexes {
|
|
if err := b.Delete([]byte(indexKey(index, id))); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return main.DeleteBucket([]byte(id))
|
|
})
|
|
}
|
|
|
|
func (s *Store) Update(id string, fn func(b *bolt.Bucket) error) error {
|
|
return s.db.Update(func(tx *bolt.Tx) error {
|
|
b, err := tx.CreateBucketIfNotExists([]byte(mainBucket))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
b, err = b.CreateBucketIfNotExists([]byte(id))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return fn(b)
|
|
})
|
|
}
|
|
|
|
func (s *Store) Get(id string) (*StorageItem, bool) {
|
|
empty := func() *StorageItem {
|
|
si, _ := newStorageItem(id, nil, s)
|
|
return si
|
|
}
|
|
tx, err := s.db.Begin(false)
|
|
if err != nil {
|
|
return empty(), false
|
|
}
|
|
defer tx.Rollback()
|
|
b := tx.Bucket([]byte(mainBucket))
|
|
if b == nil {
|
|
return empty(), false
|
|
}
|
|
b = b.Bucket([]byte(id))
|
|
if b == nil {
|
|
return empty(), false
|
|
}
|
|
si, _ := newStorageItem(id, b, s)
|
|
return si, true
|
|
}
|
|
|
|
func (s *Store) Close() error {
|
|
return s.db.Close()
|
|
}
|
|
|
|
type StorageItem struct {
|
|
id string
|
|
values map[string]*Value
|
|
queue []func(*bolt.Bucket) error
|
|
storage *Store
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
func newStorageItem(id string, b *bolt.Bucket, s *Store) (*StorageItem, error) {
|
|
si := &StorageItem{
|
|
id: id,
|
|
storage: s,
|
|
values: make(map[string]*Value),
|
|
}
|
|
if b != nil {
|
|
if err := b.ForEach(func(k, v []byte) error {
|
|
var sv Value
|
|
if len(v) > 0 {
|
|
if err := json.Unmarshal(v, &sv); err != nil {
|
|
return err
|
|
}
|
|
si.values[string(k)] = &sv
|
|
}
|
|
return nil
|
|
}); err != nil {
|
|
return si, err
|
|
}
|
|
}
|
|
return si, nil
|
|
}
|
|
|
|
func (s *StorageItem) Storage() *Store { // TODO: used in local source. how to remove this?
|
|
return s.storage
|
|
}
|
|
|
|
func (s *StorageItem) ID() string {
|
|
return s.id
|
|
}
|
|
|
|
func (s *StorageItem) View(fn func(b *bolt.Bucket) error) error {
|
|
return s.storage.View(s.id, fn)
|
|
}
|
|
|
|
func (s *StorageItem) Update(fn func(b *bolt.Bucket) error) error {
|
|
return s.storage.Update(s.id, fn)
|
|
}
|
|
|
|
func (s *StorageItem) Keys() []string {
|
|
keys := make([]string, 0, len(s.values))
|
|
for k := range s.values {
|
|
keys = append(keys, k)
|
|
}
|
|
return keys
|
|
}
|
|
|
|
func (s *StorageItem) Get(k string) *Value {
|
|
s.mu.RLock()
|
|
v := s.values[k]
|
|
s.mu.RUnlock()
|
|
return v
|
|
}
|
|
|
|
func (s *StorageItem) GetExternal(k string) ([]byte, error) {
|
|
var dt []byte
|
|
err := s.storage.db.View(func(tx *bolt.Tx) error {
|
|
b := tx.Bucket([]byte(externalBucket))
|
|
if b == nil {
|
|
return errors.WithStack(errNotFound)
|
|
}
|
|
b = b.Bucket([]byte(s.id))
|
|
if b == nil {
|
|
return errors.WithStack(errNotFound)
|
|
}
|
|
dt = b.Get([]byte(k))
|
|
if dt == nil {
|
|
return errors.WithStack(errNotFound)
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return dt, nil
|
|
}
|
|
|
|
func (s *StorageItem) SetExternal(k string, dt []byte) error {
|
|
return s.storage.db.Update(func(tx *bolt.Tx) error {
|
|
b, err := tx.CreateBucketIfNotExists([]byte(externalBucket))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
b, err = b.CreateBucketIfNotExists([]byte(s.id))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return b.Put([]byte(k), dt)
|
|
})
|
|
}
|
|
|
|
func (s *StorageItem) Queue(fn func(b *bolt.Bucket) error) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.queue = append(s.queue, fn)
|
|
}
|
|
|
|
func (s *StorageItem) Commit() error {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
return s.Update(func(b *bolt.Bucket) error {
|
|
for _, fn := range s.queue {
|
|
if err := fn(b); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
s.queue = s.queue[:0]
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func (s *StorageItem) Indexes() (out []string) {
|
|
for _, v := range s.values {
|
|
if v.Index != "" {
|
|
out = append(out, v.Index)
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func (s *StorageItem) SetValue(b *bolt.Bucket, key string, v *Value) error {
|
|
if v == nil {
|
|
if err := b.Put([]byte(key), nil); err != nil {
|
|
return err
|
|
}
|
|
delete(s.values, key)
|
|
return nil
|
|
}
|
|
dt, err := json.Marshal(v)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := b.Put([]byte(key), dt); err != nil {
|
|
return err
|
|
}
|
|
if v.Index != "" {
|
|
b, err := b.Tx().CreateBucketIfNotExists([]byte(indexBucket))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := b.Put([]byte(indexKey(v.Index, s.ID())), []byte{}); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
s.values[key] = v
|
|
return nil
|
|
}
|
|
|
|
type Value struct {
|
|
Value json.RawMessage `json:"value,omitempty"`
|
|
Index string `json:"index,omitempty"`
|
|
}
|
|
|
|
func NewValue(v interface{}) (*Value, error) {
|
|
dt, err := json.Marshal(v)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &Value{Value: json.RawMessage(dt)}, nil
|
|
}
|
|
|
|
func (v *Value) Unmarshal(target interface{}) error {
|
|
err := json.Unmarshal(v.Value, target)
|
|
return err
|
|
}
|
|
|
|
func indexKey(index, target string) string {
|
|
return index + "::" + target
|
|
}
|