bolt/transaction.go

404 lines
9.3 KiB
Go

package bolt
import (
"sort"
"unsafe"
)
// txnid represents the internal transaction identifier.
// Transaction.init bumps the txnid stored in the transaction's private meta
// copy by one, and Transaction.id reports that value.
type txnid uint64
// Transaction represents a consistent view into the database.
// Read-only transactions can be created by calling DB.Transaction().
// Read-write transactions can be created by calling DB.RWTransaction().
// Only one read-write transaction is allowed at a time.
type Transaction struct {
// db is the database that created the transaction. close sets it to nil,
// which is what isOpen checks to tell whether the transaction is finished.
db *DB
// meta is this transaction's private copy of the database meta page.
meta *meta
// buckets is the bucket index read from the page referenced by meta.buckets.
buckets *buckets
// writable reports whether the transaction is allowed to change data.
writable bool
// pages caches pages handed out by allocate, keyed by page id, until write
// flushes them to disk and resets the map.
pages map[pgid]*page
// nodes caches the nodes materialized by node, keyed by the id of the page
// each one was read from; spill resets it once everything is written.
nodes map[pgid]*node
// pending holds the nodes produced by the split that spill is currently
// writing out, so that dereference can reach them as well.
pending []*node
}
// init binds the transaction to db and loads its starting state: a private
// copy of the meta page, the bucket index that copy points at, an empty
// dirty-page cache, and a transaction id one higher than the copied one.
func (t *Transaction) init(db *DB) {
	t.db = db

	// With no page cache in place, the page lookup below is answered by the
	// database itself rather than by previously cached pages.
	t.pages = nil

	// Take a private copy of the meta page since the writer can change it.
	t.meta = &meta{}
	db.meta().copy(t.meta)

	// Load the bucket index from the page the meta copy refers to.
	bucketsPage := t.page(t.meta.buckets)
	t.buckets = &buckets{}
	t.buckets.read(bucketsPage)

	// Start with an empty dirty-page cache.
	t.pages = make(map[pgid]*page)

	// Advance the transaction id.
	t.meta.txnid++
}
// id reports the transaction id held in this transaction's meta copy.
func (t *Transaction) id() txnid {
	return t.meta.txnid
}
// DB returns the database this transaction belongs to.
// The result is nil once the transaction has been closed.
func (t *Transaction) DB() *DB {
	return t.db
}
// Writable reports whether this transaction is allowed to modify data.
func (t *Transaction) Writable() bool {
	return t.writable
}
// Bucket looks up a bucket by name within this transaction.
// It returns nil when no bucket with that name exists.
// The transaction must still be open.
func (t *Transaction) Bucket(name string) *Bucket {
	_assert(t.isOpen(), "transaction not open")

	if found := t.buckets.get(name); found != nil {
		return &Bucket{bucket: found, name: name, transaction: t}
	}
	return nil
}
// Buckets returns a handle for every bucket known to this transaction.
// The handles come from ranging over a map, so their order is unspecified.
// The transaction must still be open.
func (t *Transaction) Buckets() []*Bucket {
	_assert(t.isOpen(), "transaction not open")

	items := t.buckets.items
	result := make([]*Bucket, len(items))
	next := 0
	for name, b := range items {
		result[next] = &Bucket{bucket: b, transaction: t, name: name}
		next++
	}
	return result
}
// CreateBucket adds a new, empty bucket called name.
//
// The checks run in this order and the first failure wins:
// ErrTransactionNotWritable for a read-only transaction, ErrBucketExists when
// the name is already taken, ErrBucketNameRequired for a blank name, and
// ErrBucketNameTooLarge when the name is longer than MaxBucketNameSize.
// An allocation failure for the bucket's root page is returned as-is.
func (t *Transaction) CreateBucket(name string) error {
	_assert(t.isOpen(), "transaction not open")

	switch {
	case !t.writable:
		return ErrTransactionNotWritable
	case t.Bucket(name) != nil:
		return ErrBucketExists
	case len(name) == 0:
		return ErrBucketNameRequired
	case len(name) > MaxBucketNameSize:
		return ErrBucketNameTooLarge
	}

	// The bucket starts out as a single blank leaf page acting as its root.
	rootPage, err := t.allocate(1)
	if err != nil {
		return err
	}
	rootPage.flags = leafPageFlag

	// Register the bucket in the bucket index.
	t.buckets.put(name, &bucket{root: rootPage.id})
	return nil
}
// CreateBucketIfNotExists makes sure a bucket called name exists.
// It behaves like CreateBucket, except that ErrBucketExists is treated as
// success. Every other CreateBucket error is returned unchanged.
func (t *Transaction) CreateBucketIfNotExists(name string) error {
	_assert(t.isOpen(), "transaction not open")

	err := t.CreateBucket(name)
	if err == nil || err == ErrBucketExists {
		return nil
	}
	return err
}
// DeleteBucket removes the bucket called name from the bucket index.
// It returns ErrTransactionNotWritable for a read-only transaction and
// ErrBucketNotFound when no such bucket exists.
// The pages that belonged to the bucket are not released yet (see TODO).
func (t *Transaction) DeleteBucket(name string) error {
	_assert(t.isOpen(), "transaction not open")

	switch {
	case !t.writable:
		return ErrTransactionNotWritable
	case t.Bucket(name) == nil:
		return ErrBucketNotFound
	}

	// Drop the entry from the bucket index.
	t.buckets.del(name)

	// TODO(benbjohnson): Free all pages.

	return nil
}
// Commit writes all changes to disk and updates the meta page.
// Read-only transactions will simply be closed.
//
// The transaction is closed when Commit returns, whether or not it succeeds.
// An error is returned if spilling nodes onto pages, allocating the buckets
// page, writing the dirty pages, or writing the meta page fails. After the
// first failure no further commit step runs, so the meta page is never
// written for a commit whose earlier steps did not complete.
func (t *Transaction) Commit() error {
	defer t.close()

	// Ignore commit for read-only transactions.
	if !t.writable {
		return nil
	}

	// TODO(benbjohnson): Use vectorized I/O to write out dirty pages.

	// Rebalance and spill data onto dirty pages. A failed spill must abort
	// the commit: continuing would write an incomplete set of pages and then
	// publish a meta page that refers to them.
	t.rebalance()
	if err := t.spill(); err != nil {
		return err
	}

	// Spill buckets page.
	p, err := t.allocate((t.buckets.size() / t.db.pageSize) + 1)
	if err != nil {
		return err
	}
	t.buckets.write(p)

	// Write dirty pages to disk.
	if err := t.write(); err != nil {
		return err
	}

	// Point the meta at the freshly written buckets page only after the
	// dirty pages are on disk, then write the meta itself.
	t.meta.buckets = p.id
	if err := t.writeMeta(); err != nil {
		return err
	}

	return nil
}
// Rollback abandons the transaction: it is closed without writing anything,
// so changes made through it are not written to disk.
func (t *Transaction) Rollback() {
	t.close()
}
// close releases the transaction's hold on the database and detaches it.
// A writable transaction unlocks the database's rwlock; a read-only one is
// removed from the database's transaction list instead. Afterwards db is nil,
// which is what isOpen checks.
func (t *Transaction) close() {
	switch {
	case t.writable:
		t.db.rwlock.Unlock()
	default:
		t.db.removeTransaction(t)
	}

	// Detach from the database.
	t.db = nil
}
// isOpen reports whether the transaction is still attached to a database.
// close clears db, so a closed transaction reports false.
func (t *Transaction) isOpen() bool {
	return t.db != nil
}
// allocate asks the database for a contiguous run of count pages and records
// the returned page in the transaction's page cache under its id.
// On failure nothing is cached and the database's error is returned.
func (t *Transaction) allocate(count int) (*page, error) {
	fresh, err := t.db.allocate(count)
	if err != nil {
		return nil, err
	}

	// Remember the page so that page() serves it and write() flushes it.
	t.pages[fresh.id] = fresh

	return fresh, nil
}
// rebalance calls rebalance on every node currently cached by the transaction.
func (t *Transaction) rebalance() {
	for _, cached := range t.nodes {
		cached.rebalance()
	}
}
// spill writes every cached node onto freshly allocated dirty pages.
//
// Nodes are processed deepest first. Each node is split into page-sized
// pieces, its previous page (if any) is handed to the freelist, and each piece
// is written to newly allocated pages and registered with its parent. When a
// root node splits, a new parent is created for it and queued so it is spilled
// too. Finally the bucket index is repointed at the new root pages and the
// node cache is reset. The first allocation error aborts the spill and is
// returned.
func (t *Transaction) spill() error {
	// rootRef pairs a root node with the page id it had before spilling, so
	// the bucket index can be updated once the new root page is known.
	type rootRef struct {
		node *node
		pgid pgid
	}
	var oldRoots []rootRef

	// Sort nodes by highest depth first.
	queue := make(nodesByDepth, 0, len(t.nodes))
	for _, cached := range t.nodes {
		queue = append(queue, cached)
	}
	sort.Sort(queue)

	// The queue grows while it is walked (new parents of split roots are
	// appended), so it has to be walked by index.
	for qi := 0; qi < len(queue); qi++ {
		n := queue[qi]

		// Save existing root buckets for later.
		if n.parent == nil && n.pgid != 0 {
			oldRoots = append(oldRoots, rootRef{node: n, pgid: n.pgid})
		}

		// Split nodes into appropriate sized nodes.
		// The first node in this list will be a reference to n to preserve ancestry.
		parts := n.split(t.db.pageSize)

		// Expose the pieces through t.pending so dereference can reach them
		// while they are being written.
		t.pending = parts

		// If this is a root node that split then create a parent node.
		if n.parent == nil && len(parts) > 1 {
			n.parent = &node{transaction: t, isLeaf: false}
			queue = append(queue, n.parent)
		}

		// Add node's page to the freelist.
		if n.pgid > 0 {
			t.db.freelist.free(t.id(), t.page(n.pgid))
		}

		// Write nodes to dirty pages.
		for idx, part := range parts {
			// Allocate contiguous space for the node.
			p, err := t.allocate((part.size() / t.db.pageSize) + 1)
			if err != nil {
				return err
			}

			// Write the node to the page.
			part.write(p)
			part.pgid = p.id
			part.parent = n.parent

			// The first node should use the existing entry, other nodes are
			// inserts. The piece's first key is only read when it is needed.
			var oldKey []byte
			if idx != 0 {
				oldKey = part.inodes[0].key
			} else {
				oldKey = n.key
			}

			// Update the parent entry.
			if part.parent != nil {
				part.parent.put(oldKey, part.inodes[0].key, nil, part.pgid)
			}
		}

		t.pending = nil
	}

	// Update roots with new roots.
	for _, ref := range oldRoots {
		t.buckets.updateRoot(ref.pgid, ref.node.root().pgid)
	}

	// Clear out nodes now that they are all spilled.
	t.nodes = make(map[pgid]*node)

	return nil
}
// write flushes every cached dirty page to the database file in ascending
// page-id order and then resets the page cache.
// It stops at the first write error and returns it, leaving the cache
// untouched in that case.
func (t *Transaction) write() error {
	// Collect the dirty pages and sort them by id.
	dirty := make(pages, 0, len(t.pages))
	for _, p := range t.pages {
		dirty = append(dirty, p)
	}
	sort.Sort(dirty)

	pageSize := t.db.pageSize
	for _, p := range dirty {
		// A page covers its own slot plus p.overflow further slots; view
		// that whole span as raw bytes starting at the page header.
		length := (int(p.overflow) + 1) * pageSize
		data := (*[maxAllocSize]byte)(unsafe.Pointer(p))[:length]

		// The file position is the page id scaled by the page size.
		_, err := t.db.file.WriteAt(data, int64(p.id)*int64(pageSize))
		if err != nil {
			return err
		}
	}

	// Clear out page cache.
	t.pages = make(map[pgid]*page)

	return nil
}
// writeMeta serializes the transaction's meta into a page-sized buffer and
// writes it to the database's meta file at the offset of that page.
// It returns the error reported by the file write, if any, so that Commit
// can tell whether the meta page was actually written.
func (t *Transaction) writeMeta() error {
	// Create a temporary buffer for the meta page.
	buf := make([]byte, t.db.pageSize)
	p := t.db.pageInBuffer(buf, 0)
	t.meta.write(p)

	// Write the meta page to file. The write error must not be dropped:
	// the caller treats a nil result as a completed commit.
	if _, err := t.db.metafile.WriteAt(buf, int64(p.id)*int64(t.db.pageSize)); err != nil {
		return err
	}

	return nil
}
// page returns the page with the given id.
// A page held in the transaction's dirty-page cache takes precedence;
// otherwise the lookup is delegated to the database.
func (t *Transaction) page(id pgid) *page {
	// Indexing a nil map simply reports "not found", so no separate nil
	// check on the cache is needed.
	if cached, ok := t.pages[id]; ok {
		return cached
	}

	// Otherwise return directly from the mmap.
	return t.db.page(id)
}
// node returns the in-memory node for the page with the given id.
// A node already cached under that id is returned as-is and parent is
// ignored. Otherwise a new node is created with the given parent (one level
// deeper than it, when it is non-nil), filled from the page, and cached.
func (t *Transaction) node(pgid pgid, parent *node) *node {
	// Reuse the node if this page was materialized before.
	if cached := t.nodes[pgid]; cached != nil {
		return cached
	}

	// Otherwise build it, positioned one level below its parent.
	fresh := &node{transaction: t, parent: parent}
	if parent != nil {
		fresh.depth = parent.depth + 1
	}
	fresh.read(t.page(pgid))

	t.nodes[pgid] = fresh
	return fresh
}
// dereference removes all references to the old mmap by calling dereference
// on every cached node and then on every pending node.
func (t *Transaction) dereference() {
	for _, cached := range t.nodes {
		cached.dereference()
	}

	for _, queued := range t.pending {
		queued.dereference()
	}
}
// forEachPage walks the page tree rooted at pgid in pre-order.
// fn is called with each page and its depth relative to the starting page,
// whose depth is the depth argument. Only pages flagged as branch pages are
// descended into; each of their elements is visited one level deeper.
func (t *Transaction) forEachPage(pgid pgid, depth int, fn func(*page, int)) {
	p := t.page(pgid)

	// Visit the page itself before any of its children.
	fn(p, depth)

	// Anything that is not a branch page has no children to visit.
	if (p.flags & branchPageFlag) == 0 {
		return
	}

	for i := 0; i < int(p.count); i++ {
		child := p.branchPageElement(uint16(i))
		t.forEachPage(child.pgid, depth+1, fn)
	}
}