DB.Open(), pages, and meta.

master
Ben Johnson 2014-01-10 07:32:12 -07:00
parent f922c1d2bc
commit df8333328f
9 changed files with 274 additions and 437 deletions

99
NOTES
View File

@ -1,99 +0,0 @@
NOTES
=====
## Types
- Env
- Txn
- DB (DBI)
- Cursor
- Value (ptr + sz)
- Stat
- Envinfo ()
## Misc
- compare_func
- rel_func (unimplemented?)
## Flags
### Environment
- FIXEDMAP
- NOSUBDIR
- NOSYNC
- RDONLY
- NOMETASYNC
- NOTLS
- NOLOCK
- NORDAHEAD
- NOMEMINIT
### Database
- REVERSEKEY
- DUPSORT
- INTEGERKEY
- DUPFIXED
- INTEGERDUP
- REVERSEDUP
- CREATE
### Put Write Flags
- NOOVERWRITE
- NODUPDATA
- CURRENT
- RESERVE
- APPEND
- APPENDDUP
- MULTIPLE
### Cursor Get Flags
- FIRST
- FIRST_DUP
- GET_BOTH
- GET_BOTH_RANGE
- GET_CURRENT
- GET_MULTIPLE
- LAST
- LAST_DUP
- NEXT
- NEXT_DUP
- MULTIPLE
- NEXT_NODUP
- NEXT_MULTIPLE
- NEXT_NODUP
- PREV
- PREV_DUP
- PREV_NODUP
- SET
- SET_KEY
- SET_RANGE
## Error Codes
- SUCCESS
- KEYEXIST
- NOTFOUND
- PAGE_NOTFOUND
- CORRUPTED
- PANIC
- VERSION_MISMATCH
- INVALID
- MAP_FULL
- DBS_FULL
- READERS_FULL
- TLS_FULL
- TXN_FULL
- CURSOR_FULL
- PAGE_FULL
- MAP_RESIZED
- INCOMPATIBLE
- BAD_RSLOT
- BAD_TXN
- BAD_VALSIZE

View File

@ -6,16 +6,13 @@ package bolt
// TODO: #define FREE_DBI 0
// TODO: #define MAIN_DBI 1
type Bucket interface {
}
type bucket struct {
pad int
flags int
depth int
branchPageCount int
leafPageCount int
overflowPageCount int
entryCount int
rootID int
// Bucket is the fixed-width bookkeeping record kept for a bucket: page
// counters, entry count and the root page number.
// NOTE(review): the explicit field widths suggest this struct is overlaid
// directly on page bytes (meta embeds a free and a main record) — confirm
// before reordering or resizing any field.
type Bucket struct {
// pad is padding; for the free bucket of a meta page it doubles as the page
// size (Open reads m0.free.pad into db.pageSize, cf. the mm_psize TODO).
pad uint32
// flags holds the bucket flags; they are not interpreted in the code shown here.
flags uint16
// depth — presumably the depth of the bucket's tree (md_depth); confirm.
depth uint16
// branches is the number of branch pages.
branches pgno
// leafs is the number of leaf pages.
leafs pgno
// overflows is the number of overflow pages.
overflows pgno
// entries is the number of entries stored in the bucket.
entries uint64
// root is the page number of the bucket's root page.
root pgno
}

View File

@ -2,8 +2,6 @@ package bolt
const Version = 1
const magic int32 = 0xBEEFC0DE
const (
MaxKeySize = 511
MaxDataSize = 0xffffffff

View File

@ -46,6 +46,13 @@ type cursor struct {
ki []int /**< stack of page indices */
}
// xcursor bundles a cursor with a bucket, that bucket's bucketx companion
// record and a flag value.
// NOTE(review): presumably the counterpart of LMDB's MDB_xcursor (cursor over
// a sorted-duplicates sub-database) — confirm.
type xcursor struct {
// cursor is held by value, not by pointer.
cursor cursor
bucket *bucket
bucketx *bucketx
bucketFlag int
}
// Set or clear P_KEEP in dirty, non-overflow, non-sub pages watched by txn.
// @param[in] mc A cursor handle for the current operation.
// @param[in] pflags Flags of the pages to update:

456
db.go
View File

@ -8,26 +8,27 @@ const (
IntegerDupKey
)
var DatabaseAlreadyOpenedError = &Error{"Database already open"}
// TODO: #define MDB_FATAL_ERROR 0x80000000U /** Failed to update the meta page. Probably an I/O error. */
// TODO: #define MDB_ENV_ACTIVE 0x20000000U /** Some fields are initialized. */
// TODO: #define MDB_ENV_TXKEY 0x10000000U /** me_txkey is set */
// TODO: #define MDB_LIVE_READER 0x08000000U /** Have liveness lock in reader table */
// Only a subset of the @ref mdb_env flags can be changed
// at runtime. Changing other flags requires closing the
// environment and re-opening it with the new flags.
// TODO: #define CHANGEABLE (MDB_NOSYNC|MDB_NOMETASYNC|MDB_MAPASYNC|MDB_NOMEMINIT)
// TODO: #define CHANGELESS (MDB_FIXEDMAP|MDB_NOSUBDIR|MDB_RDONLY|MDB_WRITEMAP| MDB_NOTLS|MDB_NOLOCK|MDB_NORDAHEAD)
type DB interface {
syncEnabled bool
metaSyncEnabled bool
}
type db struct {
sync.Mutex
opened bool
file os.File
flags int
metafile os.File
buf []byte
pageSize int
osPageSize int
readers []*reader
buckets []*bucket
xbuckets []*bucketx /**< array of static DB info */
@ -43,13 +44,14 @@ type db struct {
maxPageNumber int /**< me_mapsize / me_psize */
pageState pageStage /**< state of old pages from freeDB */
dpages []*page /**< list of malloc'd blocks for re-use */
freePageNumbers []int /** IDL of pages that became unused in a write txn */
dirtyPageNumbers []int /** ID2L of pages written during a write txn. Length MDB_IDL_UM_SIZE. */
freePages []int /** IDL of pages that became unused in a write txn */
dirtyPages []int /** ID2L of pages written during a write txn. Length MDB_IDL_UM_SIZE. */
maxFreeOnePage int /** Max number of freelist items that can fit in a single overflow page */
maxNodeSize int /** Max size of a node on a page */
maxKeySize int /**< max size of a key */
}
// NewDB allocates a zero-valued, unopened database handle and returns it
// through the DB interface.
func NewDB() DB {
	handle := new(db)
	return handle
}
@ -58,6 +60,163 @@ func (db *db) Path() string {
return db.path
}
// Open opens the data file at path (creating it with the given mode if it
// does not exist), opens a second synchronous handle on the same file for
// metadata writes, reads both meta pages to determine the page size, memory
// maps the file and marks the handle as opened.
//
// It returns DatabaseAlreadyOpenedError if the handle is already open, and the
// underlying error if either file cannot be opened or the mmap fails. On those
// failures db.close() is called before returning.
func (db *db) Open(path string, mode os.FileMode) error {
	var err error
	db.Lock()
	defer db.Unlock()

	// Exit if the database is currently open.
	if db.opened {
		return DatabaseAlreadyOpenedError
	}

	// Open data file and separate sync handler for metadata writes.
	// NOTE(review): os.OpenFile returns *os.File, so db.file and db.metafile
	// must be declared as pointers for these assignments — confirm.
	db.path = path
	if db.file, err = os.OpenFile(db.path, os.O_RDWR|os.O_CREATE, mode); err != nil {
		db.close()
		return err
	}
	if db.metafile, err = os.OpenFile(db.path, os.O_RDWR|os.O_SYNC, mode); err != nil {
		db.close()
		return err
	}

	// Read enough data to get both meta pages. Each page gets its own buffer:
	// the *meta returned by page.meta() points into the buffer it was read
	// from, so reusing one buffer would overwrite m0 with the second read.
	// Read and validation errors are deliberately ignored here; a missing or
	// invalid meta page simply leaves the pointer nil.
	var m, m0, m1 *meta
	var buf0, buf1 [headerSize + unsafe.Sizeof(meta{})]byte
	if _, err := db.file.ReadAt(buf0[:], 0); err == nil {
		if m0, _ = db.page(buf0[:], 0).meta(); m0 != nil {
			db.pageSize = int(m0.free.pad)
		}
	}
	if _, err := db.file.ReadAt(buf1[:], int64(db.pageSize)); err == nil {
		m1, _ = db.page(buf1[:], 0).meta()
	}

	// Use the meta page with the higher transaction id. Both pages must be
	// valid; otherwise m stays nil and the file is treated as a new
	// environment below.
	if m0 != nil && m1 != nil {
		if m0.txnid > m1.txnid {
			m = m0
		} else {
			m = m1
		}
	}

	// Initialize the page size for new environments.
	newenv := 0
	if m == nil {
		newenv = 1
		db.pageSize = os.Getpagesize()
		if db.pageSize > maxPageSize {
			db.pageSize = maxPageSize
		}
	}

	// TODO: Check mapsize.
	/*
		// Was a mapsize configured?
		if (!env->me_mapsize) {
			// If this is a new environment, take the default,
			// else use the size recorded in the existing env.
			env->me_mapsize = newenv ? DEFAULT_MAPSIZE : meta.mm_mapsize;
		} else if (env->me_mapsize < meta.mm_mapsize) {
			// If the configured size is smaller, make sure it's
			// still big enough. Silently round up to minimum if not.
			size_t minsize = (meta.mm_last_pg + 1) * meta.mm_psize;
			if (env->me_mapsize < minsize)
				env->me_mapsize = minsize;
		}
	*/

	// Memory map the data file.
	// NOTE(review): mmap is declared as mmap(newsize int); following the
	// reference call mdb_env_map(env, addr, newenv) the new-environment flag
	// is passed here — confirm once mmap is ported.
	if err := db.mmap(newenv); err != nil {
		db.close()
		return err
	}

	// Initialize the buffer.
	db.buf = make([]byte, db.pageSize)

	// Mark the database as opened and return.
	db.opened = true
	return nil
}
// readMeta is intended to read the meta pages and return the latest one.
// It is currently a stub: it allocates an empty meta, calls m.read() on it and
// always returns nil. The commented-out C below is the LMDB reference code
// that still has to be ported.
// NOTE(review): meta.read is declared as read(p *page) error, so this call
// passes no page and discards the returned error — confirm the intended call.
func (db *db) readMeta() *meta {
m := &meta{}
m.read()
/*
if ((i = mdb_env_read_header(env, &meta)) != 0) {
if (i != ENOENT)
return i;
DPUTS("new mdbenv");
newenv = 1;
env->me_psize = env->me_os_psize;
if (env->me_psize > MAX_PAGESIZE)
env->me_psize = MAX_PAGESIZE;
} else {
env->me_psize = meta.mm_psize;
}
rc = mdb_env_map(env, meta.mm_address, newenv);
if (rc)
return rc;
if (newenv) {
if (flags & MDB_FIXEDMAP)
meta.mm_address = env->me_map;
i = mdb_env_init_meta(env, &meta);
if (i != MDB_SUCCESS) {
return i;
}
}
env->me_maxfree_1pg = (env->me_psize - PAGEHDRSZ) / sizeof(pgno_t) - 1;
env->me_nodemax = (((env->me_psize - PAGEHDRSZ) / MDB_MINKEYS) & -2)
- sizeof(indx_t);
#if !(MDB_MAXKEYSIZE)
env->me_maxkey = env->me_nodemax - (NODESIZE + sizeof(MDB_db));
#endif
env->me_maxpg = env->me_mapsize / env->me_psize;
#if MDB_DEBUG
{
int toggle = mdb_env_pick_meta(env);
MDB_db *db = &env->me_metas[toggle]->mm_dbs[MAIN_DBI];
DPRINTF(("opened database version %u, pagesize %u",
env->me_metas[0]->mm_version, env->me_psize));
DPRINTF(("using meta page %d", toggle));
DPRINTF(("depth: %u", db->md_depth));
DPRINTF(("entries: %"Z"u", db->md_entries));
DPRINTF(("branch pages: %"Z"u", db->md_branch_pages));
DPRINTF(("leaf pages: %"Z"u", db->md_leaf_pages));
DPRINTF(("overflow pages: %"Z"u", db->md_overflow_pages));
DPRINTF(("root: %"Z"u", db->md_root));
}
#endif
return MDB_SUCCESS;
*/
// Nothing has been ported yet, so no meta is returned.
return nil
}
// page returns the page with the given id inside b, treating b as a sequence
// of pages that are each db.pageSize bytes long. The returned pointer aliases
// b; no data is copied.
//
// It panics with an index-out-of-range error if id*db.pageSize is not a valid
// index into b.
func (db *db) page(b []byte, id int) *page {
	// Take the address of the page's first byte; a byte value itself cannot be
	// converted to unsafe.Pointer.
	return (*page)(unsafe.Pointer(&b[id*db.pageSize]))
}
// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ CONVERTED ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ //
func (db *db) freePage(p *page) {
/*
mp->mp_next = env->me_dpages;
@ -211,69 +370,6 @@ func (db *db) Transaction(parent *transaction, flags int) (*transaction, error)
return nil
}
// Read the environment parameters of a DB environment before
// mapping it into memory.
// @param[in] env the environment handle
// @param[out] meta address of where to store the meta information
// @return 0 on success, non-zero on failure.
func (db *db) readHeader(meta *meta) error {
/*
MDB_metabuf pbuf;
MDB_page *p;
MDB_meta *m;
int i, rc, off;
enum { Size = sizeof(pbuf) };
// We don't know the page size yet, so use a minimum value.
// Read both meta pages so we can use the latest one.
for (i=off=0; i<2; i++, off = meta->mm_psize) {
#ifdef _WIN32
DWORD len;
OVERLAPPED ov;
memset(&ov, 0, sizeof(ov));
ov.Offset = off;
rc = ReadFile(env->me_fd, &pbuf, Size, &len, &ov) ? (int)len : -1;
if (rc == -1 && ErrCode() == ERROR_HANDLE_EOF)
rc = 0;
#else
rc = pread(env->me_fd, &pbuf, Size, off);
#endif
if (rc != Size) {
if (rc == 0 && off == 0)
return ENOENT;
rc = rc < 0 ? (int) ErrCode() : MDB_INVALID;
DPRINTF(("read: %s", mdb_strerror(rc)));
return rc;
}
p = (MDB_page *)&pbuf;
if (!F_ISSET(p->mp_flags, P_META)) {
DPRINTF(("page %"Z"u not a meta page", p->mp_pgno));
return MDB_INVALID;
}
m = METADATA(p);
if (m->mm_magic != MDB_MAGIC) {
DPUTS("meta has invalid magic");
return MDB_INVALID;
}
if (m->mm_version != MDB_DATA_VERSION) {
DPRINTF(("database is version %u, expected version %u",
m->mm_version, MDB_DATA_VERSION));
return MDB_VERSION_MISMATCH;
}
if (off == 0 || m->mm_txnid > meta->mm_txnid)
*meta = *m;
}
return 0;
*/
return nil
}
// Write the environment parameters of a freshly created DB environment.
// @param[in] env the environment handle
// @param[out] meta address of where to store the meta information
@ -371,7 +467,7 @@ func (db *db) Create() error {
}
// int mdb_env_map(MDB_env *env, void *addr, int newsize)
func (db *db) openMmap(newsize int) {
func (db *db) mmap(newsize int) error {
/*
MDB_page *p;
unsigned int flags = env->me_flags;
@ -506,218 +602,6 @@ func (db *db) getMaxReaderCount(count int) (int, error) {
return 0, nil
}
// Further setup required for opening an MDB environment
func (db *db) open() error {
/*
unsigned int flags = env->me_flags;
int i, newenv = 0, rc;
MDB_meta meta;
#ifdef _WIN32
// See if we should use QueryLimited
rc = GetVersion();
if ((rc & 0xff) > 5)
env->me_pidquery = MDB_PROCESS_QUERY_LIMITED_INFORMATION;
else
env->me_pidquery = PROCESS_QUERY_INFORMATION;
#endif // _WIN32
memset(&meta, 0, sizeof(meta));
if ((i = mdb_env_read_header(env, &meta)) != 0) {
if (i != ENOENT)
return i;
DPUTS("new mdbenv");
newenv = 1;
env->me_psize = env->me_os_psize;
if (env->me_psize > MAX_PAGESIZE)
env->me_psize = MAX_PAGESIZE;
} else {
env->me_psize = meta.mm_psize;
}
// Was a mapsize configured?
if (!env->me_mapsize) {
// If this is a new environment, take the default,
// else use the size recorded in the existing env.
env->me_mapsize = newenv ? DEFAULT_MAPSIZE : meta.mm_mapsize;
} else if (env->me_mapsize < meta.mm_mapsize) {
// If the configured size is smaller, make sure it's
// still big enough. Silently round up to minimum if not.
size_t minsize = (meta.mm_last_pg + 1) * meta.mm_psize;
if (env->me_mapsize < minsize)
env->me_mapsize = minsize;
}
rc = mdb_env_map(env, meta.mm_address, newenv);
if (rc)
return rc;
if (newenv) {
if (flags & MDB_FIXEDMAP)
meta.mm_address = env->me_map;
i = mdb_env_init_meta(env, &meta);
if (i != MDB_SUCCESS) {
return i;
}
}
env->me_maxfree_1pg = (env->me_psize - PAGEHDRSZ) / sizeof(pgno_t) - 1;
env->me_nodemax = (((env->me_psize - PAGEHDRSZ) / MDB_MINKEYS) & -2)
- sizeof(indx_t);
#if !(MDB_MAXKEYSIZE)
env->me_maxkey = env->me_nodemax - (NODESIZE + sizeof(MDB_db));
#endif
env->me_maxpg = env->me_mapsize / env->me_psize;
#if MDB_DEBUG
{
int toggle = mdb_env_pick_meta(env);
MDB_db *db = &env->me_metas[toggle]->mm_dbs[MAIN_DBI];
DPRINTF(("opened database version %u, pagesize %u",
env->me_metas[0]->mm_version, env->me_psize));
DPRINTF(("using meta page %d", toggle));
DPRINTF(("depth: %u", db->md_depth));
DPRINTF(("entries: %"Z"u", db->md_entries));
DPRINTF(("branch pages: %"Z"u", db->md_branch_pages));
DPRINTF(("leaf pages: %"Z"u", db->md_leaf_pages));
DPRINTF(("overflow pages: %"Z"u", db->md_overflow_pages));
DPRINTF(("root: %"Z"u", db->md_root));
}
#endif
return MDB_SUCCESS;
*/
return nil
}
func (db *db) Open(path string, flags int, mode uint) error {
/*
int oflags, rc, len, excl = -1;
char *lpath, *dpath;
if (env->me_fd!=INVALID_HANDLE_VALUE || (flags & ~(CHANGEABLE|CHANGELESS)))
return EINVAL;
len = strlen(path);
if (flags & MDB_NOSUBDIR) {
rc = len + sizeof(LOCKSUFF) + len + 1;
} else {
rc = len + sizeof(LOCKNAME) + len + sizeof(DATANAME);
}
lpath = malloc(rc);
if (!lpath)
return ENOMEM;
if (flags & MDB_NOSUBDIR) {
dpath = lpath + len + sizeof(LOCKSUFF);
sprintf(lpath, "%s" LOCKSUFF, path);
strcpy(dpath, path);
} else {
dpath = lpath + len + sizeof(LOCKNAME);
sprintf(lpath, "%s" LOCKNAME, path);
sprintf(dpath, "%s" DATANAME, path);
}
rc = MDB_SUCCESS;
flags |= env->me_flags;
if (flags & MDB_RDONLY) {
// silently ignore WRITEMAP when we're only getting read access
flags &= ~MDB_WRITEMAP;
} else {
if (!((env->me_free_pgs = mdb_midl_alloc(MDB_IDL_UM_MAX)) &&
(env->me_dirty_list = calloc(MDB_IDL_UM_SIZE, sizeof(MDB_ID2)))))
rc = ENOMEM;
}
env->me_flags = flags |= MDB_ENV_ACTIVE;
if (rc)
goto leave;
env->me_path = strdup(path);
env->me_dbxs = calloc(env->me_maxdbs, sizeof(MDB_dbx));
env->me_dbflags = calloc(env->me_maxdbs, sizeof(uint16_t));
if (!(env->me_dbxs && env->me_path && env->me_dbflags)) {
rc = ENOMEM;
goto leave;
}
// For RDONLY, get lockfile after we know datafile exists
if (!(flags & (MDB_RDONLY|MDB_NOLOCK))) {
rc = mdb_env_setup_locks(env, lpath, mode, &excl);
if (rc)
goto leave;
}
#ifdef _WIN32
if (F_ISSET(flags, MDB_RDONLY)) {
oflags = GENERIC_READ;
len = OPEN_EXISTING;
} else {
oflags = GENERIC_READ|GENERIC_WRITE;
len = OPEN_ALWAYS;
}
mode = FILE_ATTRIBUTE_NORMAL;
env->me_fd = CreateFile(dpath, oflags, FILE_SHARE_READ|FILE_SHARE_WRITE,
NULL, len, mode, NULL);
#else
if (F_ISSET(flags, MDB_RDONLY))
oflags = O_RDONLY;
else
oflags = O_RDWR | O_CREAT;
env->me_fd = open(dpath, oflags, mode);
#endif
if (env->me_fd == INVALID_HANDLE_VALUE) {
rc = ErrCode();
goto leave;
}
if ((flags & (MDB_RDONLY|MDB_NOLOCK)) == MDB_RDONLY) {
rc = mdb_env_setup_locks(env, lpath, mode, &excl);
if (rc)
goto leave;
}
if ((rc = mdb_env_open2(env)) == MDB_SUCCESS) {
if (flags & (MDB_RDONLY|MDB_WRITEMAP)) {
env->me_mfd = env->me_fd;
} else {
// Synchronous fd for meta writes. Needed even with
// MDB_NOSYNC/MDB_NOMETASYNC, in case these get reset.
#ifdef _WIN32
len = OPEN_EXISTING;
env->me_mfd = CreateFile(dpath, oflags,
FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, len,
mode | FILE_FLAG_WRITE_THROUGH, NULL);
#else
oflags &= ~O_CREAT;
env->me_mfd = open(dpath, oflags | MDB_DSYNC, mode);
#endif
if (env->me_mfd == INVALID_HANDLE_VALUE) {
rc = ErrCode();
goto leave;
}
}
DPRINTF(("opened dbenv %p", (void *) env));
if (excl > 0) {
rc = mdb_env_share_locks(env, &excl);
if (rc)
goto leave;
}
if (!((flags & MDB_RDONLY) ||
(env->me_pbuf = calloc(1, env->me_psize))))
rc = ENOMEM;
}
leave:
if (rc) {
mdb_env_close0(env, excl);
}
free(lpath);
return rc;
*/
return nil
}
// Destroy resources from mdb_env_open(), clear our readers & DBIs
func (db *db) close0(excl) {

View File

@ -1,7 +1,7 @@
package bolt
// info contains information about the database.
type info struct {
// Info contains information about the database.
type Info struct {
MapSize int
LastPageID int
LastTransactionID int

52
meta.go
View File

@ -1,14 +1,8 @@
package bolt
type meta struct {
magic int32
version int32
mapSize int
free *bucket
main *bucket
lastPageNumber int
transactionID int
}
var (
// InvalidMetaPageError is the error page.meta() returns when its page-flag
// check rejects the page as a meta page.
InvalidMetaPageError = &Error{"Invalid meta page"}
)
// TODO: #define mm_psize mm_dbs[0].md_pad
// TODO: #define mm_flags mm_dbs[0].md_flags
@ -30,3 +24,43 @@ type meta struct {
// MDB_rel_func *md_rel; /**< user relocate function */
// void *md_relctx; /**< user-provided context for md_rel */
// } MDB_dbx;
// magic is the marker value expected in the magic field of every meta page.
// It is unsigned because 0xBEEFC0DE does not fit in an int32 (the constant
// would overflow and fail to compile).
const magic uint32 = 0xBEEFC0DE

// meta is the metadata section of a meta page. page.meta() obtains it by
// casting the page's data area, so the field order and widths define the
// layout and must not be changed casually.
type meta struct {
	magic   uint32 // must equal the magic constant; checked by validate
	version int32  // must equal Version; checked by validate
	mapsize int    // size of the memory map
	free    bucket // free-page bucket; its pad field carries the page size (read by Open)
	main    bucket // main bucket
	pgno    int    // last page number in use
	txnid   int    // transaction id; Open prefers the meta page with the larger value
}
// validate reports whether this meta page was written by a compatible binary.
// It returns InvalidError when the magic marker does not match,
// VersionMismatchError when the marker matches but the version differs, and
// nil when both match.
func (m *meta) validate() error {
	switch {
	case m.magic != magic:
		return InvalidError
	case m.version != Version:
		return VersionMismatchError
	default:
		return nil
	}
}
// read is intended to load the environment parameters stored in page p into m
// before the database is mapped into memory.
//
// It is currently a stub: p is not used and nil is always returned. The
// commented-out C below is the remaining LMDB logic to port, which keeps the
// candidate meta when it is the first one read or has the larger transaction id.
func (m *meta) read(p *page) error {
/*
if (off == 0 || m->mm_txnid > meta->mm_txnid)
*meta = *m;
}
return 0;
*/
return nil
}

57
page.go
View File

@ -4,17 +4,22 @@ import (
"unsafe"
)
const MinPageKeys = 2
const FillThreshold = 250 // 25%
const maxPageSize = 0x8000
var _page page
const headerSize = unsafe.Offsetof(_page.ptr)
const minPageKeys = 2
const fillThreshold = 250 // 25%
// Page flag bits, tested against a page's flags field (page.meta() checks
// p_meta). The exported names and the p_-prefixed names below carry the same
// values.
const (
BranchPage = 0x01
LeafPage = 0x02
OverflowPage = 0x04
MetaPage = 0x08
DirtyPage = 0x10 /**< dirty page, also set for #P_SUBP pages */
SubPage = 0x40
KeepPage = 0x8000 /**< leave this page alone during spill */
p_branch = 0x01
p_leaf = 0x02
p_overflow = 0x04
p_meta = 0x08
p_dirty = 0x10 /**< dirty page, also set for #P_SUBP pages */
p_sub = 0x40
p_keep = 0x8000 /**< leave this page alone during spill */
)
// maxCommitPages is the maximum number of pages to commit in one writev() call.
@ -36,16 +41,15 @@ const maxWriteByteCount 0x80000000U // TODO: #define MAX_WRITE 0x80000000U >>
// TODO: #define MDB_SPLIT_REPLACE MDB_APPENDDUP /**< newkey is not new */
type pgno uint64
type page struct {
header struct {
id int
next *page // (?)
id pgno
flags int
lower int
upper int
overflowPageCount int
}
metadata []byte
overflow int
ptr int
}
type pageState struct {
@ -53,6 +57,28 @@ type pageState struct {
last int /**< ID of last used record, or 0 if !mf_pghead */
}
// meta returns a pointer to the metadata section of the page.
//
// It returns InvalidMetaPageError when the page does not carry the p_meta
// flag, and the error from meta.validate when the metadata fails validation.
// On success the returned *meta aliases the page's memory starting at p.ptr;
// nothing is copied.
func (p *page) meta() (*meta, error) {
	// Exit if page is not a meta page (flag bit not set).
	if (p.flags & p_meta) == 0 {
		return nil, InvalidMetaPageError
	}

	// Cast the meta section and validate before returning.
	m := (*meta)(unsafe.Pointer(&p.ptr))
	if err := m.validate(); err != nil {
		return nil, err
	}
	return m, nil
}
// nodeCount returns the number of nodes on the page.
func (p *page) nodeCount() int {
return 0 // (p.header.lower - unsafe.Sizeof(p.header) >> 1
@ -67,4 +93,3 @@ func (p *page) remainingSize() int {
// remainingSize returns the difference between the page's upper and lower
// offsets.
// NOTE(review): presumably this is the free space left between the two
// regions of the page — confirm once node layout is ported.
func (p *page) remainingSize() int {
	// The page header fields are accessed directly on the page (as p.flags and
	// p.ptr are in meta()); there is no nested header struct any more.
	return p.upper - p.lower
}

View File

@ -1,9 +0,0 @@
package bolt
type xcursor struct {
cursor cursor
bucket *bucket
bucketx *bucketx
bucketFlag int
}