From 7fb06feea4012543b6a1b0726a09e9a4c4a84933 Mon Sep 17 00:00:00 2001
From: Ben Johnson
Date: Sat, 15 Feb 2014 21:50:34 -0700
Subject: [PATCH] Add parallel usage test and race detector.

---
 Makefile           |   5 ++
 bucket.go          |   2 +-
 buckets.go         |   2 +-
 db.go              |   6 +--
 db_test.go         |   7 +++
 functional_test.go | 124 +++++++++++++++++++++++++++++++++++++++++++++
 rwtransaction.go   |   4 +-
 transaction.go     |   6 ++-
 8 files changed, 145 insertions(+), 11 deletions(-)
 create mode 100644 functional_test.go

diff --git a/Makefile b/Makefile
index a7cd5e7..a6ad051 100644
--- a/Makefile
+++ b/Makefile
@@ -14,6 +14,11 @@ fmt:
 	@go fmt ./...
 
 test: fmt
+	@echo "=== TESTS ==="
 	@go test -v -cover -test.run=$(TEST)
+	@echo ""
+	@echo ""
+	@echo "=== RACE DETECTOR ==="
+	@go test -v -race -test.run=Parallel
 
 .PHONY: bench cover fmt test
diff --git a/bucket.go b/bucket.go
index 9c9ee29..a973f0e 100644
--- a/bucket.go
+++ b/bucket.go
@@ -12,7 +12,7 @@ type Bucket struct {
 
 // bucket represents the on-file representation of a bucket.
 type bucket struct {
-	root pgid
+	root     pgid
 	sequence uint64
 }
 
diff --git a/buckets.go b/buckets.go
index 6d9f27c..3226873 100644
--- a/buckets.go
+++ b/buckets.go
@@ -64,7 +64,7 @@ func (b *buckets) read(p *page) {
 	// Associate keys and items.
 	for index, key := range keys {
 		b.items[key] = &bucket{
-			root: items[index].root,
+			root:     items[index].root,
 			sequence: items[index].sequence,
 		}
 	}
diff --git a/db.go b/db.go
index 34de9f3..57ffa7b 100644
--- a/db.go
+++ b/db.go
@@ -239,10 +239,6 @@ func (db *DB) Close() {
 }
 
 func (db *DB) close() {
-	// Wait for pending transactions before closing and unmapping the data.
-	// db.mmaplock.Lock()
-	// defer db.mmaplock.Unlock()
-
 	// TODO(benbjohnson): Undo everything in Open().
 	db.freelist = nil
 	db.path = ""
@@ -391,7 +387,7 @@ func (db *DB) DeleteBucket(name string) error {
 // NextSequence returns an autoincrementing integer for the bucket.
 // This function can return an error if the bucket does not exist.
 func (db *DB) NextSequence(name string) (int, error) {
-	var seq int 
+	var seq int
 	err := db.Do(func(t *RWTransaction) error {
 		var err error
 		seq, err = t.NextSequence(name)
diff --git a/db_test.go b/db_test.go
index 1a9aa02..666ba4a 100644
--- a/db_test.go
+++ b/db_test.go
@@ -259,3 +259,10 @@ func withOpenDB(fn func(*DB, string)) {
 		fn(db, path)
 	})
 }
+
+func trunc(b []byte, length int) []byte {
+	if length < len(b) {
+		return b[:length]
+	}
+	return b
+}
diff --git a/functional_test.go b/functional_test.go
new file mode 100644
index 0000000..e155bc3
--- /dev/null
+++ b/functional_test.go
@@ -0,0 +1,124 @@
+package bolt
+
+import (
+	"fmt"
+	"os"
+	"sync"
+	"testing"
+	"testing/quick"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+)
+
+// Ensure that multiple threads can use the DB without race detector errors.
+func TestParallelTransactions(t *testing.T) {
+	var mutex sync.RWMutex
+
+	err := quick.Check(func(numReaders, batchSize uint, items testdata) bool {
+		// Limit the readers & writers to something reasonable.
+		numReaders = (numReaders % 10) + 1
+		batchSize = (batchSize % 50) + 1
+
+		// Maintain the current dataset.
+		var current testdata
+
+		withOpenDB(func(db *DB, path string) {
+			db.CreateBucket("widgets")
+
+			// Maintain a set of concurrent readers.
+			var wg sync.WaitGroup
+			var c = make(chan bool, 0)
+			go func() {
+				var readers = make(chan int, numReaders)
+				for {
+					wg.Add(1)
+
+					// Attempt to start a new reader unless we're stopped.
+					select {
+					case readers <- 0:
+					case <-c:
+						wg.Done()
+						return
+					}
+
+					go func() {
+						mutex.RLock()
+						local := current
+						txn, err := db.Transaction()
+						mutex.RUnlock()
+						if !assert.NoError(t, err) {
+							t.FailNow()
+						}
+
+						// Verify all data is in for local data list.
+						for _, item := range local {
+							value, err := txn.Get("widgets", item.Key)
+							if !assert.NoError(t, err) || !assert.Equal(t, value, item.Value) {
+								txn.Close()
+								wg.Done()
+								t.FailNow()
+							}
+						}
+
+						txn.Close()
+						wg.Done()
+						<-readers
+					}()
+				}
+			}()
+
+			// Batch insert items.
+			pending := items
+			for {
+				// Determine next batch.
+				currentBatchSize := int(batchSize)
+				if currentBatchSize > len(pending) {
+					currentBatchSize = len(pending)
+				}
+				batchItems := pending[0:currentBatchSize]
+				pending = pending[currentBatchSize:]
+
+				// Start write transaction.
+				txn, err := db.RWTransaction()
+				if !assert.NoError(t, err) {
+					t.FailNow()
+				}
+
+				// Insert whole batch.
+				for _, item := range batchItems {
+					err := txn.Put("widgets", item.Key, item.Value)
+					if !assert.NoError(t, err) {
+						t.FailNow()
+					}
+				}
+
+				// Commit and update the current list.
+				mutex.Lock()
+				err = txn.Commit()
+				current = append(current, batchItems...)
+				mutex.Unlock()
+				if !assert.NoError(t, err) {
+					t.FailNow()
+				}
+
+				// If there are no more left then exit.
+				if len(pending) == 0 {
+					break
+				}
+
+				time.Sleep(1 * time.Millisecond)
+			}
+
+			// Notify readers to stop.
+			close(c)
+
+			// Wait for readers to finish.
+			wg.Wait()
+		})
+		fmt.Fprint(os.Stderr, ".")
+		return true
+	}, qconfig())
+	assert.NoError(t, err)
+	fmt.Fprint(os.Stderr, "\n")
+}
diff --git a/rwtransaction.go b/rwtransaction.go
index 568960a..afb55c8 100644
--- a/rwtransaction.go
+++ b/rwtransaction.go
@@ -20,9 +20,7 @@ func (t *RWTransaction) init(db *DB) {
 	t.Transaction.init(db)
 	t.pages = make(map[pgid]*page)
 
-	// Copy the meta and increase the transaction id.
-	t.meta = &meta{}
-	db.meta().copy(t.meta)
+	// Increment the transaction id.
 	t.meta.txnid += txnid(1)
 }
 
diff --git a/transaction.go b/transaction.go
index 713a019..46adefe 100644
--- a/transaction.go
+++ b/transaction.go
@@ -20,9 +20,13 @@ type txnid uint64
 // init initializes the transaction and associates it with a database.
 func (t *Transaction) init(db *DB) {
 	t.db = db
-	t.meta = db.meta()
 	t.pages = nil
 
+	// Copy the meta page since it can be changed by the writer.
+	t.meta = &meta{}
+	db.meta().copy(t.meta)
+
+	// Read in the buckets page.
 	t.buckets = &buckets{}
 	t.buckets.read(t.page(t.meta.buckets))
 }
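
For context, the usage pattern that the new TestParallelTransactions exercises boils down to one writer committing batches while read transactions run concurrently. The following is a minimal sketch, not part of the patch, written as if it sat next to the test in package bolt; it only uses the calls that appear in the diff (CreateBucket, Transaction, RWTransaction, Get, Put, Commit, Close), assumes an already-opened *DB, and the "widgets" bucket with the foo/bar key is illustrative only:

package bolt

// readWhileWriting (hypothetical helper) starts one read transaction
// concurrently with a read-write transaction. With this patch, each read
// transaction works from its own copy of the meta page, so the writer
// incrementing txnid and committing cannot race with the reader.
func readWhileWriting(db *DB) error {
	db.CreateBucket("widgets")

	done := make(chan struct{})
	go func() {
		defer close(done)
		// Reader: open a read-only transaction and look up a key.
		txn, err := db.Transaction()
		if err != nil {
			return
		}
		defer txn.Close()
		txn.Get("widgets", []byte("foo")) // value ignored in this sketch
	}()

	// Writer: insert and commit while the reader may still be open.
	txn, err := db.RWTransaction()
	if err != nil {
		return err
	}
	if err := txn.Put("widgets", []byte("foo"), []byte("bar")); err != nil {
		return err
	}
	if err := txn.Commit(); err != nil {
		return err
	}

	<-done
	return nil
}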