Add RWTransaction.write().

master
Ben Johnson 2014-01-30 18:22:02 -05:00
parent d087fb4419
commit 26f6fefead
11 changed files with 386 additions and 74 deletions

View File

@ -7,6 +7,8 @@ import (
// branch represents a temporary in-memory branch page.
type branch struct {
pgid pgid
depth int
parent *branch
items branchItems
}
@ -42,11 +44,11 @@ func (b *branch) put(id pgid, newid pgid, key []byte, replace bool) {
}
// read initializes the item data from an on-disk page.
func (b *branch) read(page *page) {
ncount := int(page.count)
b.items = make(branchItems, ncount)
bnodes := (*[maxNodesPerPage]bnode)(unsafe.Pointer(&page.ptr))
for i := 0; i < ncount; i++ {
func (b *branch) read(p *page) {
b.pgid = p.id
b.items = make(branchItems, int(p.count))
bnodes := (*[maxNodesPerPage]bnode)(unsafe.Pointer(&p.ptr))
for i := 0; i < int(p.count); i++ {
bnode := &bnodes[i]
item := &b.items[i]
item.pgid = bnode.pgid
@ -109,6 +111,12 @@ func (b *branch) split(pageSize int) []*branch {
return branches
}
type branches []*branch
func (s branches) Len() int { return len(s) }
func (s branches) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s branches) Less(i, j int) bool { return s[i].depth < s[j].depth }
type branchItems []branchItem
type branchItem struct {

View File

@ -1,5 +1,7 @@
package bolt
const MaxBucketNameSize = 255
type Bucket struct {
*bucket
name string

14
db.go
View File

@ -118,7 +118,9 @@ func (db *DB) mmap() error {
} else if int(info.Size()) < db.pageSize*2 {
return &Error{"file size too small", err}
}
size := int(info.Size())
// TEMP(benbjohnson): Set max size to 1MB.
size := 2 << 20
// Memory-map the data file as a byte slice.
if db.data, err = db.syscall.Mmap(int(db.file.Fd()), 0, size, syscall.PROT_READ, syscall.MAP_SHARED); err != nil {
@ -159,7 +161,9 @@ func (db *DB) init() error {
m.pageSize = uint32(db.pageSize)
m.version = Version
m.free = 2
m.sys.root = 3
m.sys = 3
m.pgid = 4
m.txnid = txnid(i)
}
// Write an empty freelist at page 3.
@ -171,7 +175,7 @@ func (db *DB) init() error {
// Write an empty leaf page at page 4.
p = db.pageInBuffer(buf[:], pgid(3))
p.id = pgid(3)
p.flags = p_leaf
p.flags = p_sys
p.count = 0
// Write the buffer to our data file.
@ -206,7 +210,7 @@ func (db *DB) Transaction() (*Transaction, error) {
// Create a transaction associated with the database.
t := &Transaction{}
t.init(db, db.meta())
t.init(db)
return t, nil
}
@ -230,7 +234,7 @@ func (db *DB) RWTransaction() (*RWTransaction, error) {
branches: make(map[pgid]*branch),
leafs: make(map[pgid]*leaf),
}
t.init(db, db.meta())
t.init(db)
return t, nil
}

View File

@ -8,6 +8,7 @@ import (
// leaf represents an in-memory, deserialized leaf page.
type leaf struct {
pgid pgid
parent *branch
items leafItems
}
@ -39,10 +40,10 @@ func (l *leaf) put(key []byte, value []byte) {
// read initializes the item data from an on-disk page.
func (l *leaf) read(p *page) {
ncount := int(p.count)
l.items = make(leafItems, ncount)
l.pgid = p.id
l.items = make(leafItems, int(p.count))
lnodes := (*[maxNodesPerPage]lnode)(unsafe.Pointer(&p.ptr))
for i := 0; i < ncount; i++ {
for i := 0; i < int(p.count); i++ {
lnode := &lnodes[i]
item := &l.items[i]
item.key = lnode.key()

14
meta.go
View File

@ -14,8 +14,8 @@ type meta struct {
pageSize uint32
pgid pgid
free pgid
sys pgid
txnid txnid
sys bucket
}
// validate checks the marker bytes and version of the meta page to ensure it matches this binary.
@ -30,8 +30,20 @@ func (m *meta) validate() error {
// copy copies one meta object to another.
func (m *meta) copy(dest *meta) {
dest.magic = m.magic
dest.version = m.version
dest.pageSize = m.pageSize
dest.pgid = m.pgid
dest.free = m.free
dest.txnid = m.txnid
dest.sys = m.sys
}
// write writes the meta onto a page.
func (m *meta) write(p *page) {
// Page id is either going to be 0 or 1 which we can determine by the Txn ID.
p.id = pgid(m.txnid % 2)
p.flags |= p_meta
m.copy(p.meta())
}

View File

@ -14,7 +14,8 @@ const (
p_branch = 0x01
p_leaf = 0x02
p_meta = 0x04
p_freelist = 0x08
p_sys = 0x08
p_freelist = 0x10
)
type pgid uint64
@ -56,3 +57,9 @@ func (p *page) bnodes() []bnode {
func (p *page) freelist() []pgid {
return ((*[maxNodesPerPage]pgid)(unsafe.Pointer(&p.ptr)))[0:p.count]
}
type pages []*page
func (s pages) Len() int { return len(s) }
func (s pages) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s pages) Less(i, j int) bool { return s[i].id < s[j].id }

View File

@ -1,6 +1,7 @@
package bolt
import (
"sort"
"unsafe"
)
@ -12,31 +13,43 @@ type RWTransaction struct {
leafs map[pgid]*leaf
}
// init initializes the transaction.
func (t *RWTransaction) init(db *DB) {
t.Transaction.init(db)
t.pages = make(map[pgid]*page)
// Copy the meta and increase the transaction id.
t.meta = &meta{}
db.meta().copy(t.meta)
t.meta.txnid += txnid(2)
}
// CreateBucket creates a new bucket.
func (t *RWTransaction) CreateBucket(name string) error {
// Check if bucket already exists.
if b := t.Bucket(name); b != nil {
return &Error{"bucket already exists", nil}
} else if len(name) == 0 {
return &Error{"bucket name cannot be blank", nil}
} else if len(name) > MaxBucketNameSize {
return &Error{"bucket name too large", nil}
}
// Create a new bucket entry.
var buf [unsafe.Sizeof(bucket{})]byte
var raw = (*bucket)(unsafe.Pointer(&buf[0]))
raw.root = 0
// Create a blank root leaf page.
p := t.allocate(1)
p.flags = p_leaf
// Move cursor to insertion location.
c := t.sys.Cursor()
c.Goto([]byte(name))
// Load the leaf node from the cursor and insert the key/value.
t.leaf(c).put([]byte(name), buf[:])
// Add bucket to system page.
t.sys.put(name, &bucket{root: p.id})
return nil
}
// DropBucket deletes a bucket.
func (t *RWTransaction) DeleteBucket(b *Bucket) error {
// TODO: Remove from main DB.
func (t *RWTransaction) DeleteBucket(name string) error {
// Remove from system page.
t.sys.del(name)
// TODO: Delete entry from system bucket.
// TODO: Free all pages.
// TODO: Remove cursor.
@ -74,13 +87,31 @@ func (t *RWTransaction) Delete(key []byte) error {
return nil
}
// Commit writes all changes to disk.
func (t *RWTransaction) Commit() error {
// TODO(benbjohnson): Use vectorized I/O to write out dirty pages.
// TODO: Flush data.
// TODO: Rebalance.
// TODO: Update meta.
// TODO: Write meta.
// Spill data onto dirty pages.
t.spill()
// Spill system page.
p := t.allocate((t.sys.size() / t.db.pageSize) + 1)
t.sys.write(p)
// Write dirty pages to disk.
if err := t.write(); err != nil {
return err
}
// Update the meta.
t.meta.sys = p.id
// Write meta to disk.
if err := t.writeMeta(); err != nil {
return err
}
return nil
}
@ -99,10 +130,107 @@ func (t *RWTransaction) close() error {
}
// allocate returns a contiguous block of memory starting at a given page.
func (t *RWTransaction) allocate(size int) (*page, error) {
// TODO: Find a continuous block of free pages.
// TODO: If no free pages are available, resize the mmap to allocate more.
return nil, nil
func (t *RWTransaction) allocate(count int) *page {
// TODO(benbjohnson): Use pages from the freelist.
// Allocate a set of contiguous pages from the end of the file.
buf := make([]byte, count*t.db.pageSize)
p := (*page)(unsafe.Pointer(&buf[0]))
p.id = t.meta.pgid
p.overflow = uint32(count - 1)
// Increment the last page id.
t.meta.pgid += pgid(count)
// Save it in our page cache.
t.pages[p.id] = p
return p
}
// spill writes all the leafs and branches to dirty pages.
func (t *RWTransaction) spill() {
// Spill leafs first.
for _, l := range t.leafs {
t.spillLeaf(l)
}
// Sort branches by highest depth first.
branches := make(branches, 0, len(t.branches))
for _, b := range t.branches {
branches = append(branches, b)
}
sort.Sort(branches)
// Spill branches by deepest first.
for _, b := range branches {
t.spillBranch(b)
}
}
// spillLeaf writes a leaf to one or more dirty pages.
func (t *RWTransaction) spillLeaf(l *leaf) {
parent := l.parent
// Split leaf, if necessary.
leafs := l.split(t.db.pageSize)
// TODO: If this is a root leaf and we split then add a parent branch.
// Process each resulting leaf.
previd := leafs[0].pgid
for index, l := range leafs {
// Allocate contiguous space for the leaf.
p := t.allocate((l.size() / t.db.pageSize) + 1)
// Write the leaf to the page.
l.write(p)
// Insert or replace the node in the parent branch with the new pgid.
if parent != nil {
parent.put(previd, p.id, l.items[0].key, (index == 0))
previd = l.pgid
}
}
}
// spillBranch writes a branch to one or more dirty pages.
func (t *RWTransaction) spillBranch(l *branch) {
warn("[pending] RWTransaction.spillBranch()") // TODO
}
// write writes any dirty pages to disk.
func (t *RWTransaction) write() error {
// TODO(benbjohnson): If our last page id is greater than the mmap size then lock the DB and resize.
// Sort pages by id.
pages := make(pages, 0, len(t.pages))
for _, p := range t.pages {
pages = append(pages, p)
}
sort.Sort(pages)
// Write pages to disk in order.
for _, p := range pages {
size := (int(p.overflow) + 1) * t.db.pageSize
buf := (*[maxAllocSize]byte)(unsafe.Pointer(p))[:size]
t.db.file.WriteAt(buf, int64(p.id)*int64(t.db.pageSize))
}
return nil
}
// writeMeta writes the meta to the disk.
func (t *RWTransaction) writeMeta() error {
// Create a temporary buffer for the meta page.
buf := make([]byte, t.db.pageSize)
p := t.db.pageInBuffer(buf, 0)
t.meta.write(p)
// Write the meta page to file.
t.db.metafile.WriteAt(buf, int64(p.id)*int64(t.db.pageSize))
return nil
}
// leaf retrieves a leaf object based on the current position of a cursor.
@ -141,6 +269,7 @@ func (t *RWTransaction) branch(stack []elem) *branch {
// Otherwise create a branch and cache it.
b := &branch{}
b.read(t.page(id))
b.depth = len(stack) - 1
b.parent = t.branch(stack[:len(stack)-1])
t.branches[id] = b

View File

@ -27,18 +27,16 @@ func TestTransactionCreateBucket(t *testing.T) {
err = txn.Commit()
assert.NoError(t, err)
/*
// Open a separate read-only transaction.
rtxn, err := db.Transaction()
assert.NotNil(t, txn)
assert.NoError(t, err)
// Open a separate read-only transaction.
rtxn, err := db.Transaction()
assert.NotNil(t, txn)
assert.NoError(t, err)
b, err := rtxn.Bucket("widgets")
assert.NoError(t, err)
if assert.NotNil(t, b) {
assert.Equal(t, b.Name(), "widgets")
}
*/
b := rtxn.Bucket("widgets")
assert.NoError(t, err)
if assert.NotNil(t, b) {
assert.Equal(t, b.Name(), "widgets")
}
})
}

95
sys.go Normal file
View File

@ -0,0 +1,95 @@
package bolt
import (
"sort"
"unsafe"
)
// sys represents a in-memory system page.
type sys struct {
pgid pgid
buckets map[string]*bucket
}
// size returns the size of the page after serialization.
func (s *sys) size() int {
var size int = pageHeaderSize
for key, _ := range s.buckets {
size += int(unsafe.Sizeof(bucket{})) + len(key)
}
return size
}
// get retrieves a bucket by name.
func (s *sys) get(key string) *bucket {
return s.buckets[key]
}
// put sets a new value for a bucket.
func (s *sys) put(key string, b *bucket) {
s.buckets[key] = b
}
// del deletes a bucket by name.
func (s *sys) del(key string) {
delete(s.buckets, key)
}
// read initializes the data from an on-disk page.
func (s *sys) read(p *page) {
s.pgid = p.id
s.buckets = make(map[string]*bucket)
var buckets []*bucket
var keys []string
// Read buckets.
nodes := (*[maxNodesPerPage]bucket)(unsafe.Pointer(&p.ptr))
for i := 0; i < int(p.count); i++ {
node := &nodes[i]
buckets = append(buckets, node)
}
// Read keys.
buf := (*[maxAllocSize]byte)(unsafe.Pointer(&nodes[p.count]))[:]
for i := 0; i < int(p.count); i++ {
size := int(buf[0])
buf = buf[1:]
keys = append(keys, string(buf[:size]))
buf = buf[size:]
}
// Associate keys and buckets.
for index, key := range keys {
s.buckets[key] = buckets[index]
}
}
// write writes the items onto a page.
func (s *sys) write(p *page) {
// Initialize page.
p.flags |= p_sys
p.count = uint16(len(s.buckets))
// Sort keys.
var keys []string
for key, _ := range s.buckets {
keys = append(keys, key)
}
sort.StringSlice(keys).Sort()
// Write each bucket to the page.
buckets := (*[maxNodesPerPage]bucket)(unsafe.Pointer(&p.ptr))
for index, key := range keys {
buckets[index] = *s.buckets[key]
}
// Write each key to the page.
buf := (*[maxAllocSize]byte)(unsafe.Pointer(&buckets[p.count]))[:]
for _, key := range keys {
buf[0] = byte(len(key))
buf = buf[1:]
copy(buf, []byte(key))
buf = buf[len(key):]
}
}

70
sys_test.go Normal file
View File

@ -0,0 +1,70 @@
package bolt
import (
"testing"
"unsafe"
"github.com/stretchr/testify/assert"
)
// Ensure that a system page can set a bucket.
func TestSysPut(t *testing.T) {
s := &sys{buckets: make(map[string]*bucket)}
s.put("foo", &bucket{root: 2})
s.put("bar", &bucket{root: 3})
s.put("foo", &bucket{root: 4})
assert.Equal(t, len(s.buckets), 2)
assert.Equal(t, s.get("foo").root, pgid(4))
assert.Equal(t, s.get("bar").root, pgid(3))
assert.Nil(t, s.get("no_such_bucket"))
}
// Ensure that a system page can deserialize from a page.
func TestSysRead(t *testing.T) {
// Create a page.
var buf [4096]byte
page := (*page)(unsafe.Pointer(&buf[0]))
page.count = 2
// Insert 2 buckets at the beginning.
buckets := (*[3]bucket)(unsafe.Pointer(&page.ptr))
buckets[0] = bucket{root: 3}
buckets[1] = bucket{root: 4}
// Write data for the nodes at the end.
data := (*[4096]byte)(unsafe.Pointer(&buckets[2]))
data[0] = 3
copy(data[1:], []byte("bar"))
data[4] = 10
copy(data[5:], []byte("helloworld"))
// Deserialize page into a system page.
s := &sys{buckets: make(map[string]*bucket)}
s.read(page)
// Check that there are two items with correct data.
assert.Equal(t, len(s.buckets), 2)
assert.Equal(t, s.get("bar").root, pgid(3))
assert.Equal(t, s.get("helloworld").root, pgid(4))
}
// Ensure that a system page can serialize itself.
func TestSysWrite(t *testing.T) {
s := &sys{buckets: make(map[string]*bucket)}
s.put("foo", &bucket{root: 2})
s.put("bar", &bucket{root: 3})
// Write it to a page.
var buf [4096]byte
p := (*page)(unsafe.Pointer(&buf[0]))
s.write(p)
// Read the page back in.
s2 := &sys{buckets: make(map[string]*bucket)}
s2.read(p)
// Check that the two pages are the same.
assert.Equal(t, len(s.buckets), 2)
assert.Equal(t, s.get("foo").root, pgid(2))
assert.Equal(t, s.get("bar").root, pgid(3))
}

View File

@ -1,9 +1,5 @@
package bolt
import (
"unsafe"
)
var (
InvalidTransactionError = &Error{"txn is invalid", nil}
BucketAlreadyExistsError = &Error{"bucket already exists", nil}
@ -19,22 +15,21 @@ const (
type txnid uint64
type Transaction struct {
id int
db *DB
meta *meta
sys Bucket
buckets map[string]*Bucket
pages map[pgid]*page
id int
db *DB
meta *meta
sys *sys
pages map[pgid]*page
}
// init initializes the transaction and associates it with a database.
func (t *Transaction) init(db *DB, meta *meta) {
func (t *Transaction) init(db *DB) {
t.db = db
t.meta = meta
t.buckets = make(map[string]*Bucket)
t.meta = db.meta()
t.pages = nil
t.sys.transaction = t
t.sys.bucket = &t.meta.sys
t.sys = &sys{}
t.sys.read(t.page(t.meta.sys))
}
func (t *Transaction) Close() error {
@ -48,26 +43,17 @@ func (t *Transaction) DB() *DB {
// Bucket retrieves a bucket by name.
func (t *Transaction) Bucket(name string) *Bucket {
// Return cached reference if it's already been looked up.
if b := t.buckets[name]; b != nil {
return b
}
// Retrieve bucket data from the system bucket.
value := t.sys.Cursor().Get([]byte(name))
if value == nil {
// Lookup bucket from the system page.
b := t.sys.get(name)
if b == nil {
return nil
}
// Create a bucket that overlays the data.
b := &Bucket{
bucket: (*bucket)(unsafe.Pointer(&value[0])),
return &Bucket{
bucket: b,
name: name,
transaction: t,
}
t.buckets[name] = b
return b
}
// Cursor creates a cursor associated with a given bucket.