mirror of https://github.com/hak5/bolt.git
commit
20a1479c4c
|
@ -65,6 +65,10 @@ func (b *Bucket) Writable() bool {
|
|||
// The cursor is only valid as long as the transaction is open.
|
||||
// Do not use a cursor after the transaction is closed.
|
||||
func (b *Bucket) Cursor() *Cursor {
|
||||
// Update transaction statistics.
|
||||
b.tx.stats.CursorCount++
|
||||
|
||||
// Allocate and return a cursor.
|
||||
return &Cursor{
|
||||
tx: b.tx,
|
||||
root: b.root,
|
||||
|
|
|
@ -341,7 +341,6 @@ func TestBucketPutSingle(t *testing.T) {
|
|||
}
|
||||
})
|
||||
|
||||
fmt.Fprint(os.Stderr, ".")
|
||||
index++
|
||||
return true
|
||||
}
|
||||
|
@ -385,7 +384,6 @@ func TestBucketPutMultiple(t *testing.T) {
|
|||
return nil
|
||||
})
|
||||
})
|
||||
fmt.Fprint(os.Stderr, ".")
|
||||
return true
|
||||
}
|
||||
if err := quick.Check(f, qconfig()); err != nil {
|
||||
|
@ -442,7 +440,6 @@ func TestBucketDeleteQuick(t *testing.T) {
|
|||
})
|
||||
}
|
||||
})
|
||||
fmt.Fprint(os.Stderr, ".")
|
||||
return true
|
||||
}
|
||||
if err := quick.Check(f, qconfig()); err != nil {
|
||||
|
|
71
db.go
71
db.go
|
@ -40,6 +40,7 @@ type DB struct {
|
|||
rwtx *Tx
|
||||
txs []*Tx
|
||||
freelist *freelist
|
||||
stats Stats
|
||||
|
||||
rwlock sync.Mutex // Allows only one writer at a time.
|
||||
metalock sync.Mutex // Protects meta page access.
|
||||
|
@ -374,6 +375,9 @@ func (db *DB) removeTx(t *Tx) {
|
|||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Merge statistics.
|
||||
db.stats.TxStats.add(&t.stats)
|
||||
}
|
||||
|
||||
// Update executes a function within the context of a read-write managed transaction.
|
||||
|
@ -490,32 +494,12 @@ func (db *DB) CopyFile(path string, mode os.FileMode) error {
|
|||
return f.Close()
|
||||
}
|
||||
|
||||
// Stat retrieves stats on the database and its page usage.
|
||||
// Returns an error if the database is not open.
|
||||
func (db *DB) Stat() (*Stat, error) {
|
||||
// Obtain meta & mmap locks.
|
||||
// Stats retrieves ongoing performance stats for the database.
|
||||
// This is only updated when a transaction closes.
|
||||
func (db *DB) Stats() Stats {
|
||||
db.metalock.Lock()
|
||||
db.mmaplock.RLock()
|
||||
|
||||
var s = &Stat{
|
||||
MmapSize: len(db.data),
|
||||
TxCount: len(db.txs),
|
||||
}
|
||||
|
||||
// Release locks.
|
||||
db.mmaplock.RUnlock()
|
||||
db.metalock.Unlock()
|
||||
|
||||
err := db.Update(func(t *Tx) error {
|
||||
s.PageCount = int(t.meta.pgid)
|
||||
s.FreePageCount = len(db.freelist.all())
|
||||
s.PageSize = db.pageSize
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s, nil
|
||||
defer db.metalock.Unlock()
|
||||
return db.stats
|
||||
}
|
||||
|
||||
// Check performs several consistency checks on the database.
|
||||
|
@ -625,25 +609,20 @@ func (db *DB) allocate(count int) (*page, error) {
|
|||
return p, nil
|
||||
}
|
||||
|
||||
// Stat represents stats on the database such as free pages and sizes.
|
||||
type Stat struct {
|
||||
// PageCount is the total number of allocated pages. This is a high water
|
||||
// mark in the database that represents how many pages have actually been
|
||||
// used. This will be smaller than the MmapSize / PageSize.
|
||||
PageCount int
|
||||
|
||||
// FreePageCount is the total number of pages which have been previously
|
||||
// allocated but are no longer used.
|
||||
FreePageCount int
|
||||
|
||||
// PageSize is the size, in bytes, of individual database pages.
|
||||
PageSize int
|
||||
|
||||
// MmapSize is the mmap-allocated size of the data file. When the data file
|
||||
// grows beyond this size, the database will obtain a lock on the mmap and
|
||||
// resize it.
|
||||
MmapSize int
|
||||
|
||||
// TxCount is the total number of reader transactions.
|
||||
TxCount int
|
||||
// Stats represents statistics about the database.
|
||||
type Stats struct {
|
||||
TxStats TxStats // global, ongoing stats.
|
||||
}
|
||||
|
||||
// Sub calculates and returns the difference between two sets of database stats.
|
||||
// This is useful when obtaining stats at two different points and time and
|
||||
// you need the performance counters that occurred within that time span.
|
||||
func (s *Stats) Sub(other *Stats) Stats {
|
||||
var diff Stats
|
||||
diff.TxStats = s.TxStats.Sub(&other.TxStats)
|
||||
return diff
|
||||
}
|
||||
|
||||
func (s *Stats) add(other *Stats) {
|
||||
s.TxStats.add(&other.TxStats)
|
||||
}
|
||||
|
|
79
db_test.go
79
db_test.go
|
@ -2,17 +2,23 @@ package bolt
|
|||
|
||||
import (
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math/rand"
|
||||
"os"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
var statsFlag = flag.Bool("stats", false, "show performance stats")
|
||||
|
||||
// Ensure that a database can be opened without error.
|
||||
func TestOpen(t *testing.T) {
|
||||
f, _ := ioutil.TempFile("", "bolt-")
|
||||
|
@ -214,55 +220,6 @@ func TestDBCopyFile(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
// Ensure the database can return stats about itself.
|
||||
func TestDBStat(t *testing.T) {
|
||||
withOpenDB(func(db *DB, path string) {
|
||||
db.Update(func(tx *Tx) error {
|
||||
tx.CreateBucket("widgets")
|
||||
b := tx.Bucket("widgets")
|
||||
for i := 0; i < 10000; i++ {
|
||||
b.Put([]byte(strconv.Itoa(i)), []byte(strconv.Itoa(i)))
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
// Delete some keys.
|
||||
db.Update(func(tx *Tx) error {
|
||||
return tx.Bucket("widgets").Delete([]byte("10"))
|
||||
})
|
||||
db.Update(func(tx *Tx) error {
|
||||
return tx.Bucket("widgets").Delete([]byte("1000"))
|
||||
})
|
||||
|
||||
// Open some readers.
|
||||
t0, _ := db.Begin(false)
|
||||
t1, _ := db.Begin(false)
|
||||
t2, _ := db.Begin(false)
|
||||
t2.Rollback()
|
||||
|
||||
// Obtain stats.
|
||||
stat, err := db.Stat()
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 127, stat.PageCount)
|
||||
assert.Equal(t, 4, stat.FreePageCount)
|
||||
assert.Equal(t, 4096, stat.PageSize)
|
||||
assert.Equal(t, 4194304, stat.MmapSize)
|
||||
assert.Equal(t, 2, stat.TxCount)
|
||||
|
||||
// Close readers.
|
||||
t0.Rollback()
|
||||
t1.Rollback()
|
||||
})
|
||||
}
|
||||
|
||||
// Ensure the getting stats on a closed database returns an error.
|
||||
func TestDBStatWhileClosed(t *testing.T) {
|
||||
var db DB
|
||||
stat, err := db.Stat()
|
||||
assert.Equal(t, err, ErrDatabaseNotOpen)
|
||||
assert.Nil(t, stat)
|
||||
}
|
||||
|
||||
// Ensure that an error is returned when a database write fails.
|
||||
func TestDBWriteFail(t *testing.T) {
|
||||
t.Skip("pending") // TODO(benbjohnson)
|
||||
|
@ -384,6 +341,11 @@ func withOpenDB(fn func(*DB, string)) {
|
|||
defer db.Close()
|
||||
fn(db, path)
|
||||
|
||||
// Log statistics.
|
||||
if *statsFlag {
|
||||
logStats(db)
|
||||
}
|
||||
|
||||
// Check database consistency after every test.
|
||||
mustCheck(db)
|
||||
})
|
||||
|
@ -411,3 +373,22 @@ func trunc(b []byte, length int) []byte {
|
|||
}
|
||||
return b
|
||||
}
|
||||
|
||||
// writes the current database stats to the testing log.
|
||||
func logStats(db *DB) {
|
||||
var stats = db.Stats()
|
||||
fmt.Printf("[db] %-20s %-20s %-20s\n",
|
||||
fmt.Sprintf("pg(%d/%d)", stats.TxStats.PageCount, stats.TxStats.PageAlloc),
|
||||
fmt.Sprintf("cur(%d)", stats.TxStats.CursorCount),
|
||||
fmt.Sprintf("node(%d/%d)", stats.TxStats.NodeCount, stats.TxStats.NodeDeref),
|
||||
)
|
||||
fmt.Printf(" %-20s %-20s %-20s\n",
|
||||
fmt.Sprintf("rebal(%d/%v)", stats.TxStats.Rebalance, truncDuration(stats.TxStats.RebalanceTime)),
|
||||
fmt.Sprintf("spill(%d/%v)", stats.TxStats.Spill, truncDuration(stats.TxStats.SpillTime)),
|
||||
fmt.Sprintf("w(%d/%v)", stats.TxStats.Write, truncDuration(stats.TxStats.WriteTime)),
|
||||
)
|
||||
}
|
||||
|
||||
func truncDuration(d time.Duration) string {
|
||||
return regexp.MustCompile(`^(\d+)(\.\d+)`).ReplaceAllString(d.String(), "$1")
|
||||
}
|
||||
|
|
|
@ -137,7 +137,6 @@ func TestParallelTxs(t *testing.T) {
|
|||
// Wait for readers to finish.
|
||||
wg.Wait()
|
||||
})
|
||||
fmt.Fprint(os.Stderr, ".")
|
||||
return true
|
||||
}, qconfig())
|
||||
assert.NoError(t, err)
|
||||
|
|
3
node.go
3
node.go
|
@ -233,6 +233,9 @@ func (n *node) rebalance() {
|
|||
}
|
||||
n.unbalanced = false
|
||||
|
||||
// Update statistics.
|
||||
n.tx.stats.Rebalance++
|
||||
|
||||
// Ignore if node is above threshold (25%) and has enough keys.
|
||||
var threshold = n.tx.db.pageSize / 4
|
||||
if n.size() > threshold && len(n.inodes) > n.minKeys() {
|
||||
|
|
101
tx.go
101
tx.go
|
@ -3,6 +3,7 @@ package bolt
|
|||
import (
|
||||
"errors"
|
||||
"sort"
|
||||
"time"
|
||||
"unsafe"
|
||||
)
|
||||
|
||||
|
@ -36,6 +37,7 @@ type Tx struct {
|
|||
nodes map[pgid]*node
|
||||
pages map[pgid]*page
|
||||
pending []*node
|
||||
stats TxStats
|
||||
}
|
||||
|
||||
// init initializes the transaction.
|
||||
|
@ -75,6 +77,11 @@ func (t *Tx) Writable() bool {
|
|||
return t.writable
|
||||
}
|
||||
|
||||
// Stats retrieves a copy of the current transaction statistics.
|
||||
func (t *Tx) Stats() TxStats {
|
||||
return t.stats
|
||||
}
|
||||
|
||||
// Bucket retrieves a bucket by name.
|
||||
// Returns nil if the bucket does not exist.
|
||||
func (t *Tx) Bucket(name string) *Bucket {
|
||||
|
@ -182,11 +189,17 @@ func (t *Tx) Commit() error {
|
|||
|
||||
// TODO(benbjohnson): Use vectorized I/O to write out dirty pages.
|
||||
|
||||
// Rebalance and spill data onto dirty pages.
|
||||
// Rebalance nodes which have had deletions.
|
||||
var startTime = time.Now()
|
||||
t.rebalance()
|
||||
t.stats.RebalanceTime += time.Since(startTime)
|
||||
|
||||
// spill data onto dirty pages.
|
||||
startTime = time.Now()
|
||||
if err := t.spill(); err != nil {
|
||||
return err
|
||||
}
|
||||
t.stats.SpillTime += time.Since(startTime)
|
||||
|
||||
// Spill buckets page.
|
||||
p, err := t.allocate((t.buckets.size() / t.db.pageSize) + 1)
|
||||
|
@ -210,6 +223,7 @@ func (t *Tx) Commit() error {
|
|||
t.meta.freelist = p.id
|
||||
|
||||
// Write dirty pages to disk.
|
||||
startTime = time.Now()
|
||||
if err := t.write(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -218,6 +232,7 @@ func (t *Tx) Commit() error {
|
|||
if err := t.writeMeta(); err != nil {
|
||||
return err
|
||||
}
|
||||
t.stats.WriteTime += time.Since(startTime)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -236,6 +251,12 @@ func (t *Tx) Rollback() error {
|
|||
func (t *Tx) close() {
|
||||
if t.writable {
|
||||
t.db.rwlock.Unlock()
|
||||
|
||||
// Merge statistics.
|
||||
t.db.metalock.Lock()
|
||||
t.db.stats.TxStats.add(&t.stats)
|
||||
t.db.metalock.Unlock()
|
||||
|
||||
} else {
|
||||
t.db.removeTx(t)
|
||||
}
|
||||
|
@ -252,6 +273,10 @@ func (t *Tx) allocate(count int) (*page, error) {
|
|||
// Save to our page cache.
|
||||
t.pages[p.id] = p
|
||||
|
||||
// Update statistics.
|
||||
t.stats.PageCount++
|
||||
t.stats.PageAlloc += count * t.db.pageSize
|
||||
|
||||
return p, nil
|
||||
}
|
||||
|
||||
|
@ -329,6 +354,9 @@ func (t *Tx) spill() error {
|
|||
if newNode.parent != nil {
|
||||
newNode.parent.put(oldKey, newNode.inodes[0].key, nil, newNode.pgid)
|
||||
}
|
||||
|
||||
// Update the statistics.
|
||||
t.stats.Spill++
|
||||
}
|
||||
|
||||
t.pending = nil
|
||||
|
@ -362,6 +390,9 @@ func (t *Tx) write() error {
|
|||
if _, err := t.db.ops.writeAt(buf, offset); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Update statistics.
|
||||
t.stats.Write++
|
||||
}
|
||||
if err := fdatasync(t.db.file); err != nil {
|
||||
return err
|
||||
|
@ -388,6 +419,9 @@ func (t *Tx) writeMeta() error {
|
|||
return err
|
||||
}
|
||||
|
||||
// Update statistics.
|
||||
t.stats.Write++
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -408,6 +442,9 @@ func (t *Tx) node(pgid pgid, parent *node) *node {
|
|||
n.read(t.page(pgid))
|
||||
t.nodes[pgid] = n
|
||||
|
||||
// Update statistics.
|
||||
t.stats.NodeCount++
|
||||
|
||||
return n
|
||||
}
|
||||
|
||||
|
@ -420,6 +457,9 @@ func (t *Tx) dereference() {
|
|||
for _, n := range t.pending {
|
||||
n.dereference()
|
||||
}
|
||||
|
||||
// Update statistics
|
||||
t.stats.NodeDeref += len(t.nodes) + len(t.pending)
|
||||
}
|
||||
|
||||
// page returns a reference to the page with a given id.
|
||||
|
@ -491,3 +531,62 @@ func (t *Tx) Page(id int) (*PageInfo, error) {
|
|||
|
||||
return info, nil
|
||||
}
|
||||
|
||||
// TxStats represents statistics about the actions performed by the transaction.
|
||||
type TxStats struct {
|
||||
// Page statistics.
|
||||
PageCount int // number of page allocations
|
||||
PageAlloc int // total bytes allocated
|
||||
|
||||
// Cursor statistics.
|
||||
CursorCount int // number of cursors created
|
||||
|
||||
// Node statistics
|
||||
NodeCount int // number of node allocations
|
||||
NodeDeref int // number of node dereferences
|
||||
|
||||
// Rebalance statistics.
|
||||
Rebalance int // number of node rebalances
|
||||
RebalanceTime time.Duration // total time spent rebalancing
|
||||
|
||||
// Spill statistics.
|
||||
Spill int // number of node spilled
|
||||
SpillTime time.Duration // total time spent spilling
|
||||
|
||||
// Write statistics.
|
||||
Write int // number of writes performed
|
||||
WriteTime time.Duration // total time spent writing to disk
|
||||
}
|
||||
|
||||
func (s *TxStats) add(other *TxStats) {
|
||||
s.PageCount += other.PageCount
|
||||
s.PageAlloc += other.PageAlloc
|
||||
s.CursorCount += other.CursorCount
|
||||
s.NodeCount += other.NodeCount
|
||||
s.NodeDeref += other.NodeDeref
|
||||
s.Rebalance += other.Rebalance
|
||||
s.RebalanceTime += other.RebalanceTime
|
||||
s.Spill += other.Spill
|
||||
s.SpillTime += other.SpillTime
|
||||
s.Write += other.Write
|
||||
s.WriteTime += other.WriteTime
|
||||
}
|
||||
|
||||
// Sub calculates and returns the difference between two sets of transaction stats.
|
||||
// This is useful when obtaining stats at two different points and time and
|
||||
// you need the performance counters that occurred within that time span.
|
||||
func (s *TxStats) Sub(other *TxStats) TxStats {
|
||||
var diff TxStats
|
||||
diff.PageCount = s.PageCount - other.PageCount
|
||||
diff.PageAlloc = s.PageAlloc - other.PageAlloc
|
||||
diff.CursorCount = s.CursorCount - other.CursorCount
|
||||
diff.NodeCount = s.NodeCount - other.NodeCount
|
||||
diff.NodeDeref = s.NodeDeref - other.NodeDeref
|
||||
diff.Rebalance = s.Rebalance - other.Rebalance
|
||||
diff.RebalanceTime = s.RebalanceTime - other.RebalanceTime
|
||||
diff.Spill = s.Spill - other.Spill
|
||||
diff.SpillTime = s.SpillTime - other.SpillTime
|
||||
diff.Write = s.Write - other.Write
|
||||
diff.WriteTime = s.WriteTime - other.WriteTime
|
||||
return diff
|
||||
}
|
||||
|
|
|
@ -436,7 +436,6 @@ func TestTxCursorIterate(t *testing.T) {
|
|||
assert.Equal(t, len(items), index)
|
||||
tx.Rollback()
|
||||
})
|
||||
fmt.Fprint(os.Stderr, ".")
|
||||
return true
|
||||
}
|
||||
if err := quick.Check(f, qconfig()); err != nil {
|
||||
|
@ -477,7 +476,6 @@ func TestTxCursorIterateReverse(t *testing.T) {
|
|||
assert.Equal(t, len(items), index)
|
||||
tx.Rollback()
|
||||
})
|
||||
fmt.Fprint(os.Stderr, ".")
|
||||
return true
|
||||
}
|
||||
if err := quick.Check(f, qconfig()); err != nil {
|
||||
|
|
Loading…
Reference in New Issue