mirror of https://github.com/hak5/bolt.git
Mmap remap.
parent
7824a66308
commit
7bb878ff69
90
db.go
90
db.go
|
@ -15,6 +15,9 @@ const (
|
|||
|
||||
const minPageSize = 0x1000
|
||||
|
||||
const minMmapSize = 1 << 22 // 4MB
|
||||
const maxMmapStep = 1 << 30 // 1GB
|
||||
|
||||
type DB struct {
|
||||
os _os
|
||||
syscall _syscall
|
||||
|
@ -98,7 +101,7 @@ func (db *DB) Open(path string, mode os.FileMode) error {
|
|||
}
|
||||
|
||||
// Memory map the data file.
|
||||
if err := db.mmap(); err != nil {
|
||||
if err := db.mmap(0); err != nil {
|
||||
db.close()
|
||||
return err
|
||||
}
|
||||
|
@ -113,7 +116,19 @@ func (db *DB) Open(path string, mode os.FileMode) error {
|
|||
}
|
||||
|
||||
// mmap opens the underlying memory-mapped file and initializes the meta references.
|
||||
func (db *DB) mmap() error {
|
||||
// minsz is the minimum size that the new mmap can be.
|
||||
func (db *DB) mmap(minsz int) error {
|
||||
db.mmaplock.Lock()
|
||||
defer db.mmaplock.Unlock()
|
||||
|
||||
// Dereference all mmap references before unmapping.
|
||||
if db.rwtransaction != nil {
|
||||
db.rwtransaction.dereference()
|
||||
}
|
||||
|
||||
// Unmap existing data before continuing.
|
||||
db.munmap()
|
||||
|
||||
info, err := db.file.Stat()
|
||||
if err != nil {
|
||||
return &Error{"mmap stat error", err}
|
||||
|
@ -121,8 +136,12 @@ func (db *DB) mmap() error {
|
|||
return &Error{"file size too small", err}
|
||||
}
|
||||
|
||||
// TODO(benbjohnson): Determine appropriate mmap size by db size.
|
||||
size := 2 << 30
|
||||
// Ensure the size is at least the minimum size.
|
||||
var size = int(info.Size())
|
||||
if size < minsz {
|
||||
size = minsz
|
||||
}
|
||||
size = db.mmapSize(minsz)
|
||||
|
||||
// Memory-map the data file as a byte slice.
|
||||
if db.data, err = db.syscall.Mmap(int(db.file.Fd()), 0, size, syscall.PROT_READ, syscall.MAP_SHARED); err != nil {
|
||||
|
@ -144,6 +163,35 @@ func (db *DB) mmap() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// munmap unmaps the data file from memory.
|
||||
func (db *DB) munmap() {
|
||||
if db.data != nil {
|
||||
if err := db.syscall.Munmap(db.data); err != nil {
|
||||
panic("unmap error: " + err.Error())
|
||||
}
|
||||
db.data = nil
|
||||
}
|
||||
}
|
||||
|
||||
// mmapSize determines the appropriate size for the mmap given the current size
|
||||
// of the database. The minimum size is 4MB and doubles until it reaches 1GB.
|
||||
func (db *DB) mmapSize(size int) int {
|
||||
if size < minMmapSize {
|
||||
return minMmapSize
|
||||
} else if size < maxMmapStep {
|
||||
size *= 2
|
||||
} else {
|
||||
size += maxMmapStep
|
||||
}
|
||||
|
||||
// Ensure that the mmap size is a multiple of the page size.
|
||||
if (size % db.pageSize) != 0 {
|
||||
size = ((size / db.pageSize) + 1) * db.pageSize
|
||||
}
|
||||
|
||||
return size
|
||||
}
|
||||
|
||||
// init creates a new database file and initializes its meta pages.
|
||||
func (db *DB) init() error {
|
||||
// Set the page size to the OS page size.
|
||||
|
@ -196,8 +244,14 @@ func (db *DB) Close() {
|
|||
}
|
||||
|
||||
func (db *DB) close() {
|
||||
// TODO: Undo everything in Open().
|
||||
// Wait for pending transactions before closing and unmapping the data.
|
||||
// db.mmaplock.Lock()
|
||||
// defer db.mmaplock.Unlock()
|
||||
|
||||
// TODO(benbjohnson): Undo everything in Open().
|
||||
db.freelist = nil
|
||||
|
||||
db.munmap()
|
||||
}
|
||||
|
||||
// Transaction creates a read-only transaction.
|
||||
|
@ -206,6 +260,11 @@ func (db *DB) Transaction() (*Transaction, error) {
|
|||
db.metalock.Lock()
|
||||
defer db.metalock.Unlock()
|
||||
|
||||
// Obtain a read-only lock on the mmap. When the mmap is remapped it will
|
||||
// obtain a write lock so all transactions must finish before it can be
|
||||
// remapped.
|
||||
db.mmaplock.RLock()
|
||||
|
||||
// Exit if the database is not open yet.
|
||||
if !db.opened {
|
||||
return nil, DatabaseNotOpenError
|
||||
|
@ -260,6 +319,9 @@ func (db *DB) removeTransaction(t *Transaction) {
|
|||
db.metalock.Lock()
|
||||
defer db.metalock.Unlock()
|
||||
|
||||
// Release the read lock on the mmap.
|
||||
db.mmaplock.RUnlock()
|
||||
|
||||
// Remove the transaction.
|
||||
for i, txn := range db.transactions {
|
||||
if txn == t {
|
||||
|
@ -412,7 +474,7 @@ func (db *DB) meta() *meta {
|
|||
}
|
||||
|
||||
// allocate returns a contiguous block of memory starting at a given page.
|
||||
func (db *DB) allocate(count int) *page {
|
||||
func (db *DB) allocate(count int) (*page, error) {
|
||||
// Allocate a temporary buffer for the page.
|
||||
buf := make([]byte, count*db.pageSize)
|
||||
p := (*page)(unsafe.Pointer(&buf[0]))
|
||||
|
@ -420,16 +482,22 @@ func (db *DB) allocate(count int) *page {
|
|||
|
||||
// Use pages from the freelist if they are available.
|
||||
if p.id = db.freelist.allocate(count); p.id != 0 {
|
||||
return p
|
||||
return p, nil
|
||||
}
|
||||
|
||||
// TODO(benbjohnson): Resize mmap().
|
||||
|
||||
// If there are no free pages then allocate from the end of the file.
|
||||
// Resize mmap() if we're at the end.
|
||||
p.id = db.rwtransaction.meta.pgid
|
||||
var minsz = int((p.id+pgid(count))+1) * db.pageSize
|
||||
if minsz >= len(db.data) {
|
||||
if err := db.mmap(minsz); err != nil {
|
||||
return nil, &Error{"mmap allocate error", err}
|
||||
}
|
||||
}
|
||||
|
||||
// Move the page id high water mark.
|
||||
db.rwtransaction.meta.pgid += pgid(count)
|
||||
|
||||
return p
|
||||
return p, nil
|
||||
}
|
||||
|
||||
// sync flushes the file descriptor to disk.
|
||||
|
|
12
db_test.go
12
db_test.go
|
@ -187,6 +187,18 @@ func TestDBWriteFail(t *testing.T) {
|
|||
t.Skip("pending") // TODO(benbjohnson)
|
||||
}
|
||||
|
||||
// Ensure that the mmap grows appropriately.
|
||||
func TestDBMmapSize(t *testing.T) {
|
||||
db := &DB{pageSize: 4096}
|
||||
assert.Equal(t, db.mmapSize(0), minMmapSize)
|
||||
assert.Equal(t, db.mmapSize(16384), minMmapSize)
|
||||
assert.Equal(t, db.mmapSize(minMmapSize-1), minMmapSize)
|
||||
assert.Equal(t, db.mmapSize(minMmapSize), minMmapSize*2)
|
||||
assert.Equal(t, db.mmapSize(10000000), 20000768)
|
||||
assert.Equal(t, db.mmapSize((1<<30)-1), 2147483648)
|
||||
assert.Equal(t, db.mmapSize(1<<30), 1<<31)
|
||||
}
|
||||
|
||||
// withDB executes a function with a database reference.
|
||||
func withDB(fn func(*DB, string)) {
|
||||
f, _ := ioutil.TempFile("", "bolt-")
|
||||
|
|
24
node.go
24
node.go
|
@ -161,10 +161,8 @@ func (n *node) write(p *page) {
|
|||
// Initialize page.
|
||||
if n.isLeaf {
|
||||
p.flags |= p_leaf
|
||||
// warn("∑", p.id, "leaf")
|
||||
} else {
|
||||
p.flags |= p_branch
|
||||
// warn("∑", p.id, "branch")
|
||||
}
|
||||
p.count = uint16(len(n.inodes))
|
||||
|
||||
|
@ -177,13 +175,11 @@ func (n *node) write(p *page) {
|
|||
elem.pos = uint32(uintptr(unsafe.Pointer(&b[0])) - uintptr(unsafe.Pointer(elem)))
|
||||
elem.ksize = uint32(len(item.key))
|
||||
elem.vsize = uint32(len(item.value))
|
||||
// warn(" »", string(item.key), "->", string(item.value))
|
||||
} else {
|
||||
elem := p.branchPageElement(uint16(i))
|
||||
elem.pos = uint32(uintptr(unsafe.Pointer(&b[0])) - uintptr(unsafe.Pointer(elem)))
|
||||
elem.ksize = uint32(len(item.key))
|
||||
elem.pgid = item.pgid
|
||||
// warn(" »", string(item.key))
|
||||
}
|
||||
|
||||
// Write data for the element to the end of the page.
|
||||
|
@ -341,6 +337,26 @@ func (n *node) rebalance() {
|
|||
n.parent.rebalance()
|
||||
}
|
||||
|
||||
// dereference causes the node to copy all its inode key/value references to heap memory.
|
||||
// This is required when the mmap is reallocated so inodes are not pointing to stale data.
|
||||
func (n *node) dereference() {
|
||||
key := make([]byte, len(n.key))
|
||||
copy(key, n.key)
|
||||
n.key = key
|
||||
|
||||
for i, _ := range n.inodes {
|
||||
inode := &n.inodes[i]
|
||||
|
||||
key := make([]byte, len(inode.key))
|
||||
copy(key, inode.key)
|
||||
inode.key = key
|
||||
|
||||
value := make([]byte, len(inode.value))
|
||||
copy(value, inode.value)
|
||||
inode.value = value
|
||||
}
|
||||
}
|
||||
|
||||
// nodesByDepth sorts a list of branches by deepest first.
|
||||
type nodesByDepth []*node
|
||||
|
||||
|
|
|
@ -9,7 +9,8 @@ import (
|
|||
// Only one read/write transaction can be active for a DB at a time.
|
||||
type RWTransaction struct {
|
||||
Transaction
|
||||
nodes map[pgid]*node
|
||||
nodes map[pgid]*node
|
||||
pending []*node
|
||||
}
|
||||
|
||||
// init initializes the transaction.
|
||||
|
@ -35,7 +36,10 @@ func (t *RWTransaction) CreateBucket(name string) error {
|
|||
}
|
||||
|
||||
// Create a blank root leaf page.
|
||||
p := t.allocate(1)
|
||||
p, err := t.allocate(1)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
p.flags = p_leaf
|
||||
|
||||
// Add bucket to buckets page.
|
||||
|
@ -100,14 +104,15 @@ func (t *RWTransaction) Commit() error {
|
|||
|
||||
// TODO(benbjohnson): Use vectorized I/O to write out dirty pages.
|
||||
|
||||
// TODO(benbjohnson): Move rebalancing to occur immediately after deletion (?).
|
||||
|
||||
// Rebalance and spill data onto dirty pages.
|
||||
t.rebalance()
|
||||
t.spill()
|
||||
|
||||
// Spill buckets page.
|
||||
p := t.allocate((t.buckets.size() / t.db.pageSize) + 1)
|
||||
p, err := t.allocate((t.buckets.size() / t.db.pageSize) + 1)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
t.buckets.write(p)
|
||||
|
||||
// Write dirty pages to disk.
|
||||
|
@ -135,13 +140,16 @@ func (t *RWTransaction) close() {
|
|||
}
|
||||
|
||||
// allocate returns a contiguous block of memory starting at a given page.
|
||||
func (t *RWTransaction) allocate(count int) *page {
|
||||
p := t.db.allocate(count)
|
||||
func (t *RWTransaction) allocate(count int) (*page, error) {
|
||||
p, err := t.db.allocate(count)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Save to our page cache.
|
||||
t.pages[p.id] = p
|
||||
|
||||
return p
|
||||
return p, nil
|
||||
}
|
||||
|
||||
// rebalance attempts to balance all nodes.
|
||||
|
@ -152,7 +160,7 @@ func (t *RWTransaction) rebalance() {
|
|||
}
|
||||
|
||||
// spill writes all the nodes to dirty pages.
|
||||
func (t *RWTransaction) spill() {
|
||||
func (t *RWTransaction) spill() error {
|
||||
// Keep track of the current root nodes.
|
||||
// We will update this at the end once all nodes are created.
|
||||
type root struct {
|
||||
|
@ -180,6 +188,7 @@ func (t *RWTransaction) spill() {
|
|||
// Split nodes into appropriate sized nodes.
|
||||
// The first node in this list will be a reference to n to preserve ancestry.
|
||||
newNodes := n.split(t.db.pageSize)
|
||||
t.pending = newNodes
|
||||
|
||||
// If this is a root node that split then create a parent node.
|
||||
if n.parent == nil && len(newNodes) > 1 {
|
||||
|
@ -195,7 +204,10 @@ func (t *RWTransaction) spill() {
|
|||
// Write nodes to dirty pages.
|
||||
for i, newNode := range newNodes {
|
||||
// Allocate contiguous space for the node.
|
||||
p := t.allocate((newNode.size() / t.db.pageSize) + 1)
|
||||
p, err := t.allocate((newNode.size() / t.db.pageSize) + 1)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Write the node to the page.
|
||||
newNode.write(p)
|
||||
|
@ -215,6 +227,8 @@ func (t *RWTransaction) spill() {
|
|||
newNode.parent.put(oldKey, newNode.inodes[0].key, nil, newNode.pgid)
|
||||
}
|
||||
}
|
||||
|
||||
t.pending = nil
|
||||
}
|
||||
|
||||
// Update roots with new roots.
|
||||
|
@ -224,12 +238,12 @@ func (t *RWTransaction) spill() {
|
|||
|
||||
// Clear out nodes now that they are all spilled.
|
||||
t.nodes = make(map[pgid]*node)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// write writes any dirty pages to disk.
|
||||
func (t *RWTransaction) write() error {
|
||||
// TODO(benbjohnson): If our last page id is greater than the mmap size then lock the DB and resize.
|
||||
|
||||
// Sort pages by id.
|
||||
pages := make(pages, 0, len(t.pages))
|
||||
for _, p := range t.pages {
|
||||
|
@ -247,6 +261,9 @@ func (t *RWTransaction) write() error {
|
|||
}
|
||||
}
|
||||
|
||||
// Clear out page cache.
|
||||
t.pages = make(map[pgid]*page)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -280,3 +297,14 @@ func (t *RWTransaction) node(pgid pgid, parent *node) *node {
|
|||
|
||||
return n
|
||||
}
|
||||
|
||||
// dereference removes all references to the old mmap.
|
||||
func (t *RWTransaction) dereference() {
|
||||
for _, n := range t.nodes {
|
||||
n.dereference()
|
||||
}
|
||||
|
||||
for _, n := range t.pending {
|
||||
n.dereference()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,11 +6,15 @@ import (
|
|||
|
||||
// _syscall lists the memory-mapping calls used through this interface.
type _syscall interface {
	// Mmap takes the same arguments and returns the same results as syscall.Mmap.
	Mmap(fd int, offset int64, length, prot, flags int) ([]byte, error)

	// Munmap takes a mapped byte slice and reports any error from unmapping it.
	Munmap(b []byte) error
}
|
||||
|
||||
type syssyscall struct{}
|
||||
|
||||
func (o *syssyscall) Mmap(fd int, offset int64, length int, prot int, flags int) (data []byte, err error) {
|
||||
// err = (EACCES, EBADF, EINVAL, ENODEV, ENOMEM, ENXIO, EOVERFLOW)
|
||||
return syscall.Mmap(fd, offset, length, prot, flags)
|
||||
}
|
||||
|
||||
func (o *syssyscall) Munmap(b []byte) error {
|
||||
return syscall.Munmap(b)
|
||||
}
|
||||
|
|
|
@ -12,3 +12,8 @@ func (m *mocksyscall) Mmap(fd int, offset int64, length int, prot int, flags int
|
|||
args := m.Called(fd, offset, length, prot, flags)
|
||||
return args.Get(0).([]byte), args.Error(1)
|
||||
}
|
||||
|
||||
func (m *mocksyscall) Munmap(b []byte) error {
|
||||
args := m.Called(b)
|
||||
return args.Error(0)
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
|
||||
// _syscall lists the memory-mapping calls used through this interface.
type _syscall interface {
	// Mmap takes the same arguments and returns the same results as syscall.Mmap.
	Mmap(fd int, offset int64, length, prot, flags int) ([]byte, error)

	// Munmap takes a mapped byte slice and reports any error from unmapping it.
	Munmap(b []byte) error
}
|
||||
|
||||
type syssyscall struct{}
|
||||
|
@ -14,3 +15,7 @@ func (o *syssyscall) Mmap(fd int, offset int64, length int, prot int, flags int)
|
|||
// err = (EACCES, EBADF, EINVAL, ENODEV, ENOMEM, ENXIO, EOVERFLOW)
|
||||
return syscall.Mmap(fd, offset, length, prot, flags)
|
||||
}
|
||||
|
||||
func (o *syssyscall) Munmap(b []byte) error {
|
||||
return syscall.Munmap(b)
|
||||
}
|
||||
|
|
|
@ -12,3 +12,8 @@ func (m *mocksyscall) Mmap(fd int, offset int64, length int, prot int, flags int
|
|||
args := m.Called(fd, offset, length, prot, flags)
|
||||
return args.Get(0).([]byte), args.Error(1)
|
||||
}
|
||||
|
||||
func (m *mocksyscall) Munmap(b []byte) error {
|
||||
args := m.Called(b)
|
||||
return args.Error(0)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue