Intermediate.

master
Ben Johnson 2014-01-27 10:11:54 -05:00
parent 1baa6d576a
commit 192649f453
12 changed files with 246 additions and 327 deletions

View File

@ -4,20 +4,16 @@ import (
"unsafe"
)
const bnodeSize = int(unsafe.Sizeof(lnode{}))
// bnode represents a node on a branch page.
type bnode struct {
flags uint16
keySize uint16
pgid pgid
data uintptr // Pointer to the beginning of the data.
pos uint32
ksize uint32
pgid pgid
}
// key returns a byte slice that of the key data.
// key returns a byte slice of the node key.
func (n *bnode) key() []byte {
return (*[MaxKeySize]byte)(unsafe.Pointer(&n.data))[:n.keySize]
}
// bnodeSize returns the number of bytes required to store a key as a branch node.
func bnodeSize(key []byte) int {
return int(unsafe.Offsetof((*bnode)(nil)).data) + len(key)
return (*[MaxKeySize]byte)(unsafe.Pointer(&n))[n.pos : n.pos+n.ksize]
}

4
bnodes.go Normal file
View File

@ -0,0 +1,4 @@
package bolt
// bnodes is a slice of bnode values.
type bnodes []bnode

View File

@ -1,37 +1,40 @@
package bolt
var (
InvalidBucketError = &Error{"invalid bucket", nil}
)
type bucketid uint32
type Bucket struct {
*bucket
name string
transaction Transaction,
cursors []*Cursor,
name string
transaction *Transaction
cursors []*Cursor
}
type bucket struct {
id bucketid
flags uint32
root pgid
branches pgid
leafs pgid
entries uint64
id bucketid
flags uint32
root pgid
}
func (b *Bucket) Close() error {
// TODO: Close cursors.
return nil
// Get retrieves the value for a key in the bucket.
func (b *Bucket) Get(key []byte) []byte {
return b.cursor().Get(key)
}
func (b *Bucket) Cursor() (*Cursor, error) {
if b.transaction == nil {
return nil, InvalidBucketError
}
// Cursor creates a new cursor for this bucket.
func (b *Bucket) Cursor() *Cursor {
c := b.cursor()
b.cursors = append(b.cursors, c)
return c
}
c := &Cursor{
// cursor creates a new untracked cursor for this bucket.
func (b *Bucket) cursor() *Cursor {
return &Cursor{
bucket: b,
stack: make([]elem, 0),
stack: make([]elem, 0),
}
return nil
}

View File

@ -1,8 +1,8 @@
package bolt
type Cursor struct {
bucket *Bucket
stack []elem
bucket *Bucket
stack []elem
}
// elem represents a node on a page that's on the cursor's stack.
@ -16,22 +16,43 @@ func (c *Cursor) Bucket() *Bucket {
}
// First moves the cursor to the first item in the bucket and returns its key and data.
func (c *Cursor) First() ([]byte, []byte, error) {
func (c *Cursor) First() ([]byte, []byte) {
// TODO: Traverse to the first key.
return nil, nil, nil
return nil, nil
}
// Move the cursor to the next key/value.
func (c *Cursor) Next() ([]byte, []byte, error) {
return nil, nil, nil
func (c *Cursor) Next() ([]byte, []byte) {
return nil, nil
}
// Get moves the cursor to key via Goto and returns the value of the node the
// cursor is positioned on. It returns nil when Goto does not report an exact
// match for key.
func (c *Cursor) Get(key []byte) []byte {
	if found := c.Goto(key); !found {
		return nil
	}
	return c.node().value()
}
// Goto positions the cursor at a specific key.
func (c *Cursor) Goto(key []byte) ([]byte, error) {
// Returns true if an exact match or false if positioned after the closest match.
func (c *Cursor) Goto(key []byte) bool {
// TODO(benbjohnson): Optimize for specific use cases.
// TODO: Check if len(key) > 0.
// TODO: Start from root page and traverse to correct page.
return nil, nil
return false
}
// current returns the page held by the top element of the cursor's stack,
// together with the leaf node on that page at the element's index.
// The stack must be non-empty; the last element is indexed directly.
func (c *Cursor) current() (*page, *lnode) {
	top := c.stack[len(c.stack)-1]
	p := top.page
	return p, p.lnode(top.index)
}
// node returns the leaf node addressed by the top element of the cursor's
// stack: the node at that element's index on that element's page.
// The stack must be non-empty; the last element is indexed directly.
func (c *Cursor) node() *lnode {
	last := len(c.stack) - 1
	top := c.stack[last]
	return top.page.lnode(top.index)
}

48
db.go
View File

@ -12,6 +12,8 @@ const (
db_nometasync
)
const minPageSize = 0x1000
var (
DatabaseNotOpenError = &Error{"db is not open", nil}
DatabaseAlreadyOpenedError = &Error{"db already open", nil}
@ -28,23 +30,11 @@ type DB struct {
file file
metafile file
data []byte
buf []byte
meta0 *meta
meta1 *meta
pageSize int
rwtransaction *RWTransaction
transactions []*Transaction
maxPageNumber int /**< me_mapsize / me_psize */
freePages []int /** IDL of pages that became unused in a write txn */
dirtyPages []int /** ID2L of pages written during a write txn. Length MDB_IDL_UM_SIZE. */
// TODO: scratch []*page // list of temp pages for writing.
readers []*reader
maxFreeOnePage int /** Max number of freelist items that can fit in a single overflow page */
maxPageDataSize int
maxNodeSize int /** Max size of a node on a page */
maxKeySize int /**< max size of a key */
}
// NewDB creates a new DB instance.
@ -91,10 +81,10 @@ func (db *DB) Open(path string, mode os.FileMode) error {
// Read enough data to get both meta pages.
var m, m0, m1 *meta
var buf [pageHeaderSize + int(unsafe.Sizeof(meta{}))]byte
var buf [minPageSize]byte
if _, err := db.file.ReadAt(buf[:], 0); err == nil {
if m0, _ = db.pageInBuffer(buf[:], 0).meta(); m0 != nil {
db.pageSize = int(m0.free.pad)
db.pageSize = int(m0.pageSize)
}
}
if _, err := db.file.ReadAt(buf[:], int64(db.pageSize)); err == nil {
@ -115,12 +105,6 @@ func (db *DB) Open(path string, mode os.FileMode) error {
}
}
// Initialize db fields.
db.buf = make([]byte, db.pageSize)
db.maxPageDataSize = ((db.pageSize - pageHeaderSize) / int(unsafe.Sizeof(pgno(0)))) - 1
db.maxNodeSize = (((db.pageSize - pageHeaderSize) / minKeyCount) & -2) - int(unsafe.Sizeof(indx(0)))
// TODO?: env->me_maxpg = env->me_mapsize / env->me_psize;
// Memory map the data file.
if err := db.mmap(); err != nil {
db.close()
@ -181,8 +165,8 @@ func (db *DB) init() error {
// Create two meta pages on a buffer.
buf := make([]byte, db.pageSize*2)
for i := 0; i < 2; i++ {
p := db.pageInBuffer(buf[:], i)
p.id = pgno(i)
p := db.pageInBuffer(buf[:], pgid(i))
p.id = pgid(i)
p.init(db.pageSize)
}
@ -198,7 +182,7 @@ func (db *DB) init() error {
func (db *DB) Close() {
db.Lock()
defer db.Unlock()
s.close()
db.close()
}
func (db *DB) close() {
@ -245,13 +229,13 @@ func (db *DB) RWTransaction() (*RWTransaction, error) {
}
// page retrieves a page reference from the mmap based on the current page size.
func (db *DB) page(id int) *page {
return (*page)(unsafe.Pointer(&db.data[id*db.pageSize]))
func (db *DB) page(id pgid) *page {
return (*page)(unsafe.Pointer(&db.data[id*pgid(db.pageSize)]))
}
// pageInBuffer retrieves a page reference from a given byte array based on the current page size.
func (db *DB) pageInBuffer(b []byte, id int) *page {
return (*page)(unsafe.Pointer(&b[id*db.pageSize]))
func (db *DB) pageInBuffer(b []byte, id pgid) *page {
return (*page)(unsafe.Pointer(&b[id*pgid(db.pageSize)]))
}
// meta retrieves the current meta page reference.
@ -262,17 +246,15 @@ func (db *DB) meta() *meta {
return db.meta1
}
// sync flushes the file descriptor to disk unless "no sync" is enabled.
// sync flushes the file descriptor to disk.
func (db *DB) sync(force bool) error {
if !db.noSync {
if err := syscall.Fsync(int(db.file.Fd())); err != nil {
return err
}
if err := syscall.Fsync(int(db.file.Fd())); err != nil {
return err
}
return nil
}
func (db *DB) Stat() *stat {
func (db *DB) Stat() *Stat {
// TODO: Calculate size, depth, page count (by type), entry count, readers, etc.
return nil
}

View File

@ -4,27 +4,22 @@ import (
"unsafe"
)
type nodeid uint16
const lnodeSize = int(unsafe.Sizeof(lnode{}))
// lnode represents a node on a leaf page.
type lnode struct {
flags uint16
keySize uint16
dataSize uint32
data uintptr // Pointer to the beginning of the data.
flags uint32
pos uint32
ksize uint32
vsize uint32
}
// key returns a byte slice that of the node key.
// key returns a byte slice of the node key.
func (n *lnode) key() []byte {
return (*[MaxKeySize]byte)(unsafe.Pointer(&n.data))[:n.keySize]
return (*[MaxKeySize]byte)(unsafe.Pointer(&n))[n.pos : n.pos+n.ksize]
}
// data returns a byte slice that of the node data.
func (n *lnode) data() []byte {
return (*[MaxKeySize]byte)(unsafe.Pointer(&n.data))[n.keySize : n.keySize+n.dataSize]
}
// lnodeSize returns the number of bytes required to store a key+data as a leaf node.
func lnodeSize(key []byte, data []byte) int {
return int(unsafe.Offsetof((*lnode)(nil)).data) + len(key) + len(data)
// value returns a byte slice of the node value.
func (n *lnode) value() []byte {
return (*[MaxKeySize]byte)(unsafe.Pointer(&n))[n.pos+n.ksize : n.pos+n.ksize+n.vsize]
}

17
lnodes.go Normal file
View File

@ -0,0 +1,17 @@
package bolt
// lnodes is a slice of leaf nodes.
type lnodes []lnode

// replace updates the node at index in place so that it records the sizes of
// key and value, and resets its pos to 0. Only the lengths of key and value
// are used. The receiver is returned for convenience; it shares its backing
// array with the caller's slice.
func (s lnodes) replace(key, value []byte, index int) lnodes {
	n := &s[index]
	n.pos = 0
	// The size fields are uint32, so the int lengths must be converted.
	n.ksize = uint32(len(key))
	n.vsize = uint32(len(value))
	return s
}

// insert returns a new slice containing the nodes of s with a new node placed
// at index. The new node records the sizes of key and value; its other fields
// are left at their zero values. Only the lengths of key and value are used.
// The result is built in a freshly allocated slice so that s is not modified.
func (s lnodes) insert(key, value []byte, index int) lnodes {
	out := make(lnodes, 0, len(s)+1)
	out = append(out, s[:index]...)
	out = append(out, lnode{ksize: uint32(len(key)), vsize: uint32(len(value))})
	return append(out, s[index:]...)
}

View File

@ -1,7 +1,9 @@
package bolt
var (
InvalidMetaPageError = &Error{"Invalid meta page", nil}
InvalidError = &Error{"Invalid database", nil}
VersionMismatchError = &Error{"version mismatch", nil}
InvalidMetaPageError = &Error{"invalid meta page", nil}
)
const magic uint32 = 0xC0DEC0DE

View File

@ -2,6 +2,7 @@ package bolt
import (
"os"
"time"
"github.com/stretchr/testify/mock"
)

27
page.go
View File

@ -4,12 +4,11 @@ import (
"unsafe"
)
const pageHeaderSize = int(unsafe.Offsetof(((*page)(nil)).ptr))
const maxPageSize = 0x8000
const minKeyCount = 2
const pageHeaderSize = int(unsafe.Offsetof(((*page)(nil)).data))
const minPageKeys = 2
const minKeysPerPage = 2
const maxNodesPerPage = 65535
const fillThreshold = 250 // 25%
const (
@ -21,12 +20,11 @@ const (
type pgid uint64
type page struct {
id pgid
flags uint32
lower uint16
upper uint16
count uint32
data uintptr
id pgid
flags uint16
count uint16
overflow uint32
ptr uintptr
}
// meta returns a pointer to the metadata section of the page.
@ -52,5 +50,10 @@ func (p *page) init(pageSize int) {
m.version = version
m.pageSize = uint32(pageSize)
m.pgid = 1
m.buckets.root = 0
m.sys.root = 0
}
// lnode returns a pointer to the leaf node at the given index. The memory
// starting at the address of p.ptr is viewed as an array of lnode values and
// the element at index is addressed within it.
func (p *page) lnode(index int) *lnode {
	nodes := (*[maxNodesPerPage]lnode)(unsafe.Pointer(&p.ptr))
	return &nodes[index]
}

View File

@ -1,19 +1,13 @@
package bolt
import (
"unsafe"
)
// RWTransaction represents a transaction that can read and write data.
// Only one read/write transaction can be active for a DB at a time.
type RWTransaction struct {
Transaction
dirtyPages map[pgid]*page
freelist []pgid
}
// init initializes the transaction and associates it with a database.
func (t *RWTransaction) init(db *DB, meta *meta) {
t.dirtyPages = make(map[pgid]*page)
t.freelist = make([]pgid)
t.Transaction.init(db, meta)
}
// TODO: Allocate scratch meta page.
@ -50,150 +44,86 @@ func (t *RWTransaction) close() error {
}
// CreateBucket creates a new bucket.
func (t *RWTransaction) CreateBucket(name string, dupsort bool) (*Bucket, error) {
func (t *RWTransaction) CreateBucket(name string) error {
if t.db == nil {
return nil, InvalidTransactionError
return InvalidTransactionError
}
// Check if bucket already exists.
if b := t.buckets[name]; b != nil {
return nil, &Error{"bucket already exists", nil}
if b, err := t.Bucket(name); err != nil {
return err
} else if b != nil {
return &Error{"bucket already exists", nil}
}
// Create a new bucket entry.
var buf [unsafe.Sizeof(bucket{})]byte
var raw = (*bucket)(unsafe.Pointer(&buf[0]))
raw.root = p_invalid
// TODO: Set dupsort flag.
raw.root = 0
// Open cursor to system bucket.
c, err := t.Cursor(&t.sysbuckets)
if err != nil {
return nil, err
c := t.sys.cursor()
if c.Goto([]byte(name)) {
// TODO: Delete node first.
}
// Put new entry into system bucket.
if err := c.Put([]byte(name), buf[:]); err != nil {
return nil, err
// Insert new node.
if err := t.insert([]byte(name), buf[:]); err != nil {
return err
}
// Save reference to bucket.
b := &Bucket{name: name, bucket: raw, isNew: true}
t.buckets[name] = b
// TODO: dbflag |= DB_DIRTY;
return b, nil
return nil
}
// DropBucket deletes a bucket.
func (t *RWTransaction) DeleteBucket(b *Bucket) error {
// TODO: Find bucket.
// TODO: Remove entry from system bucket.
// TODO: Remove from main DB.
// TODO: Delete entry from system bucket.
// TODO: Free all pages.
// TODO: Remove cursor.
return nil
}
// Put sets the value for a key in a given bucket.
func (t *Transaction) Put(name string, key []byte, value []byte) error {
c, err := t.Cursor(name)
if err != nil {
return nil, err
}
return c.Put(key, value)
}
// page returns a reference to the page with a given id.
// If the page has been written to then a temporary buffered page is returned.
func (t *Transaction) page(id int) *page {
// Check the dirty pages first.
if p, ok := t.pages[id]; ok {
return p
}
// Otherwise return directly from the mmap.
return t.Transaction.page(id)
}
// Flush (some) dirty pages to the map, after clearing their dirty flag.
// @param[in] txn the transaction that's being committed
// @param[in] keep number of initial pages in dirty_list to keep dirty.
// @return 0 on success, non-zero on failure.
func (t *Transaction) flush(keep bool) error {
func (t *RWTransaction) flush(keep bool) error {
// TODO(benbjohnson): Use vectorized I/O to write out dirty pages.
// TODO: Loop over each dirty page and write it to disk.
return nil
}
func (t *RWTransaction) DeleteBucket(name string) error {
// TODO: Remove from main DB.
// TODO: Delete entry from system bucket.
// TODO: Free all pages.
// TODO: Remove cursor.
return nil
}
func (c *RWCursor) Put(key []byte, value []byte) error {
// Make sure this cursor was created by a transaction.
if c.transaction == nil {
return &Error{"invalid cursor", nil}
func (t *RWTransaction) Put(name string, key []byte, value []byte) error {
b := t.Bucket(name)
if b == nil {
return BucketNotFoundError
}
db := c.transaction.db
// Validate the key we're using.
if key == nil {
// Validate the key and data size.
if len(key) == 0 {
return &Error{"key required", nil}
} else if len(key) > db.maxKeySize {
} else if len(key) > MaxKeySize {
return &Error{"key too large", nil}
}
// TODO: Validate data size based on MaxKeySize if DUPSORT.
// Validate the size of our data.
if len(data) > MaxDataSize {
} else if len(value) > MaxDataSize {
return &Error{"data too large", nil}
}
// If we don't have a root page then add one.
if c.bucket.root == p_invalid {
p, err := c.newLeafPage()
if err != nil {
return err
}
c.push(p)
c.bucket.root = p.id
c.bucket.root++
// TODO: *mc->mc_dbflag |= DB_DIRTY;
// TODO? mc->mc_flags |= C_INITIALIZED;
}
// Move cursor to insertion position.
c := b.cursor()
replace := c.Goto()
p, index := c.current()
// TODO: Move to key.
exists, err := c.moveTo(key)
if err != nil {
// Insert a new node.
if err := t.insert(p, index, key, value, replace); err != nil {
return err
}
// TODO: spill?
if err := c.spill(key, data); err != nil {
return err
}
// Make sure all cursor pages are writable
if err := c.touch(); err != nil {
return err
}
// If key does not exist the
if exists {
node := c.currentNode()
}
return nil
}
func (c *Cursor) Delete(key []byte) error {
func (t *RWTransaction) Delete(key []byte) error {
// TODO: Traverse to the correct node.
// TODO: If missing, exit.
// TODO: Remove node from page.
@ -201,38 +131,6 @@ func (c *Cursor) Delete(key []byte) error {
return nil
}
// newLeafPage allocates and initialize new a new leaf page.
func (c *RWCursor) newLeafPage() (*page, error) {
// Allocate page.
p, err := c.allocatePage(1)
if err != nil {
return nil, err
}
// Set flags and bounds.
p.flags = p_leaf | p_dirty
p.lower = pageHeaderSize
p.upper = c.transaction.db.pageSize
return p, nil
}
// newBranchPage allocates and initialize new a new branch page.
func (b *RWCursor) newBranchPage() (*page, error) {
// Allocate page.
p, err := c.allocatePage(1)
if err != nil {
return nil, err
}
// Set flags and bounds.
p.flags = p_branch | p_dirty
p.lower = pageHeaderSize
p.upper = c.transaction.db.pageSize
return p, nil
}
// allocate returns a contiguous block of memory starting at a given page.
func (t *RWTransaction) allocate(count int) (*page, error) {
// TODO: Find a continuous block of free pages.
@ -240,11 +138,59 @@ func (t *RWTransaction) allocate(count int) (*page, error) {
return nil, nil
}
func (t *RWTransaction) insert(p *page, index int, key []byte, data []byte, replace bool) error {
nodes := copy(p.lnodes())
if replace {
nodes = nodes.replace(index, key, data)
} else {
nodes = nodes.insert(index, key, data)
}
// If our page fits in the same size page then just write it.
if pageHeaderSize + nodes.size() < p.size() {
// TODO: Write new page.
// TODO: Update parent branches.
}
// Calculate total page size.
size := pageHeaderSize
for _, n := range nodes {
size += lnodeSize + n.ksize + n.vsize
}
// If our new page fits in our current page size then just write it.
if size < t.db.pageSize {
return t.writeLeafPage(p.id, nodes)
}
var nodesets [][]lnodes
if size < t.db.pageSize {
nodesets = [][]lnodes{nodes}
}
nodesets := t.split(nodes)
func (t *RWTransaction) insert(key []byte, data []byte) error {
// TODO: If there is not enough space on page for key+data then split.
// TODO: Move remaining data on page forward.
// TODO: Write leaf node to current location.
// TODO: Adjust available page size.
return nil
}
// split takes a list of nodes and returns multiple sets of nodes if a
// page split is required.
func (t *RWTransaction) split(nodes []lnodes) [][]lnodes {
// If the size is less than the page size then just return the current set.
if size < t.db.pageSize {
return [][]lnodes{nodes}
}
// Otherwise loop over nodes and split up into multiple pages.
var nodeset []lnodes
var nodesets [][]lnodes
for _, n := range nodes {
}
}

View File

@ -1,7 +1,6 @@
package bolt
import (
"strings"
"unsafe"
)
@ -20,11 +19,12 @@ const (
type txnid uint64
type Transaction struct {
id int
db *DB
meta *meta
sys Bucket
buckets map[string]*Bucket
id int
db *DB
meta *meta
sys Bucket
buckets map[string]*Bucket
pages map[pgid]*page
}
// init initializes the transaction and associates it with a database.
@ -32,6 +32,7 @@ func (t *Transaction) init(db *DB, meta *meta) {
t.db = db
t.meta = meta
t.buckets = make(map[string]*Bucket)
t.pages = nil
t.sys.transaction = t
t.sys.bucket = &t.meta.sys
}
@ -47,27 +48,21 @@ func (t *Transaction) DB() *DB {
// Bucket retrieves a bucket by name.
func (t *Transaction) Bucket(name string) (*Bucket, error) {
return t.bucket(name)
}
func (t *Transaction) bucket(name string) (*Bucket, error) {
// Return bucket if it's already been looked up.
// Return cached reference if it's already been looked up.
if b := t.buckets[name]; b != nil {
return b, nil
}
// Retrieve bucket data from the system bucket.
data, err := c.get(&t.sys, []byte(name))
if err != nil {
return nil, err
} else if data == nil {
return nil, &Error{"bucket not found", nil}
value := t.get(&t.sys, []byte(name))
if value == nil {
return nil
}
// Create a bucket that overlays the data.
b := &Bucket{
bucket: (*bucket)(unsafe.Pointer(&data[0])),
name: name,
bucket: (*bucket)(unsafe.Pointer(&data[0])),
name: name,
transaction: t,
}
t.buckets[name] = b
@ -76,85 +71,39 @@ func (t *Transaction) bucket(name string) (*Bucket, error) {
}
// Cursor creates a cursor associated with a given bucket.
func (t *Transaction) Cursor(b *Bucket) (*Cursor, error) {
if b == nil {
return nil, &Error{"bucket required", nil}
} else
// Create a new cursor and associate it with the transaction and bucket.
c := &Cursor{
transaction: t,
bucket: b,
}
// Set the first page if available.
if b.root != p_invalid {
p := t.db.page(t.db.data, int(b.root))
c.top = 0
c.pages = append(c.pages, p)
}
return nil, nil
}
// Get retrieves the value for a key in a given bucket.
func (t *Transaction) Get(name string, key []byte) ([]byte, error) {
c, err := t.Cursor(name)
func (t *Transaction) Cursor(name string) (*Cursor, error) {
b, err := t.Bucket(name)
if err != nil {
return nil, err
}
return c.Get(key)
return b.Cursor()
}
func (t *Transaction) page(id int) (*page, error) {
return t.db.page(id)
// Get retrieves the value for a key in a named bucket.
func (t *Transaction) Get(name string, key []byte) ([]byte, error) {
b, err := t.Bucket(name)
if err != nil {
return nil, err
}
return b.Get(key)
}
// Stat returns information about a bucket's internal structure.
func (t *Transaction) Stat(name string) *stat {
func (t *Transaction) Stat(name string) *Stat {
// TODO
return nil
}
// //
// //
// //
// //
// //
// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ CONVERTED ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ //
// //
// //
// //
// //
// //
// Return the data associated with a given node.
// @param[in] txn The transaction for this operation.
// @param[in] leaf The node being read.
// @param[out] data Updated to point to the node's data.
// @return 0 on success, non-zero on failure.
func (t *Transaction) readNode(leaf *node, data []byte) error {
/*
MDB_page *omp; // overflow page
pgno_t pgno;
int rc;
if (!F_ISSET(leaf->mn_flags, F_BIGDATA)) {
data->mv_size = NODEDSZ(leaf);
data->mv_data = NODEDATA(leaf);
return MDB_SUCCESS;
// page returns a reference to the page with a given id.
// If the page has been written to then a temporary buffered page is returned.
func (t *Transaction) page(id pgid) *page {
// Check the dirty pages first.
if t.pages != nil {
if p, ok := t.pages[id]; ok {
return p
}
}
// Read overflow data.
data->mv_size = NODEDSZ(leaf);
memcpy(&pgno, NODEDATA(leaf), sizeof(pgno));
if ((rc = mdb_page_get(txn, pgno, &omp, NULL)) != 0) {
DPRINTF(("read overflow page %"Z"u failed", pgno));
return rc;
}
data->mv_data = METADATA(omp);
return MDB_SUCCESS;
*/
return nil
// Otherwise return directly from the mmap.
return t.db.page(id)
}