master
Ben Johnson 2014-01-24 12:51:56 -07:00
parent 779a8e07eb
commit 20b26eac78
8 changed files with 561 additions and 2221 deletions


@@ -185,3 +185,11 @@ There are several different types of pages:
Within each page there are one or more elements called nodes.
In branch pages, these nodes store references to other child pages in the tree.
In leaf pages, these nodes store the actual key/value data.
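To make the branch/leaf distinction concrete, here is a hypothetical Go sketch of the two node kinds. The field names are illustrative assumptions only; node.go in this commit holds the real (partial) definitions.

// Hypothetical sketch of the node layout described above.
// Branch nodes point at child pages; leaf nodes carry key/value data.
type branchNode struct {
	pgno    uint32 // id of the referenced child page
	keySize uint32 // key bytes follow the header in page memory
}

type leafNode struct {
	flags    uint16
	keySize  uint32 // key bytes follow the header in page memory...
	dataSize uint32 // ...followed by the value bytes
}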
## TODO
The following is a list of items to do on the Bolt project:
1. Resize map. (Make sure there are no reader txns before resizing; see the sketch below.)
2. DB.Copy()
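A hedged sketch of the guard that item 1 calls for, assuming the transactions, transaction, and mmapSize fields that appear in the db.go diff below; the actual munmap/mmap steps are elided.

// resizeMap is a hypothetical sketch of TODO item 1: the memory map
// may only be remapped once no transactions still reference the old
// mapping.
func (db *DB) resizeMap(size int) error {
	db.Lock()
	defer db.Unlock()

	// Refuse to resize while readers or a writer could still be
	// touching the current mapping.
	if len(db.transactions) > 0 || db.transaction != nil {
		return TransactionInProgressError
	}

	// TODO: munmap the old region, grow the file, and mmap again.
	db.mmapSize = size
	return nil
}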


@@ -10,17 +10,6 @@ const (
c_untrack = 0x40 /**< Un-track cursor when closing */
)
/*
type Cursor interface {
First() error
Last() error
Next() ([]byte, []byte, error)
Prev() ([]byte, []byte, error)
Current() ([]byte, []byte, error)
Get([]byte) ([]byte, error)
}
*/
type Cursor struct {
flags int
next *Cursor

db.go

@@ -8,21 +8,15 @@ import (
)
const (
NoSync = iota
NoMetaSync
DupSort
IntegerKey
IntegerDupKey
db_nosync = iota
db_nometasync
)
var DatabaseNotOpenError = &Error{"db is not open", nil}
var DatabaseAlreadyOpenedError = &Error{"db already open", nil}
var TransactionInProgressError = &Error{"writable transaction is already in progress", nil}
// TODO: #define MDB_FATAL_ERROR 0x80000000U /** Failed to update the meta page. Probably an I/O error. */
// TODO: #define MDB_ENV_ACTIVE 0x20000000U /** Some fields are initialized. */
// TODO: #define MDB_ENV_TXKEY 0x10000000U /** me_txkey is set */
// TODO: #define MDB_LIVE_READER 0x08000000U /** Have liveness lock in reader table */
var (
DatabaseNotOpenError = &Error{"db is not open", nil}
DatabaseAlreadyOpenedError = &Error{"db already open", nil}
TransactionInProgressError = &Error{"writable transaction is already in progress", nil}
)
type DB struct {
sync.Mutex
@@ -30,45 +24,48 @@ type DB struct {
os _os
syscall _syscall
path string
file file
metafile file
data []byte
buf []byte
meta0 *meta
meta1 *meta
pageSize int
readers []*reader
buckets []*Bucket
bucketFlags []int /**< array of flags from MDB_db.md_flags */
path string
mmapSize int /**< size of the data memory map */
size int /**< current file size */
pbuf []byte
transaction *RWTransaction /**< current write transaction */
rwtransaction *RWTransaction
transactions []*Transaction
maxPageNumber int /**< me_mapsize / me_psize */
dpages []*page /**< list of malloc'd blocks for re-use */
freePages []int /** IDL of pages that became unused in a write txn */
dirtyPages []int /** ID2L of pages written during a write txn. Length MDB_IDL_UM_SIZE. */
// TODO: scratch []*page // list of temp pages for writing.
readers []*reader
maxFreeOnePage int /** Max number of freelist items that can fit in a single overflow page */
maxPageDataSize int
maxNodeSize int /** Max size of a node on a page */
maxKeySize int /**< max size of a key */
}
// NewDB creates a new DB instance.
func NewDB() *DB {
return &DB{}
}
// Path returns the path to the currently open database file.
func (db *DB) Path() string {
return db.path
}
// Open opens a data file at the given path and initializes the database.
// If the file does not exist then it will be created automatically.
func (db *DB) Open(path string, mode os.FileMode) error {
var err error
db.Lock()
defer db.Unlock()
// Initialize OS/Syscall references.
// These are overridden by mocks during some tests.
if db.os == nil {
db.os = &sysos{}
}
@@ -96,12 +93,12 @@ func (db *DB) Open(path string, mode os.FileMode) error {
var m, m0, m1 *meta
var buf [pageHeaderSize + int(unsafe.Sizeof(meta{}))]byte
if _, err := db.file.ReadAt(buf[:], 0); err == nil {
if m0, _ = db.page(buf[:], 0).meta(); m0 != nil {
if m0, _ = db.pageInBuffer(buf[:], 0).meta(); m0 != nil {
db.pageSize = int(m0.free.pad)
}
}
if _, err := db.file.ReadAt(buf[:], int64(db.pageSize)); err == nil {
m1, _ = db.page(buf[:], 0).meta()
m1, _ = db.pageInBuffer(buf[:], 0).meta()
}
if m0 != nil && m1 != nil {
if m0.txnid > m1.txnid {
@@ -143,7 +140,7 @@ func (db *DB) Open(path string, mode os.FileMode) error {
return nil
}
// int mdb_env_map(MDB_env *env, void *addr, int newsize)
// mmap opens the underlying memory-mapped file and initializes the meta references.
func (db *DB) mmap() error {
var err error
@@ -162,13 +159,11 @@ func (db *DB) mmap() error {
return err
}
// TODO?: If nordahead, then: madvise(env->me_map, env->me_mapsize, MADV_RANDOM);
// Save references to the meta pages.
if db.meta0, err = db.page(db.data, 0).meta(); err != nil {
if db.meta0, err = db.page(0).meta(); err != nil {
return &Error{"meta0 error", err}
}
if db.meta1, err = db.page(db.data, 1).meta(); err != nil {
if db.meta1, err = db.page(1).meta(); err != nil {
return &Error{"meta1 error", err}
}
@@ -186,7 +181,7 @@ func (db *DB) init() error {
// Create two meta pages on a buffer.
buf := make([]byte, db.pageSize*2)
for i := 0; i < 2; i++ {
p := db.page(buf[:], i)
p := db.pageInBuffer(buf[:], i)
p.id = pgno(i)
p.init(db.pageSize)
}
@@ -199,12 +194,20 @@ func (db *DB) init() error {
return nil
}
func (db *DB) close() {
// TODO
// Close releases all resources related to the database.
func (db *DB) Close() {
db.Lock()
defer db.Unlock()
db.close()
}
// Transaction creates a transaction that's associated with this database.
func (db *DB) Transaction(writable bool) (*Transaction, error) {
func (db *DB) close() {
// TODO: Undo everything in Open().
}
// Transaction creates a read-only transaction.
// Multiple read-only transactions can be used concurrently.
func (db *DB) Transaction() (*Transaction, error) {
db.Lock()
defer db.Unlock()
@@ -212,18 +215,13 @@ func (db *DB) Transaction(writable bool) (*Transaction, error) {
if !db.opened {
return nil, DatabaseNotOpenError
}
// Exit if a writable transaction is currently in progress.
if writable && db.transaction != nil {
return nil, TransactionInProgressError
}
// Create a transaction associated with the database.
t := &Transaction{
db: db,
meta: db.meta(),
writable: writable,
buckets: make(map[string]*Bucket),
cursors: make(map[uint32]*Cursor),
db: db,
meta: db.meta(),
buckets: make(map[string]*Bucket),
cursors: make(map[uint32]*Cursor),
}
// Save references to the sys•free and sys•buckets buckets.
@@ -232,16 +230,32 @@ func (db *DB) Transaction(writable bool) (*Transaction, error) {
t.sysbuckets.transaction = t
t.sysbuckets.bucket = &t.meta.buckets
// We only allow one writable transaction at a time so save the reference.
if writable {
db.transaction = t
return t, nil
}
// RWTransaction creates a read/write transaction.
// Only one read/write transaction is allowed at a time.
func (db *DB) RWTransaction() (*RWTransaction, error) {
// TODO: db.writerMutex.Lock()
// TODO: Add unlock to RWTransaction.Commit() / Abort()
// Exit if a read-write transaction is currently in progress.
if db.transaction != nil {
return nil, TransactionInProgressError
}
t := &RWTransaction{}
return t, nil
}
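A short usage sketch of the two constructors above; the path is hypothetical and error handling is abbreviated.

// Hypothetical usage: read-only transactions may run concurrently,
// while only one read-write transaction is allowed at a time.
func exampleTransactions() error {
	db := NewDB()
	if err := db.Open("/path/to/data.db", 0666); err != nil {
		return err
	}
	defer db.Close()

	t, err := db.Transaction() // read-only; any number may be open
	if err != nil {
		return err
	}
	_ = t

	rw, err := db.RWTransaction() // read-write; exclusive
	if err != nil {
		return err
	}
	_ = rw
	return nil
}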
// page retrieves a page reference from a given byte array based on the current page size.
func (db *DB) page(b []byte, id int) *page {
// page retrieves a page reference from the mmap based on the current page size.
func (db *DB) page(id int) *page {
return (*page)(unsafe.Pointer(&db.data[id*db.pageSize]))
}
// pageInBuffer retrieves a page reference from a given byte array based on the current page size.
func (db *DB) pageInBuffer(b []byte, id int) *page {
return (*page)(unsafe.Pointer(&b[id*db.pageSize]))
}
@@ -253,629 +267,22 @@ func (db *DB) meta() *meta {
return db.meta1
}
// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ CONVERTED ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ //
func (db *DB) freePage(p *page) {
/*
mp->mp_next = env->me_dpages;
VGMEMP_FREE(env, mp);
env->me_dpages = mp;
*/
}
func (db *DB) freeDirtyPage(p *page) {
/*
if (!IS_OVERFLOW(dp) || dp->mp_pages == 1) {
mdb_page_free(env, dp);
} else {
// large pages just get freed directly
VGMEMP_FREE(env, dp);
free(dp);
}
*/
}
func (db *DB) freeAllDirtyPages(p *page) {
/*
MDB_env *env = txn->mt_env;
MDB_ID2L dl = txn->mt_u.dirty_list;
unsigned i, n = dl[0].mid;
for (i = 1; i <= n; i++) {
mdb_dpage_free(env, dl[i].mptr);
}
dl[0].mid = 0;
*/
}
// sync flushes the file descriptor to disk unless "no sync" is enabled.
func (db *DB) sync(force bool) error {
/*
int rc = 0;
if (force || !F_ISSET(env->me_flags, MDB_NOSYNC)) {
if (env->me_flags & MDB_WRITEMAP) {
int flags = ((env->me_flags & MDB_MAPASYNC) && !force)
? MS_ASYNC : MS_SYNC;
if (MDB_MSYNC(env->me_map, env->me_mapsize, flags))
rc = ErrCode();
#ifdef _WIN32
else if (flags == MS_SYNC && MDB_FDATASYNC(env->me_fd))
rc = ErrCode();
#endif
} else {
if (MDB_FDATASYNC(env->me_fd))
rc = ErrCode();
}
}
return rc;
*/
return nil
}
// Check both meta pages to see which one is newer.
// @param[in] env the environment handle
// @return meta toggle (0 or 1).
func (db *DB) pickMeta() int {
/*
return (env->me_metas[0]->mm_txnid < env->me_metas[1]->mm_txnid);
*/
return 0
}
func (db *DB) Create() error {
/*
MDB_env *e;
e = calloc(1, sizeof(MDB_env));
if (!e)
return ENOMEM;
e->me_maxreaders = DEFAULT_READERS;
e->me_maxdbs = e->me_numdbs = 2;
e->me_fd = INVALID_HANDLE_VALUE;
e->me_lfd = INVALID_HANDLE_VALUE;
e->me_mfd = INVALID_HANDLE_VALUE;
#ifdef MDB_USE_POSIX_SEM
e->me_rmutex = SEM_FAILED;
e->me_wmutex = SEM_FAILED;
#endif
e->me_pid = getpid();
GET_PAGESIZE(e->me_os_psize);
VGMEMP_CREATE(e,0,0);
*env = e;
return MDB_SUCCESS;
*/
return nil
}
func (db *DB) setMapSize(size int) error {
/*
// If env is already open, caller is responsible for making
// sure there are no active txns.
if (env->me_map) {
int rc;
void *old;
if (env->me_txn)
return EINVAL;
if (!size)
size = env->me_metas[mdb_env_pick_meta(env)]->mm_mapsize;
else if (size < env->me_mapsize) {
// If the configured size is smaller, make sure it's
// still big enough. Silently round up to minimum if not.
size_t minsize = (env->me_metas[mdb_env_pick_meta(env)]->mm_last_pg + 1) * env->me_psize;
if (size < minsize)
size = minsize;
}
munmap(env->me_map, env->me_mapsize);
env->me_mapsize = size;
old = (env->me_flags & MDB_FIXEDMAP) ? env->me_map : NULL;
rc = mdb_env_map(env, old, 1);
if (rc)
return rc;
env->me_mapsize = size;
if (env->me_psize)
env->me_maxpg = env->me_mapsize / env->me_psize;
return MDB_SUCCESS;
*/
return nil
}
func (db *DB) setMaxBucketCount(count int) error {
/*
if (env->me_map)
return EINVAL;
env->me_maxdbs = dbs + 2; // Named databases + main and free DB
return MDB_SUCCESS;
*/
return nil
}
func (db *DB) setMaxReaderCount(count int) error {
/*
if (env->me_map || readers < 1)
return EINVAL;
env->me_maxreaders = readers;
return MDB_SUCCESS;
*/
return nil
}
func (db *DB) getMaxReaderCount(count int) (int, error) {
/*
if (!env || !readers)
return EINVAL;
*readers = env->me_maxreaders;
return MDB_SUCCESS;
*/
return 0, nil
}
// Destroy resources from mdb_env_open(), clear our readers & DBIs
func (db *DB) close0(excl int) {
/*
int i;
if (!(env->me_flags & MDB_ENV_ACTIVE))
return;
// Doing this here since me_dbxs may not exist during mdb_env_close
for (i = env->me_maxdbs; --i > MAIN_DBI; )
free(env->me_dbxs[i].md_name.mv_data);
free(env->me_pbuf);
free(env->me_dbflags);
free(env->me_dbxs);
free(env->me_path);
free(env->me_dirty_list);
mdb_midl_free(env->me_free_pgs);
if (env->me_flags & MDB_ENV_TXKEY) {
pthread_key_delete(env->me_txkey);
#ifdef _WIN32
// Delete our key from the global list
for (i=0; i<mdb_tls_nkeys; i++)
if (mdb_tls_keys[i] == env->me_txkey) {
mdb_tls_keys[i] = mdb_tls_keys[mdb_tls_nkeys-1];
mdb_tls_nkeys--;
break;
}
#endif
}
if (env->me_map) {
munmap(env->me_map, env->me_mapsize);
}
if (env->me_mfd != env->me_fd && env->me_mfd != INVALID_HANDLE_VALUE)
(void) close(env->me_mfd);
if (env->me_fd != INVALID_HANDLE_VALUE)
(void) close(env->me_fd);
if (env->me_txns) {
MDB_PID_T pid = env->me_pid;
// Clearing readers is done in this function because
// me_txkey with its destructor must be disabled first.
for (i = env->me_numreaders; --i >= 0; )
if (env->me_txns->mti_readers[i].mr_pid == pid)
env->me_txns->mti_readers[i].mr_pid = 0;
#ifdef _WIN32
if (env->me_rmutex) {
CloseHandle(env->me_rmutex);
if (env->me_wmutex) CloseHandle(env->me_wmutex);
}
// Windows automatically destroys the mutexes when
// the last handle closes.
#elif defined(MDB_USE_POSIX_SEM)
if (env->me_rmutex != SEM_FAILED) {
sem_close(env->me_rmutex);
if (env->me_wmutex != SEM_FAILED)
sem_close(env->me_wmutex);
// If we have the filelock: If we are the
// only remaining user, clean up semaphores.
if (excl == 0)
mdb_env_excl_lock(env, &excl);
if (excl > 0) {
sem_unlink(env->me_txns->mti_rmname);
sem_unlink(env->me_txns->mti_wmname);
}
}
#endif
munmap((void *)env->me_txns, (env->me_maxreaders-1)*sizeof(MDB_reader)+sizeof(MDB_txninfo));
}
if (env->me_lfd != INVALID_HANDLE_VALUE) {
#ifdef _WIN32
if (excl >= 0) {
// Unlock the lockfile. Windows would have unlocked it
// after closing anyway, but not necessarily at once.
UnlockFile(env->me_lfd, 0, 0, 1, 0);
}
#endif
(void) close(env->me_lfd);
}
env->me_flags &= ~(MDB_ENV_ACTIVE|MDB_ENV_TXKEY);
*/
}
func (db *DB) copyfd(handle int) error {
/*
MDB_txn *txn = NULL;
int rc;
size_t wsize;
char *ptr;
#ifdef _WIN32
DWORD len, w2;
#define DO_WRITE(rc, fd, ptr, w2, len) rc = WriteFile(fd, ptr, w2, &len, NULL)
#else
ssize_t len;
size_t w2;
#define DO_WRITE(rc, fd, ptr, w2, len) len = write(fd, ptr, w2); rc = (len >= 0)
#endif
// Do the lock/unlock of the reader mutex before starting the
// write txn. Otherwise other read txns could block writers.
rc = mdb_txn_begin(env, NULL, MDB_RDONLY, &txn);
if (rc)
return rc;
if (env->me_txns) {
// We must start the actual read txn after blocking writers
mdb_txn_reset0(txn, "reset-stage1");
// Temporarily block writers until we snapshot the meta pages
LOCK_MUTEX_W(env);
rc = mdb_txn_renew0(txn);
if (rc) {
UNLOCK_MUTEX_W(env);
goto leave;
}
}
wsize = env->me_psize * 2;
ptr = env->me_map;
w2 = wsize;
while (w2 > 0) {
DO_WRITE(rc, fd, ptr, w2, len);
if (!rc) {
rc = ErrCode();
break;
} else if (len > 0) {
rc = MDB_SUCCESS;
ptr += len;
w2 -= len;
continue;
} else {
// Non-blocking or async handles are not supported
rc = EIO;
break;
}
}
if (env->me_txns)
UNLOCK_MUTEX_W(env);
if (rc)
goto leave;
wsize = txn->mt_next_pgno * env->me_psize - wsize;
while (wsize > 0) {
if (wsize > MAX_WRITE)
w2 = MAX_WRITE;
else
w2 = wsize;
DO_WRITE(rc, fd, ptr, w2, len);
if (!rc) {
rc = ErrCode();
break;
} else if (len > 0) {
rc = MDB_SUCCESS;
ptr += len;
wsize -= len;
continue;
} else {
rc = EIO;
break;
}
}
leave:
mdb_txn_abort(txn);
return rc;
}
int
mdb_env_copy(MDB_env *env, const char *path)
{
int rc, len;
char *lpath;
HANDLE newfd = INVALID_HANDLE_VALUE;
if (env->me_flags & MDB_NOSUBDIR) {
lpath = (char *)path;
} else {
len = strlen(path);
len += sizeof(DATANAME);
lpath = malloc(len);
if (!lpath)
return ENOMEM;
sprintf(lpath, "%s" DATANAME, path);
}
// The destination path must exist, but the destination file must not.
// We don't want the OS to cache the writes, since the source data is
// already in the OS cache.
#ifdef _WIN32
newfd = CreateFile(lpath, GENERIC_WRITE, 0, NULL, CREATE_NEW,
FILE_FLAG_NO_BUFFERING|FILE_FLAG_WRITE_THROUGH, NULL);
#else
newfd = open(lpath, O_WRONLY|O_CREAT|O_EXCL, 0666);
#endif
if (newfd == INVALID_HANDLE_VALUE) {
rc = ErrCode();
goto leave;
}
#ifdef O_DIRECT
// Set O_DIRECT if the file system supports it
if ((rc = fcntl(newfd, F_GETFL)) != -1)
(void) fcntl(newfd, F_SETFL, rc | O_DIRECT);
#endif
#ifdef F_NOCACHE // __APPLE__
rc = fcntl(newfd, F_NOCACHE, 1);
if (rc) {
rc = ErrCode();
goto leave;
}
#endif
rc = mdb_env_copyfd(env, newfd);
leave:
if (!(env->me_flags & MDB_NOSUBDIR))
free(lpath);
if (newfd != INVALID_HANDLE_VALUE)
if (close(newfd) < 0 && rc == MDB_SUCCESS)
rc = ErrCode();
return rc;
*/
return nil
}
func (db *DB) Close() {
/*
MDB_page *dp;
if (env == NULL)
return;
VGMEMP_DESTROY(env);
while ((dp = env->me_dpages) != NULL) {
VGMEMP_DEFINED(&dp->mp_next, sizeof(dp->mp_next));
env->me_dpages = dp->mp_next;
free(dp);
}
mdb_env_close0(env, 0);
free(env);
*/
}
// Calculate the size of a leaf node.
// The size depends on the environment's page size; if a data item
// is too large it will be put onto an overflow page and the node
// size will only include the key and not the data. Sizes are always
// rounded up to an even number of bytes, to guarantee 2-byte alignment
// of the #MDB_node headers.
// @param[in] env The environment handle.
// @param[in] key The key for the node.
// @param[in] data The data for the node.
// @return The number of bytes needed to store the node.
func (db *DB) LeafSize(key []byte, data []byte) int {
/*
size_t sz;
sz = LEAFSIZE(key, data);
if (sz > env->me_nodemax) {
// put on overflow page
sz -= data->mv_size - sizeof(pgno_t);
}
return EVEN(sz + sizeof(indx_t));
*/
return 0
}
// Calculate the size of a branch node.
// The size should depend on the environment's page size but since
// we currently don't support spilling large keys onto overflow
// pages, it's simply the size of the #MDB_node header plus the
// size of the key. Sizes are always rounded up to an even number
// of bytes, to guarantee 2-byte alignment of the #MDB_node headers.
// @param[in] env The environment handle.
// @param[in] key The key for the node.
// @return The number of bytes needed to store the node.
func (db *DB) BranchSize(key []byte) int {
/*
size_t sz;
sz = INDXSIZE(key);
if (sz > env->me_nodemax) {
// put on overflow page
// not implemented
// sz -= key->size - sizeof(pgno_t);
}
return sz + sizeof(indx_t);
*/
return 0
}
func (db *DB) SetFlags(flag int, onoff bool) error {
/*
if ((flag & CHANGEABLE) != flag)
return EINVAL;
if (onoff)
env->me_flags |= flag;
else
env->me_flags &= ~flag;
return MDB_SUCCESS;
*/
return nil
}
func (db *DB) Stat() *stat {
/*
int toggle;
if (env == NULL || arg == NULL)
return EINVAL;
toggle = mdb_env_pick_meta(env);
stat := &Stat{}
stat->ms_psize = env->me_psize;
stat->ms_depth = db->md_depth;
stat->ms_branch_pages = db->md_branch_pages;
stat->ms_leaf_pages = db->md_leaf_pages;
stat->ms_overflow_pages = db->md_overflow_pages;
stat->ms_entries = db->md_entries;
//return mdb_stat0(env, &env->me_metas[toggle]->mm_dbs[MAIN_DBI], stat);
return stat
*/
// TODO: Calculate size, depth, page count (by type), entry count, readers, etc.
return nil
}
func (db *DB) Info() *Info {
/*
int toggle;
if (env == NULL || arg == NULL)
return EINVAL;
toggle = mdb_env_pick_meta(env);
arg->me_mapaddr = (env->me_flags & MDB_FIXEDMAP) ? env->me_map : 0;
arg->me_mapsize = env->me_mapsize;
arg->me_maxreaders = env->me_maxreaders;
// me_numreaders may be zero if this process never used any readers. Use
// the shared numreader count if it exists.
arg->me_numreaders = env->me_txns ? env->me_txns->mti_numreaders : env->me_numreaders;
arg->me_last_pgno = env->me_metas[toggle]->mm_last_pg;
arg->me_last_txnid = env->me_metas[toggle]->mm_txnid;
return MDB_SUCCESS;
*/
return nil
}
// TODO: Move to bucket.go
func (db *DB) CloseBucket(b Bucket) {
/*
char *ptr;
if (dbi <= MAIN_DBI || dbi >= env->me_maxdbs)
return;
ptr = env->me_dbxs[dbi].md_name.mv_data;
env->me_dbxs[dbi].md_name.mv_data = NULL;
env->me_dbxs[dbi].md_name.mv_size = 0;
env->me_dbflags[dbi] = 0;
free(ptr);
*/
}
//int mdb_reader_list(MDB_env *env, MDB_msg_func *func, void *ctx)
func (db *DB) getReaderList() error {
/*
unsigned int i, rdrs;
MDB_reader *mr;
char buf[64];
int rc = 0, first = 1;
if (!env || !func)
return -1;
if (!env->me_txns) {
return func("(no reader locks)\n", ctx);
}
rdrs = env->me_txns->mti_numreaders;
mr = env->me_txns->mti_readers;
for (i=0; i<rdrs; i++) {
if (mr[i].mr_pid) {
txnid_t txnid = mr[i].mr_txnid;
sprintf(buf, txnid == (txnid_t)-1 ?
"%10d %"Z"x -\n" : "%10d %"Z"x %"Z"u\n",
(int)mr[i].mr_pid, (size_t)mr[i].mr_tid, txnid);
if (first) {
first = 0;
rc = func(" pid thread txnid\n", ctx);
if (rc < 0)
break;
}
rc = func(buf, ctx);
if (rc < 0)
break;
}
}
if (first) {
rc = func("(no active readers)\n", ctx);
}
return rc;
*/
return nil
}
// (bool return is whether reader is dead)
func (db *DB) checkReaders() (bool, error) {
/*
unsigned int i, j, rdrs;
MDB_reader *mr;
MDB_PID_T *pids, pid;
int count = 0;
if (!env)
return EINVAL;
if (dead)
*dead = 0;
if (!env->me_txns)
return MDB_SUCCESS;
rdrs = env->me_txns->mti_numreaders;
pids = malloc((rdrs+1) * sizeof(MDB_PID_T));
if (!pids)
return ENOMEM;
pids[0] = 0;
mr = env->me_txns->mti_readers;
for (i=0; i<rdrs; i++) {
if (mr[i].mr_pid && mr[i].mr_pid != env->me_pid) {
pid = mr[i].mr_pid;
if (mdb_pid_insert(pids, pid) == 0) {
if (!mdb_reader_pid(env, Pidcheck, pid)) {
LOCK_MUTEX_R(env);
// Recheck, a new process may have reused pid
if (!mdb_reader_pid(env, Pidcheck, pid)) {
for (j=i; j<rdrs; j++)
if (mr[j].mr_pid == pid) {
DPRINTF(("clear stale reader pid %u txn %"Z"d",
(unsigned) pid, mr[j].mr_txnid));
mr[j].mr_pid = 0;
count++;
}
}
UNLOCK_MUTEX_R(env);
}
}
}
}
free(pids);
if (dead)
*dead = count;
return MDB_SUCCESS;
*/
return false, nil
}
func (db *DB) minTxnID() txnid {
// TODO: Find the oldest transaction id.
return 0
}

node.go

@@ -28,3 +28,13 @@ type branchNode struct {
func (n *leafNode) key() []byte {
return (*[MaxKeySize]byte)(unsafe.Pointer(&n.data))[:n.keySize]
}
func leafNodeSize(key []byte, data []byte) int {
// TODO: Return even(sizeof(node) + len(key) + len(data))
return 0
}
func branchNodeSize(key []byte) int {
// TODO: Return even(sizeof(node) + len(key))
return 0
}
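The two TODOs above describe a simple calculation; here is a hedged sketch, assuming an even helper that rounds up to 2-byte alignment as the LeafSize/BranchSize comments in db.go require.

import "unsafe"

// even rounds n up to the next multiple of two so node headers stay
// 2-byte aligned.
func even(n int) int {
	return (n + 1) &^ 1
}

// Hypothetical sketches of the size calculations the TODOs describe.
func leafNodeSizeSketch(key, data []byte) int {
	return even(int(unsafe.Sizeof(leafNode{})) + len(key) + len(data))
}

func branchNodeSizeSketch(key []byte) int {
	return even(int(unsafe.Sizeof(branchNode{})) + len(key))
}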


@@ -20,8 +20,7 @@ const (
p_leaf = 0x02
p_overflow = 0x04
p_meta = 0x08
p_dirty = 0x10 /**< dirty page, also set for #P_SUBP pages */
p_keep = 0x8000 /**< leave this page alone during spill */
p_dirty = 0x10 /**< dirty page, also set for #P_SUBP pages */
p_invalid = ^pgno(0)
)


@@ -73,334 +73,334 @@ func (c *RWCursor) Put(key []byte, value []byte) error {
/*
insert = rc;
if (insert) {
// The key does not exist
DPRINTF(("inserting key at index %i", mc->mc_ki[mc->mc_top]));
if ((mc->mc_db->md_flags & MDB_DUPSORT) &&
LEAFSIZE(key, data) > env->me_nodemax)
{
// Too big for a node, insert in sub-DB
fp_flags = P_LEAF|P_DIRTY;
fp = env->me_pbuf;
fp->mp_pad = data->mv_size; // used if MDB_DUPFIXED
fp->mp_lower = fp->mp_upper = olddata.mv_size = PAGEHDRSZ;
goto prep_subDB;
}
} else {
more:
leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
olddata.mv_size = NODEDSZ(leaf);
olddata.mv_data = NODEDATA(leaf);
// DB has dups?
if (F_ISSET(mc->mc_db->md_flags, MDB_DUPSORT)) {
// Prepare (sub-)page/sub-DB to accept the new item,
// if needed. fp: old sub-page or a header faking
// it. mp: new (sub-)page. offset: growth in page
// size. xdata: node data with new page or DB.
ssize_t i, offset = 0;
mp = fp = xdata.mv_data = env->me_pbuf;
mp->mp_pgno = mc->mc_pg[mc->mc_top]->mp_pgno;
// Was a single item before, must convert now
if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) {
// Just overwrite the current item
if (flags == MDB_CURRENT)
goto current;
#if UINT_MAX < SIZE_MAX
if (mc->mc_dbx->md_dcmp == mdb_cmp_int && olddata.mv_size == sizeof(size_t))
#ifdef MISALIGNED_OK
mc->mc_dbx->md_dcmp = mdb_cmp_long;
#else
mc->mc_dbx->md_dcmp = mdb_cmp_cint;
#endif
#endif
// if data matches, skip it
if (!mc->mc_dbx->md_dcmp(data, &olddata)) {
if (flags & MDB_NODUPDATA)
rc = MDB_KEYEXIST;
else if (flags & MDB_MULTIPLE)
goto next_mult;
else
rc = MDB_SUCCESS;
return rc;
}
// Back up original data item
dkey.mv_size = olddata.mv_size;
dkey.mv_data = memcpy(fp+1, olddata.mv_data, olddata.mv_size);
// Make sub-page header for the dup items, with dummy body
fp->mp_flags = P_LEAF|P_DIRTY|P_SUBP;
fp->mp_lower = PAGEHDRSZ;
xdata.mv_size = PAGEHDRSZ + dkey.mv_size + data->mv_size;
if (mc->mc_db->md_flags & MDB_DUPFIXED) {
fp->mp_flags |= P_LEAF2;
fp->mp_pad = data->mv_size;
xdata.mv_size += 2 * data->mv_size; // leave space for 2 more
} else {
xdata.mv_size += 2 * (sizeof(indx_t) + NODESIZE) +
(dkey.mv_size & 1) + (data->mv_size & 1);
}
fp->mp_upper = xdata.mv_size;
olddata.mv_size = fp->mp_upper; // pretend olddata is fp
} else if (leaf->mn_flags & F_SUBDATA) {
// Data is on sub-DB, just store it
flags |= F_DUPDATA|F_SUBDATA;
goto put_sub;
} else {
// Data is on sub-page
fp = olddata.mv_data;
switch (flags) {
default:
i = -(ssize_t)SIZELEFT(fp);
if (!(mc->mc_db->md_flags & MDB_DUPFIXED)) {
offset = i += (ssize_t) EVEN(
sizeof(indx_t) + NODESIZE + data->mv_size);
} else {
i += offset = fp->mp_pad;
offset *= 4; // space for 4 more
}
if (i > 0)
break;
// FALLTHRU: Sub-page is big enough
case MDB_CURRENT:
fp->mp_flags |= P_DIRTY;
COPY_PGNO(fp->mp_pgno, mp->mp_pgno);
mc->mc_xcursor->mx_cursor.mc_pg[0] = fp;
flags |= F_DUPDATA;
goto put_sub;
}
xdata.mv_size = olddata.mv_size + offset;
insert = rc;
if (insert) {
// The key does not exist
DPRINTF(("inserting key at index %i", mc->mc_ki[mc->mc_top]));
if ((mc->mc_db->md_flags & MDB_DUPSORT) &&
LEAFSIZE(key, data) > env->me_nodemax)
{
// Too big for a node, insert in sub-DB
fp_flags = P_LEAF|P_DIRTY;
fp = env->me_pbuf;
fp->mp_pad = data->mv_size; // used if MDB_DUPFIXED
fp->mp_lower = fp->mp_upper = olddata.mv_size = PAGEHDRSZ;
goto prep_subDB;
}
} else {
fp_flags = fp->mp_flags;
if (NODESIZE + NODEKSZ(leaf) + xdata.mv_size > env->me_nodemax) {
// Too big for a sub-page, convert to sub-DB
fp_flags &= ~P_SUBP;
prep_subDB:
dummy.md_pad = 0;
dummy.md_flags = 0;
dummy.md_depth = 1;
dummy.md_branch_pages = 0;
dummy.md_leaf_pages = 1;
dummy.md_overflow_pages = 0;
dummy.md_entries = NUMKEYS(fp);
xdata.mv_size = sizeof(MDB_db);
xdata.mv_data = &dummy;
if ((rc = mdb_page_alloc(mc, 1, &mp)))
return rc;
offset = env->me_psize - olddata.mv_size;
flags |= F_DUPDATA|F_SUBDATA;
dummy.md_root = mp->mp_pgno;
}
if (mp != fp) {
mp->mp_flags = fp_flags | P_DIRTY;
mp->mp_pad = fp->mp_pad;
mp->mp_lower = fp->mp_lower;
mp->mp_upper = fp->mp_upper + offset;
if (fp_flags & P_LEAF2) {
memcpy(METADATA(mp), METADATA(fp), NUMKEYS(fp) * fp->mp_pad);
} else {
memcpy((char *)mp + mp->mp_upper, (char *)fp + fp->mp_upper,
olddata.mv_size - fp->mp_upper);
for (i = NUMKEYS(fp); --i >= 0; )
mp->mp_ptrs[i] = fp->mp_ptrs[i] + offset;
}
}
rdata = &xdata;
flags |= F_DUPDATA;
do_sub = 1;
if (!insert)
mdb_node_del(mc, 0);
goto new_sub;
}
current:
// overflow page overwrites need special handling
if (F_ISSET(leaf->mn_flags, F_BIGDATA)) {
MDB_page *omp;
pgno_t pg;
int level, ovpages, dpages = OVPAGES(data->mv_size, env->me_psize);
memcpy(&pg, olddata.mv_data, sizeof(pg));
if ((rc2 = mdb_page_get(mc->mc_txn, pg, &omp, &level)) != 0)
return rc2;
ovpages = omp->mp_pages;
// Is the ov page large enough?
if (ovpages >= dpages) {
if (!(omp->mp_flags & P_DIRTY) &&
(level || (env->me_flags & MDB_WRITEMAP)))
{
rc = mdb_page_unspill(mc->mc_txn, omp, &omp);
if (rc)
return rc;
level = 0; // dirty in this txn or clean
}
// Is it dirty?
if (omp->mp_flags & P_DIRTY) {
// yes, overwrite it. Note in this case we don't
// bother to try shrinking the page if the new data
// is smaller than the overflow threshold.
if (level > 1) {
// It is writable only in a parent txn
size_t sz = (size_t) env->me_psize * ovpages, off;
MDB_page *np = mdb_page_malloc(mc->mc_txn, ovpages);
MDB_ID2 id2;
if (!np)
return ENOMEM;
id2.mid = pg;
id2.mptr = np;
rc = mdb_mid2l_insert(mc->mc_txn->mt_u.dirty_list, &id2);
mdb_cassert(mc, rc == 0);
if (!(flags & MDB_RESERVE)) {
// Copy end of page, adjusting alignment so
// compiler may copy words instead of bytes.
off = (PAGEHDRSZ + data->mv_size) & -sizeof(size_t);
memcpy((size_t *)((char *)np + off),
(size_t *)((char *)omp + off), sz - off);
sz = PAGEHDRSZ;
}
memcpy(np, omp, sz); // Copy beginning of page
omp = np;
}
SETDSZ(leaf, data->mv_size);
if (F_ISSET(flags, MDB_RESERVE))
data->mv_data = METADATA(omp);
else
memcpy(METADATA(omp), data->mv_data, data->mv_size);
goto done;
}
}
if ((rc2 = mdb_ovpage_free(mc, omp)) != MDB_SUCCESS)
return rc2;
} else if (data->mv_size == olddata.mv_size) {
// same size, just replace it. Note that we could
// also reuse this node if the new data is smaller,
// but instead we opt to shrink the node in that case.
if (F_ISSET(flags, MDB_RESERVE))
data->mv_data = olddata.mv_data;
else if (data->mv_size)
memcpy(olddata.mv_data, data->mv_data, data->mv_size);
else
memcpy(NODEKEY(leaf), key->mv_data, key->mv_size);
goto done;
}
mdb_node_del(mc, 0);
mc->mc_db->md_entries--;
}
rdata = data;
new_sub:
nflags = flags & NODE_ADD_FLAGS;
nsize = IS_LEAF2(mc->mc_pg[mc->mc_top]) ? key->mv_size : mdb_leaf_size(env, key, rdata);
if (SIZELEFT(mc->mc_pg[mc->mc_top]) < nsize) {
if (( flags & (F_DUPDATA|F_SUBDATA)) == F_DUPDATA )
nflags &= ~MDB_APPEND;
if (!insert)
nflags |= MDB_SPLIT_REPLACE;
rc = mdb_page_split(mc, key, rdata, P_INVALID, nflags);
} else {
// There is room already in this leaf page.
rc = mdb_node_add(mc, mc->mc_ki[mc->mc_top], key, rdata, 0, nflags);
if (rc == 0 && !do_sub && insert) {
// Adjust other cursors pointing to mp
MDB_cursor *m2, *m3;
MDB_dbi dbi = mc->mc_dbi;
unsigned i = mc->mc_top;
MDB_page *mp = mc->mc_pg[i];
for (m2 = mc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) {
if (mc->mc_flags & C_SUB)
m3 = &m2->mc_xcursor->mx_cursor;
else
m3 = m2;
if (m3 == mc || m3->mc_snum < mc->mc_snum) continue;
if (m3->mc_pg[i] == mp && m3->mc_ki[i] >= mc->mc_ki[i]) {
m3->mc_ki[i]++;
}
}
}
}
if (rc != MDB_SUCCESS)
mc->mc_txn->mt_flags |= MDB_TXN_ERROR;
else {
// Now store the actual data in the child DB. Note that we're
// storing the user data in the keys field, so there are strict
// size limits on dupdata. The actual data fields of the child
// DB are all zero size.
if (do_sub) {
int xflags;
put_sub:
xdata.mv_size = 0;
xdata.mv_data = "";
more:
leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
if (flags & MDB_CURRENT) {
xflags = MDB_CURRENT|MDB_NOSPILL;
} else {
mdb_xcursor_init1(mc, leaf);
xflags = (flags & MDB_NODUPDATA) ?
MDB_NOOVERWRITE|MDB_NOSPILL : MDB_NOSPILL;
}
// converted, write the original data first
if (dkey.mv_size) {
rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, &dkey, &xdata, xflags);
if (rc)
return rc;
{
// Adjust other cursors pointing to mp
MDB_cursor *m2;
unsigned i = mc->mc_top;
MDB_page *mp = mc->mc_pg[i];
olddata.mv_size = NODEDSZ(leaf);
olddata.mv_data = NODEDATA(leaf);
for (m2 = mc->mc_txn->mt_cursors[mc->mc_dbi]; m2; m2=m2->mc_next) {
if (m2 == mc || m2->mc_snum < mc->mc_snum) continue;
if (!(m2->mc_flags & C_INITIALIZED)) continue;
if (m2->mc_pg[i] == mp && m2->mc_ki[i] == mc->mc_ki[i]) {
mdb_xcursor_init1(m2, leaf);
// DB has dups?
if (F_ISSET(mc->mc_db->md_flags, MDB_DUPSORT)) {
// Prepare (sub-)page/sub-DB to accept the new item,
// if needed. fp: old sub-page or a header faking
// it. mp: new (sub-)page. offset: growth in page
// size. xdata: node data with new page or DB.
ssize_t i, offset = 0;
mp = fp = xdata.mv_data = env->me_pbuf;
mp->mp_pgno = mc->mc_pg[mc->mc_top]->mp_pgno;
// Was a single item before, must convert now
if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) {
// Just overwrite the current item
if (flags == MDB_CURRENT)
goto current;
#if UINT_MAX < SIZE_MAX
if (mc->mc_dbx->md_dcmp == mdb_cmp_int && olddata.mv_size == sizeof(size_t))
#ifdef MISALIGNED_OK
mc->mc_dbx->md_dcmp = mdb_cmp_long;
#else
mc->mc_dbx->md_dcmp = mdb_cmp_cint;
#endif
#endif
// if data matches, skip it
if (!mc->mc_dbx->md_dcmp(data, &olddata)) {
if (flags & MDB_NODUPDATA)
rc = MDB_KEYEXIST;
else if (flags & MDB_MULTIPLE)
goto next_mult;
else
rc = MDB_SUCCESS;
return rc;
}
// Back up original data item
dkey.mv_size = olddata.mv_size;
dkey.mv_data = memcpy(fp+1, olddata.mv_data, olddata.mv_size);
// Make sub-page header for the dup items, with dummy body
fp->mp_flags = P_LEAF|P_DIRTY|P_SUBP;
fp->mp_lower = PAGEHDRSZ;
xdata.mv_size = PAGEHDRSZ + dkey.mv_size + data->mv_size;
if (mc->mc_db->md_flags & MDB_DUPFIXED) {
fp->mp_flags |= P_LEAF2;
fp->mp_pad = data->mv_size;
xdata.mv_size += 2 * data->mv_size; // leave space for 2 more
} else {
xdata.mv_size += 2 * (sizeof(indx_t) + NODESIZE) +
(dkey.mv_size & 1) + (data->mv_size & 1);
}
fp->mp_upper = xdata.mv_size;
olddata.mv_size = fp->mp_upper; // pretend olddata is fp
} else if (leaf->mn_flags & F_SUBDATA) {
// Data is on sub-DB, just store it
flags |= F_DUPDATA|F_SUBDATA;
goto put_sub;
} else {
// Data is on sub-page
fp = olddata.mv_data;
switch (flags) {
default:
i = -(ssize_t)SIZELEFT(fp);
if (!(mc->mc_db->md_flags & MDB_DUPFIXED)) {
offset = i += (ssize_t) EVEN(
sizeof(indx_t) + NODESIZE + data->mv_size);
} else {
i += offset = fp->mp_pad;
offset *= 4; // space for 4 more
}
if (i > 0)
break;
// FALLTHRU: Sub-page is big enough
case MDB_CURRENT:
fp->mp_flags |= P_DIRTY;
COPY_PGNO(fp->mp_pgno, mp->mp_pgno);
mc->mc_xcursor->mx_cursor.mc_pg[0] = fp;
flags |= F_DUPDATA;
goto put_sub;
}
xdata.mv_size = olddata.mv_size + offset;
}
fp_flags = fp->mp_flags;
if (NODESIZE + NODEKSZ(leaf) + xdata.mv_size > env->me_nodemax) {
// Too big for a sub-page, convert to sub-DB
fp_flags &= ~P_SUBP;
prep_subDB:
dummy.md_pad = 0;
dummy.md_flags = 0;
dummy.md_depth = 1;
dummy.md_branch_pages = 0;
dummy.md_leaf_pages = 1;
dummy.md_overflow_pages = 0;
dummy.md_entries = NUMKEYS(fp);
xdata.mv_size = sizeof(MDB_db);
xdata.mv_data = &dummy;
if ((rc = mdb_page_alloc(mc, 1, &mp)))
return rc;
offset = env->me_psize - olddata.mv_size;
flags |= F_DUPDATA|F_SUBDATA;
dummy.md_root = mp->mp_pgno;
}
if (mp != fp) {
mp->mp_flags = fp_flags | P_DIRTY;
mp->mp_pad = fp->mp_pad;
mp->mp_lower = fp->mp_lower;
mp->mp_upper = fp->mp_upper + offset;
if (fp_flags & P_LEAF2) {
memcpy(METADATA(mp), METADATA(fp), NUMKEYS(fp) * fp->mp_pad);
} else {
memcpy((char *)mp + mp->mp_upper, (char *)fp + fp->mp_upper,
olddata.mv_size - fp->mp_upper);
for (i = NUMKEYS(fp); --i >= 0; )
mp->mp_ptrs[i] = fp->mp_ptrs[i] + offset;
}
}
rdata = &xdata;
flags |= F_DUPDATA;
do_sub = 1;
if (!insert)
mdb_node_del(mc, 0);
goto new_sub;
}
current:
// overflow page overwrites need special handling
if (F_ISSET(leaf->mn_flags, F_BIGDATA)) {
MDB_page *omp;
pgno_t pg;
int level, ovpages, dpages = OVPAGES(data->mv_size, env->me_psize);
memcpy(&pg, olddata.mv_data, sizeof(pg));
if ((rc2 = mdb_page_get(mc->mc_txn, pg, &omp, &level)) != 0)
return rc2;
ovpages = omp->mp_pages;
// Is the ov page large enough?
if (ovpages >= dpages) {
if (!(omp->mp_flags & P_DIRTY) &&
(level || (env->me_flags & MDB_WRITEMAP)))
{
rc = mdb_page_unspill(mc->mc_txn, omp, &omp);
if (rc)
return rc;
level = 0; // dirty in this txn or clean
}
// Is it dirty?
if (omp->mp_flags & P_DIRTY) {
// yes, overwrite it. Note in this case we don't
// bother to try shrinking the page if the new data
// is smaller than the overflow threshold.
if (level > 1) {
// It is writable only in a parent txn
size_t sz = (size_t) env->me_psize * ovpages, off;
MDB_page *np = mdb_page_malloc(mc->mc_txn, ovpages);
MDB_ID2 id2;
if (!np)
return ENOMEM;
id2.mid = pg;
id2.mptr = np;
rc = mdb_mid2l_insert(mc->mc_txn->mt_u.dirty_list, &id2);
mdb_cassert(mc, rc == 0);
if (!(flags & MDB_RESERVE)) {
// Copy end of page, adjusting alignment so
// compiler may copy words instead of bytes.
off = (PAGEHDRSZ + data->mv_size) & -sizeof(size_t);
memcpy((size_t *)((char *)np + off),
(size_t *)((char *)omp + off), sz - off);
sz = PAGEHDRSZ;
}
memcpy(np, omp, sz); // Copy beginning of page
omp = np;
}
SETDSZ(leaf, data->mv_size);
if (F_ISSET(flags, MDB_RESERVE))
data->mv_data = METADATA(omp);
else
memcpy(METADATA(omp), data->mv_data, data->mv_size);
goto done;
}
}
if ((rc2 = mdb_ovpage_free(mc, omp)) != MDB_SUCCESS)
return rc2;
} else if (data->mv_size == olddata.mv_size) {
// same size, just replace it. Note that we could
// also reuse this node if the new data is smaller,
// but instead we opt to shrink the node in that case.
if (F_ISSET(flags, MDB_RESERVE))
data->mv_data = olddata.mv_data;
else if (data->mv_size)
memcpy(olddata.mv_data, data->mv_data, data->mv_size);
else
memcpy(NODEKEY(leaf), key->mv_data, key->mv_size);
goto done;
}
mdb_node_del(mc, 0);
mc->mc_db->md_entries--;
}
rdata = data;
new_sub:
nflags = flags & NODE_ADD_FLAGS;
nsize = IS_LEAF2(mc->mc_pg[mc->mc_top]) ? key->mv_size : mdb_leaf_size(env, key, rdata);
if (SIZELEFT(mc->mc_pg[mc->mc_top]) < nsize) {
if (( flags & (F_DUPDATA|F_SUBDATA)) == F_DUPDATA )
nflags &= ~MDB_APPEND;
if (!insert)
nflags |= MDB_SPLIT_REPLACE;
rc = mdb_page_split(mc, key, rdata, P_INVALID, nflags);
} else {
// There is room already in this leaf page.
rc = mdb_node_add(mc, mc->mc_ki[mc->mc_top], key, rdata, 0, nflags);
if (rc == 0 && !do_sub && insert) {
// Adjust other cursors pointing to mp
MDB_cursor *m2, *m3;
MDB_dbi dbi = mc->mc_dbi;
unsigned i = mc->mc_top;
MDB_page *mp = mc->mc_pg[i];
for (m2 = mc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) {
if (mc->mc_flags & C_SUB)
m3 = &m2->mc_xcursor->mx_cursor;
else
m3 = m2;
if (m3 == mc || m3->mc_snum < mc->mc_snum) continue;
if (m3->mc_pg[i] == mp && m3->mc_ki[i] >= mc->mc_ki[i]) {
m3->mc_ki[i]++;
}
}
}
}
if (rc != MDB_SUCCESS)
mc->mc_txn->mt_flags |= MDB_TXN_ERROR;
else {
// Now store the actual data in the child DB. Note that we're
// storing the user data in the keys field, so there are strict
// size limits on dupdata. The actual data fields of the child
// DB are all zero size.
if (do_sub) {
int xflags;
put_sub:
xdata.mv_size = 0;
xdata.mv_data = "";
leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
if (flags & MDB_CURRENT) {
xflags = MDB_CURRENT|MDB_NOSPILL;
} else {
mdb_xcursor_init1(mc, leaf);
xflags = (flags & MDB_NODUPDATA) ?
MDB_NOOVERWRITE|MDB_NOSPILL : MDB_NOSPILL;
}
// converted, write the original data first
if (dkey.mv_size) {
rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, &dkey, &xdata, xflags);
if (rc)
return rc;
{
// Adjust other cursors pointing to mp
MDB_cursor *m2;
unsigned i = mc->mc_top;
MDB_page *mp = mc->mc_pg[i];
for (m2 = mc->mc_txn->mt_cursors[mc->mc_dbi]; m2; m2=m2->mc_next) {
if (m2 == mc || m2->mc_snum < mc->mc_snum) continue;
if (!(m2->mc_flags & C_INITIALIZED)) continue;
if (m2->mc_pg[i] == mp && m2->mc_ki[i] == mc->mc_ki[i]) {
mdb_xcursor_init1(m2, leaf);
}
}
}
// we've done our job
dkey.mv_size = 0;
}
if (flags & MDB_APPENDDUP)
xflags |= MDB_APPEND;
rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, data, &xdata, xflags);
if (flags & F_SUBDATA) {
void *db = NODEDATA(leaf);
memcpy(db, &mc->mc_xcursor->mx_db, sizeof(MDB_db));
}
// we've done our job
dkey.mv_size = 0;
}
if (flags & MDB_APPENDDUP)
xflags |= MDB_APPEND;
rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, data, &xdata, xflags);
if (flags & F_SUBDATA) {
void *db = NODEDATA(leaf);
memcpy(db, &mc->mc_xcursor->mx_db, sizeof(MDB_db));
}
}
// sub-writes might have failed so check rc again.
// Don't increment count if we just replaced an existing item.
if (!rc && !(flags & MDB_CURRENT))
mc->mc_db->md_entries++;
if (flags & MDB_MULTIPLE) {
if (!rc) {
next_mult:
mcount++;
// let caller know how many succeeded, if any
data[1].mv_size = mcount;
if (mcount < dcount) {
data[0].mv_data = (char *)data[0].mv_data + data[0].mv_size;
goto more;
// sub-writes might have failed so check rc again.
// Don't increment count if we just replaced an existing item.
if (!rc && !(flags & MDB_CURRENT))
mc->mc_db->md_entries++;
if (flags & MDB_MULTIPLE) {
if (!rc) {
next_mult:
mcount++;
// let caller know how many succeeded, if any
data[1].mv_size = mcount;
if (mcount < dcount) {
data[0].mv_data = (char *)data[0].mv_data + data[0].mv_size;
goto more;
}
}
}
}
}
done:
// If we succeeded and the key didn't exist before, make sure
// the cursor is marked valid.
if (!rc && insert)
mc->mc_flags |= C_INITIALIZED;
return rc;
done:
// If we succeeded and the key didn't exist before, make sure
// the cursor is marked valid.
if (!rc && insert)
mc->mc_flags |= C_INITIALIZED;
return rc;
*/
return nil
}


@@ -4,4 +4,126 @@ package bolt
// Only one read/write transaction can be active for a DB at a time.
type RWTransaction struct {
Transaction
dirtyPages map[int]*page
freePages map[int]*page
}
// TODO: Allocate scratch meta page.
// TODO: Allocate scratch data pages.
// TODO: Track dirty pages (?)
func (t *RWTransaction) Commit() error {
// TODO: Update non-system bucket pointers.
// TODO: Save freelist.
// TODO: Flush data.
// TODO: Initialize new meta object, Update system bucket nodes, last pgno, txnid.
// meta.mm_dbs[0] = txn->mt_dbs[0];
// meta.mm_dbs[1] = txn->mt_dbs[1];
// meta.mm_last_pg = txn->mt_next_pgno - 1;
// meta.mm_txnid = txn->mt_txnid;
// TODO: Pick sync or async file descriptor.
// TODO: Write meta page to file.
// TODO(?): Write checksum at the end.
return nil
}
func (t *RWTransaction) Rollback() error {
return t.close()
}
func (t *RWTransaction) close() error {
// TODO: Free scratch pages.
// TODO: Release writer lock.
return nil
}
// CreateBucket creates a new bucket.
func (t *RWTransaction) CreateBucket(name string, dupsort bool) (*Bucket, error) {
if t.db == nil {
return nil, InvalidTransactionError
}
// Check if bucket already exists.
if b := t.buckets[name]; b != nil {
return nil, &Error{"bucket already exists", nil}
}
// Create a new bucket entry.
var buf [unsafe.Sizeof(bucket{})]byte
var raw = (*bucket)(unsafe.Pointer(&buf[0]))
raw.root = p_invalid
// TODO: Set dupsort flag.
// Open cursor to system bucket.
c, err := t.Cursor(&t.sysbuckets)
if err != nil {
return nil, err
}
// Put new entry into system bucket.
if err := c.Put([]byte(name), buf[:]); err != nil {
return nil, err
}
// Save reference to bucket.
b := &Bucket{name: name, bucket: raw, isNew: true}
t.buckets[name] = b
// TODO: dbflag |= DB_DIRTY;
return b, nil
}
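A brief usage sketch of CreateBucket inside a read-write transaction; the bucket name is illustrative.

// Hypothetical usage: create a bucket and commit the transaction.
func createWidgets(db *DB) error {
	rw, err := db.RWTransaction()
	if err != nil {
		return err
	}
	if _, err := rw.CreateBucket("widgets", false); err != nil {
		return err
	}
	return rw.Commit()
}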
// DeleteBucket deletes a bucket.
func (t *RWTransaction) DeleteBucket(b *Bucket) error {
// TODO: Remove entry from system bucket.
// TODO: Free all pages.
// TODO: Remove cursor.
return nil
}
// Put sets the value for a key in a given bucket.
func (t *RWTransaction) Put(name string, key []byte, value []byte) error {
c, err := t.Cursor(name)
if err != nil {
return err
}
return c.Put(key, value)
}
// page returns a reference to the page with a given id.
// If the page has been written to then a temporary buffered page is returned.
func (t *RWTransaction) page(id int) *page {
// Check the dirty pages first.
if p, ok := t.dirtyPages[id]; ok {
return p
}
// Otherwise return directly from the mmap.
return t.Transaction.page(id)
}
// Flush (some) dirty pages to the map, after clearing their dirty flag.
// @param[in] txn the transaction that's being committed
// @param[in] keep number of initial pages in dirty_list to keep dirty.
// @return 0 on success, non-zero on failure.
func (t *RWTransaction) flush(keep bool) error {
// TODO(benbjohnson): Use vectorized I/O to write out dirty pages.
// TODO: Loop over each dirty page and write it to disk.
return nil
}

File diff suppressed because it is too large.