bolt/functional_test.go

135 lines
2.7 KiB
Go

package bolt
import (
"fmt"
"os"
"sync"
"testing"
"testing/quick"
"time"
"github.com/stretchr/testify/assert"
)
// TestParallelTxs ensures that multiple goroutines can use the DB concurrently
// without race detector errors.
//
// For every input generated by quick.Check it inserts items in batches through
// read-write transactions while a bounded set of reader goroutines verifies,
// inside read transactions, that every item recorded as committed is visible.
// The test is skipped in short mode.
func TestParallelTxs(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping test in short mode.")
	}

	// mutex guards the `current` dataset of each iteration: readers snapshot
	// it under the read lock, the writer extends it under the write lock.
	var mutex sync.RWMutex

	err := quick.Check(func(numReaders, batchSize uint, items testdata) bool {
		// Limit the readers & writers to something reasonable.
		numReaders = (numReaders % 10) + 1
		batchSize = (batchSize % 50) + 1

		// Maintain the current dataset.
		var current testdata

		withOpenDB(func(db *DB, path string) {
			// Without the bucket every later Bucket("widgets") lookup is
			// meaningless, so stop right away if it cannot be created.
			if err := db.Do(func(tx *Tx) error {
				return tx.CreateBucket("widgets")
			}); !assert.NoError(t, err) {
				t.FailNow()
			}

			// Maintain a set of concurrent readers.
			var wg sync.WaitGroup
			stop := make(chan struct{})

			// Count the spawner itself before it starts. Otherwise the
			// counter could drop to zero between two readers and wg.Wait
			// could return while new readers are still being launched.
			wg.Add(1)
			go func() {
				defer wg.Done()

				// The buffer capacity caps the number of in-flight readers.
				readers := make(chan int, numReaders)
				for {
					// A reader already reported a failure; launching more
					// would only repeat the same report.
					if t.Failed() {
						return
					}

					// Attempt to start a new reader unless we're stopped.
					select {
					case readers <- 0:
					case <-stop:
						return
					}

					wg.Add(1)
					go func() {
						// Deferred so that every exit path signals completion
						// and frees its reader slot.
						defer wg.Done()
						defer func() { <-readers }()

						// Snapshot the dataset and open the read transaction
						// under the same read lock, so the writer cannot
						// commit in between the two.
						mutex.RLock()
						local := current
						tx, err := db.Tx()
						mutex.RUnlock()

						if err == ErrDatabaseNotOpen {
							return
						}
						// t.FailNow must only be called from the goroutine
						// running the test. Here the failure is recorded by
						// assert and the reader simply returns; the test
						// goroutine aborts after wg.Wait.
						if !assert.NoError(t, err) {
							return
						}
						defer tx.Rollback()

						// Verify all data is in for local data list.
						bucket := tx.Bucket("widgets")
						for _, item := range local {
							if !assert.Equal(t, item.Value, bucket.Get(item.Key)) {
								return
							}
						}
					}()
				}
			}()

			// Batch insert items.
			pending := items
			for {
				// Determine next batch.
				currentBatchSize := int(batchSize)
				if currentBatchSize > len(pending) {
					currentBatchSize = len(pending)
				}
				batchItems := pending[0:currentBatchSize]
				pending = pending[currentBatchSize:]

				// Start write transaction.
				tx, err := db.RWTx()
				if !assert.NoError(t, err) {
					t.FailNow()
				}

				// Insert whole batch.
				b := tx.Bucket("widgets")
				for _, item := range batchItems {
					err := b.Put(item.Key, item.Value)
					if !assert.NoError(t, err) {
						t.FailNow()
					}
				}

				// Commit and update the current list under the write lock so
				// readers never snapshot a list that is out of step with the
				// commit.
				mutex.Lock()
				err = tx.Commit()
				current = append(current, batchItems...)
				mutex.Unlock()
				if !assert.NoError(t, err) {
					t.FailNow()
				}

				// If there are no more left then exit.
				if len(pending) == 0 {
					break
				}
				time.Sleep(1 * time.Millisecond)
			}

			// Notify readers to stop.
			close(stop)

			// Wait for readers to finish.
			wg.Wait()

			// Surface reader failures from the test goroutine, where calling
			// FailNow is permitted.
			if t.Failed() {
				t.FailNow()
			}
		})

		fmt.Fprint(os.Stderr, ".")
		return true
	}, qconfig())
	assert.NoError(t, err)
	fmt.Fprint(os.Stderr, "\n")
}