mirror of https://github.com/hak5/bolt.git
Add parallel usage test and race detector.
parent
2b5e440316
commit
7fb06feea4
5
Makefile
5
Makefile
|
@ -14,6 +14,11 @@ fmt:
|
|||
@go fmt ./...
|
||||
|
||||
test: fmt
|
||||
@echo "=== TESTS ==="
|
||||
@go test -v -cover -test.run=$(TEST)
|
||||
@echo ""
|
||||
@echo ""
|
||||
@echo "=== RACE DETECTOR ==="
|
||||
@go test -v -race -test.run=Parallel
|
||||
|
||||
.PHONY: bench cover fmt test
|
||||
|
|
|
@ -12,7 +12,7 @@ type Bucket struct {
|
|||
|
||||
// bucket represents the on-file representation of a bucket.
|
||||
type bucket struct {
|
||||
root pgid
|
||||
root pgid
|
||||
sequence uint64
|
||||
}
|
||||
|
||||
|
|
|
@ -64,7 +64,7 @@ func (b *buckets) read(p *page) {
|
|||
// Associate keys and items.
|
||||
for index, key := range keys {
|
||||
b.items[key] = &bucket{
|
||||
root: items[index].root,
|
||||
root: items[index].root,
|
||||
sequence: items[index].sequence,
|
||||
}
|
||||
}
|
||||
|
|
6
db.go
6
db.go
|
@ -239,10 +239,6 @@ func (db *DB) Close() {
|
|||
}
|
||||
|
||||
func (db *DB) close() {
|
||||
// Wait for pending transactions before closing and unmapping the data.
|
||||
// db.mmaplock.Lock()
|
||||
// defer db.mmaplock.Unlock()
|
||||
|
||||
// TODO(benbjohnson): Undo everything in Open().
|
||||
db.freelist = nil
|
||||
db.path = ""
|
||||
|
@ -391,7 +387,7 @@ func (db *DB) DeleteBucket(name string) error {
|
|||
// NextSequence returns an autoincrementing integer for the bucket.
|
||||
// This function can return an error if the bucket does not exist.
|
||||
func (db *DB) NextSequence(name string) (int, error) {
|
||||
var seq int
|
||||
var seq int
|
||||
err := db.Do(func(t *RWTransaction) error {
|
||||
var err error
|
||||
seq, err = t.NextSequence(name)
|
||||
|
|
|
@ -259,3 +259,10 @@ func withOpenDB(fn func(*DB, string)) {
|
|||
fn(db, path)
|
||||
})
|
||||
}
|
||||
|
||||
func trunc(b []byte, length int) []byte {
|
||||
if length < len(b) {
|
||||
return b[:length]
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
|
|
@ -0,0 +1,124 @@
|
|||
package bolt
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"sync"
|
||||
"testing"
|
||||
"testing/quick"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
// Ensure that multiple threads can use the DB without race detector errors.
|
||||
func TestParallelTransactions(t *testing.T) {
|
||||
var mutex sync.RWMutex
|
||||
|
||||
err := quick.Check(func(numReaders, batchSize uint, items testdata) bool {
|
||||
// Limit the readers & writers to something reasonable.
|
||||
numReaders = (numReaders % 10) + 1
|
||||
batchSize = (batchSize % 50) + 1
|
||||
|
||||
// Maintain the current dataset.
|
||||
var current testdata
|
||||
|
||||
withOpenDB(func(db *DB, path string) {
|
||||
db.CreateBucket("widgets")
|
||||
|
||||
// Maintain a set of concurrent readers.
|
||||
var wg sync.WaitGroup
|
||||
var c = make(chan bool, 0)
|
||||
go func() {
|
||||
var readers = make(chan int, numReaders)
|
||||
for {
|
||||
wg.Add(1)
|
||||
|
||||
// Attempt to start a new reader unless we're stopped.
|
||||
select {
|
||||
case readers <- 0:
|
||||
case <-c:
|
||||
wg.Done()
|
||||
return
|
||||
}
|
||||
|
||||
go func() {
|
||||
mutex.RLock()
|
||||
local := current
|
||||
txn, err := db.Transaction()
|
||||
mutex.RUnlock()
|
||||
if !assert.NoError(t, err) {
|
||||
t.FailNow()
|
||||
}
|
||||
|
||||
// Verify all data is in for local data list.
|
||||
for _, item := range local {
|
||||
value, err := txn.Get("widgets", item.Key)
|
||||
if !assert.NoError(t, err) || !assert.Equal(t, value, item.Value) {
|
||||
txn.Close()
|
||||
wg.Done()
|
||||
t.FailNow()
|
||||
}
|
||||
}
|
||||
|
||||
txn.Close()
|
||||
wg.Done()
|
||||
<-readers
|
||||
}()
|
||||
}
|
||||
}()
|
||||
|
||||
// Batch insert items.
|
||||
pending := items
|
||||
for {
|
||||
// Determine next batch.
|
||||
currentBatchSize := int(batchSize)
|
||||
if currentBatchSize > len(pending) {
|
||||
currentBatchSize = len(pending)
|
||||
}
|
||||
batchItems := pending[0:currentBatchSize]
|
||||
pending = pending[currentBatchSize:]
|
||||
|
||||
// Start write transaction.
|
||||
txn, err := db.RWTransaction()
|
||||
if !assert.NoError(t, err) {
|
||||
t.FailNow()
|
||||
}
|
||||
|
||||
// Insert whole batch.
|
||||
for _, item := range batchItems {
|
||||
err := txn.Put("widgets", item.Key, item.Value)
|
||||
if !assert.NoError(t, err) {
|
||||
t.FailNow()
|
||||
}
|
||||
}
|
||||
|
||||
// Commit and update the current list.
|
||||
mutex.Lock()
|
||||
err = txn.Commit()
|
||||
current = append(current, batchItems...)
|
||||
mutex.Unlock()
|
||||
if !assert.NoError(t, err) {
|
||||
t.FailNow()
|
||||
}
|
||||
|
||||
// If there are no more left then exit.
|
||||
if len(pending) == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
time.Sleep(1 * time.Millisecond)
|
||||
}
|
||||
|
||||
// Notify readers to stop.
|
||||
close(c)
|
||||
|
||||
// Wait for readers to finish.
|
||||
wg.Wait()
|
||||
})
|
||||
fmt.Fprint(os.Stderr, ".")
|
||||
return true
|
||||
}, qconfig())
|
||||
assert.NoError(t, err)
|
||||
fmt.Fprint(os.Stderr, "\n")
|
||||
}
|
|
@ -20,9 +20,7 @@ func (t *RWTransaction) init(db *DB) {
|
|||
t.Transaction.init(db)
|
||||
t.pages = make(map[pgid]*page)
|
||||
|
||||
// Copy the meta and increase the transaction id.
|
||||
t.meta = &meta{}
|
||||
db.meta().copy(t.meta)
|
||||
// Increment the transaction id.
|
||||
t.meta.txnid += txnid(1)
|
||||
}
|
||||
|
||||
|
|
|
@ -20,9 +20,13 @@ type txnid uint64
|
|||
// init initializes the transaction and associates it with a database.
|
||||
func (t *Transaction) init(db *DB) {
|
||||
t.db = db
|
||||
t.meta = db.meta()
|
||||
t.pages = nil
|
||||
|
||||
// Copy the meta page since it can be changed by the writer.
|
||||
t.meta = &meta{}
|
||||
db.meta().copy(t.meta)
|
||||
|
||||
// Read in the buckets page.
|
||||
t.buckets = &buckets{}
|
||||
t.buckets.read(t.page(t.meta.buckets))
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue