// btree version 2d
#define _FILE_OFFSET_BITS 64
#define _LARGEFILE64_SOURCE
#ifdef linux
#define _GNU_SOURCE
#endif
#ifdef unix
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <errno.h>
#else
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <fcntl.h>
#endif
// integer aliases used throughout the file
// uid carries page numbers and row-ids; only its low 6 bytes are ever
// stored (see bt_putid/bt_getid)
#ifdef unix
typedef unsigned long long uid;
#else
// the windows build supplies the short type names the rest of the file uses
typedef unsigned __int64 uid;
typedef unsigned __int64 off64_t;
typedef unsigned short ushort;
typedef unsigned int uint;
#endif
// open modes accepted by bt_open (two ASCII letters packed into an integer)
#define BT_ro 0x6f72 // ro - open an existing index read-only
#define BT_rw 0x7772 // rw - open or create the index for reading and writing
#define BT_fl 0x6c66 // fl - like rw; the windows build additionally requests write-through, unbuffered file access
// page size limits and page-cache dimensions
#define BT_maxpage 32768 // maximum page size
#define BT_minpage 512 // minimum page size
#define BT_hashsize 512 // size of hash index for page cache
#define BT_hashmax 1024 // size of page cache
#define BT_hashprime 8191 // prime number for hashing
// lock type requested from bt_lockpage/bt_unlockpage
typedef enum{
BtLockRead = 0, // shared lock
BtLockWrite // exclusive lock
}BtLock;
// one entry of a page's slot array: locates the key text inside the page
// and carries the 6 byte id stored with the key
typedef struct {
unsigned char id[6]; // id associated with key (row-id on leaf pages, child page number on higher levels)
ushort off:15; // page offset for key start
ushort dead:1; // set for deleted key
uint tod; // time-stamp for key
} BtSlot;
// key text as stored inside a page: a length byte followed by that many
// key bytes (key[] is declared with one element but is accessed for len bytes)
typedef struct {
unsigned char len; // number of bytes in key[]
unsigned char key[1]; // first byte of the key text
} *BtKey;
// page header found at the start of every page, followed by the slot array.
// keys[0] is not a real key: keys[0].off holds the lowest used key-text
// offset and keys[0].id the page's own number; keys[1]..keys[cnt] are the
// real slots and extend past the declared array bound into the page.
typedef struct {
ushort cnt; // count of keys in page
unsigned char lvl:4; // level of page
unsigned char bits:4; // page size in bits
unsigned char filler; // unused
unsigned char right[6]; // page number to right
BtSlot keys[2]; // nxt avail/array of key offsets in page
} *BtPage;
// page-cache entry: one memory-mapped block of (hashmask+1) consecutive pages,
// linked into both an lru list and a per-bucket hash chain
typedef struct {
BtPage page; // mapped page pointer
uid page_no; // mapped page number (first page of the mapped block)
void *lruprev; // least recently used previous cache block
void *lrunext; // lru next cache block
void *hashprev; // previous cache block for the same hash idx
void *hashnext; // next cache block for the same hash idx
#ifndef unix
HANDLE hmap; // file-mapping object backing this view (windows only)
#endif
}BtHash;
// state of one open b-tree index; created by bt_open, released by bt_close
typedef struct _BtDb {
int page_bits; // each page size
uid page_no; // current page number
int err; // last error code (a BTERR value), 0 when none
#ifdef unix
int idx; // file descriptor of the index file
#else
HANDLE idx; // file handle of the index file
HANDLE hmap;
#endif
unsigned char *mem; // frame, cursor, page memory buffer
ushort cache[BT_hashsize]; // hash index for cache (entry number in nodes[], 0 = empty bucket)
BtHash nodes[BT_hashmax+1]; // page cache (entry 0 is never handed out)
BtHash *lrufirst; // lru list head
BtHash *lrulast; // lru list tail
int nodecnt; // highest number of cache block in use
int hashmask; // number of pages in cache block - 1 (0 when memory mapping is not used)
uint mode; // read-write mode
BtPage alloc; // frame buffer for alloc page ( page 0 )
BtPage cursor; // cached frame for start/next
BtPage frame; // spare frame for the page split
BtPage page; // current page
} BtDb;
// error codes stored in BtDb.err and returned by the BTERR functions
typedef enum {
BTERR_ok = 0, // no error
BTERR_struct, // tree structure problem (unexpected page level or end of right chain)
BTERR_ovflw, // set by bt_insertkey when no page could be loaded and no other error was recorded
BTERR_lock, // page lock or unlock failed
BTERR_map, // page could not be read or memory mapped
BTERR_wrt, // page write failed
BTERR_hash // page-cache lru list is inconsistent
} BTERR;
// B-Tree functions
// release mappings, buffers and the file, then free bt
extern void bt_close (BtDb *bt);
// open or create an index; returns NULL on failure
extern BtDb *bt_open (char *name, uint mode, uint bits, uint cacheblk);
// insert key at the given level, or update id/tod when it already exists
extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod);
// mark a leaf key as deleted
extern BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len);
// look up key on its leaf page; returns its slot in the cursor page or 0
extern uint bt_findkey (BtDb *bt, unsigned char *key, uint len);
// load the leaf page for key into the cursor; returns the starting slot or 0
extern uint bt_startkey (BtDb *bt, unsigned char *key, uint len);
// advance the cursor; returns the next live slot or 0 at the end
extern uint bt_nextkey (BtDb *bt, uint slot);
// accessors for the cursor page slot returned by the functions above
extern BtKey bt_key (BtDb *bt, uint slot);
extern uid bt_uid (BtDb *bt, uint slot);
extern uint bt_tod (BtDb *bt, uint slot);
// fixed page numbers and the number of levels in a freshly created tree
#define ALLOC_page 0 // allocation page: its right field holds the next free page number
#define ROOT_page 1 // the root of the tree is always page 1
#define MIN_lvl 2 // a new index is written with this many levels (root + one leaf page)
// The page is allocated from low and hi ends.
// The key offsets and row-id's are allocated
// from the bottom, while the text of the key
// is allocated from the top. When the two
// areas meet, the page is split into two.
// A key consists of a length byte, two bytes of
// index number (0 - 65534), and up to 253 bytes
// of key value. Duplicate keys are discarded.
// Associated with each key is a 48 bit row-id.
// The b-tree root is always located at page 1.
// The first leaf page of level zero is always
// located on page 2.
// When the root page fills, it is split in two and
// the tree height is raised by a new root at page
// one with two keys.
// Deleted leaf keys are marked with a dead bit, but
// are not removed until the page is cleaned.
// Pages are never deleted.
// Under unix, groups of pages from the btree are optionally
// cached with memory mapping. A hash table is used to keep
// track of the cached pages. This behaviour is controlled
// by the cache block size parameter to bt_open.
// To achieve maximum concurrency one page is locked at a time
// as the tree is traversed to find leaf key in question. The right
// page numbers are used in cases where the page is being split.
// Read locks are acquired only under Memory Mapping.
// Page 0 is dedicated to lock for new page extensions.
// keyptr(slot): address of the key text for the given slot of the current
// page; expects a variable named bt (BtDb *) to be in scope at the call site
#define keyptr(slot) ((BtKey)((char*)bt->page + bt->page->keys[slot].off))
// store the low 48 bits of id into dest[0..5], most significant byte first
void bt_putid(unsigned char *dest, uid id)
{
	int pos;

	// fill from the last byte backwards so dest[5] receives the low byte
	for( pos = 5; pos >= 0; pos-- ) {
		dest[pos] = (unsigned char)id;
		id >>= 8;
	}
}
// assemble the 6 bytes at src (most significant byte first) into a uid
uid bt_getid(unsigned char *src)
{
	unsigned char *end = src + 6;
	uid value = 0;

	while( src < end )
		value = (value << 8) | *src++;

	return value;
}
// place a read or write lock on the page header bytes of page_no
//
// bt       open index
// page_no  page whose first sizeof(*bt->page) bytes are locked
// mode     BtLockRead (shared) or BtLockWrite (exclusive)
//
// returns 0 on success, otherwise BTERR_lock (also stored in bt->err).
// On builds other than linux, read locks are skipped when the index is
// not memory mapped (bt->hashmask == 0).
BTERR bt_lockpage(BtDb *bt, uid page_no, BtLock mode)
{
	off64_t off = page_no << bt->page_bits;
#ifdef unix
	struct flock lock[1];
#else
	uint flags = LOCKFILE_FAIL_IMMEDIATELY;
	OVERLAPPED ovl[1];
	uint cnt = 5000;
#endif
#ifndef linux
	if ( !bt->hashmask && mode != BtLockWrite )
		return 0;
#endif
#ifdef unix
	memset (lock, 0, sizeof(lock));
	lock->l_start = off;
	lock->l_type = mode == BtLockWrite ? F_WRLCK : F_RDLCK;
	lock->l_len = sizeof(*bt->page);
	lock->l_whence = SEEK_SET;

	// F_SETLKW blocks until the lock is granted; a signal handler running
	// in the meantime makes it fail with EINTR, which is not a lock
	// failure, so simply wait again
	while( fcntl (bt->idx, F_SETLKW, lock) < 0 )
		if( errno != EINTR )
			return bt->err = BTERR_lock;

	return 0;
#else
	memset (ovl, 0, sizeof(ovl));
	ovl->OffsetHigh = (uint)(off >> 32);
	ovl->Offset = (uint)off;

	if( mode == BtLockWrite )
		flags |= LOCKFILE_EXCLUSIVE_LOCK;

	// the lock request never blocks; poll for it, sleeping 1ms between tries
	do if( LockFileEx (bt->idx, flags, 0, sizeof(*bt->page), 0L, ovl) )
		return bt->err = 0;
	else
		Sleep (1);
	while( cnt-- );

	return bt->err = BTERR_lock;
#endif
}
// release the lock held on the page header bytes of page_no
// returns 0 and clears bt->err on success, otherwise BTERR_lock
// (read locks are never taken on non-linux builds without memory
// mapping, so there is nothing to release in that case)
BTERR bt_unlockpage(BtDb *bt, uid page_no, BtLock mode)
{
#ifdef unix
	struct flock region[1];
#else
	OVERLAPPED region[1];
#endif
	off64_t start = page_no << bt->page_bits;

#ifndef linux
	if( mode != BtLockWrite && bt->hashmask == 0 )
		return 0;
#endif

	memset (region, 0, sizeof(region));

#ifdef unix
	region->l_type = F_UNLCK;
	region->l_whence = 0;
	region->l_start = start;
	region->l_len = sizeof(*bt->page);

	if( fcntl (bt->idx, F_SETLK, region) < 0 ) {
		bt->err = BTERR_lock;
		return BTERR_lock;
	}
#else
	region->Offset = (uint)start;
	region->OffsetHigh = (uint)(start >> 32);

	if( !UnlockFileEx (bt->idx, 0, sizeof(*bt->page), 0, region) ) {
		GetLastError();
		bt->err = BTERR_lock;
		return BTERR_lock;
	}
#endif

	bt->err = 0;
	return 0;
}
// close and release memory
// unmaps every cached block, frees the page buffers, closes the index
// file and frees bt itself.  A NULL bt is ignored, so the result of a
// failed bt_open can be passed safely.
void bt_close (BtDb *bt)
{
	BtHash *hash;

	if( !bt )
		return;

#ifdef unix
	// release mapped pages, skipping entries whose mapping never succeeded
	for( hash = bt->lrufirst; hash; hash = hash->lrunext )
		if( hash->page && (void *)hash->page != MAP_FAILED )
			munmap (hash->page, (bt->hashmask+1) << bt->page_bits);

	free (bt->mem);
	close (bt->idx);
	free (bt);
#else
	// release mapped views and their mapping objects
	for( hash = bt->lrufirst; hash; hash = hash->lrunext ) {
		if( hash->page )
			UnmapViewOfFile(hash->page);
		if( hash->hmap )
			CloseHandle(hash->hmap);
	}

	if( bt->mem )
		VirtualFree (bt->mem, 0, MEM_RELEASE);

	CloseHandle(bt->idx);
	GlobalFree (bt);
#endif
}
// open/create new btree
// call with file_name, BT_openmode, bits in page size (e.g. 12),
// size of map cache-frame (e.g. 65536) or zero for no mapping.
BtDb *bt_open (char *name, uint mode, uint bits, uint cacheblk)
{
uint lvl, attr;
off64_t size;
uint amt[1];
BtKey key;
BtDb* bt;
BtLock lockmode = BtLockWrite;
#ifdef unix
int flag;
#else
SYSTEM_INFO sysinfo[1];
int cnt = 5000;
#endif
#ifdef unix
bt = malloc (sizeof(BtDb));
memset (bt, 0, sizeof(BtDb));
switch (mode & 0x7fff)
{
case BT_fl:
case BT_rw:
bt->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
break;
case BT_ro:
default:
bt->idx = open ((char*)name, O_RDONLY);
lockmode = BtLockRead;
break;
}
if( bt->idx == -1 )
return free(bt), bt=NULL;
#else
bt = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtDb));
attr = FILE_ATTRIBUTE_NORMAL;
switch (mode & 0x7fff)
{
case BT_fl:
attr |= FILE_FLAG_WRITE_THROUGH | FILE_FLAG_NO_BUFFERING;
case BT_rw:
bt->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
break;
case BT_ro:
default:
bt->idx = CreateFile(name, GENERIC_READ, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_EXISTING, attr, NULL);
lockmode = BtLockRead;
break;
}
if( bt->idx == INVALID_HANDLE_VALUE )
return GlobalFree(bt), NULL;
// normalize cacheblk to multiple of sysinfo->dwAllocationGranularity
GetSystemInfo(sysinfo);
if( cacheblk )
if( *amt = cacheblk % sysinfo->dwAllocationGranularity )
cacheblk += sysinfo->dwAllocationGranularity - *amt;
#endif
// determine sanity of page size
if( bits > 15 )
bits = 15;
else if( bits < 9 )
bits = 9;
bt->page_bits = bits;
bt->mode = mode;
if ( mode == BT_ro )
{
#ifdef unix
*amt = 0;
size = lseek (bt->idx, 0L, 2);
#else
size = GetFileSize(bt->idx, amt);
#endif
if ( !size && !*amt )
{
bt_close (bt);
return NULL;
}
}
if ( lockmode == BtLockWrite )
if ( bt_lockpage(bt, ALLOC_page, lockmode) )
{
bt_close (bt);
return NULL;
}
#ifdef unix
bt->mem = malloc (4 *BT_maxpage);
bt->frame = (BtPage)bt->mem;
bt->cursor = (BtPage)(bt->mem + BT_maxpage);
bt->page = (BtPage)(bt->mem + 2 * BT_maxpage);
bt->alloc = (BtPage)(bt->mem + 3 * BT_maxpage);
flag = PROT_READ | ( bt->mode == BT_ro ? 0 : PROT_WRITE );
if( size = lseek (bt->idx, 0L, 2) ) {
// read minimum page size to get root info
pread(bt->idx, bt->frame, BT_minpage, 0);
bt->page_bits = bt->frame->bits;
if ( cacheblk && (cacheblk < (4U << bt->page_bits)) )
cacheblk = 4U << bt->page_bits;
if( cacheblk )
bt->hashmask = (cacheblk >> bt->page_bits) - 1;
size >>= bt->page_bits;
// disallow mapping if file is not multiple of mapping
if( size & bt->hashmask )
bt->hashmask = 0;
if ( lockmode == BtLockWrite )
if ( bt_unlockpage(bt, ALLOC_page, lockmode) )
{
bt_close (bt);
return NULL;
}
return bt;
}
#else
bt->mem = VirtualAlloc(NULL, 4 * BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
bt->frame = (BtPage)bt->mem;
bt->cursor = (BtPage)(bt->mem + BT_maxpage);
bt->page = (BtPage)(bt->mem + 2 * BT_maxpage);
bt->alloc = (BtPage)(bt->mem + 3 * BT_maxpage);
size = GetFileSize(bt->idx, amt);
if( size || *amt ) {
// read minimum page size to get page 0 info
do if( ReadFile(bt->idx, (char *)bt->frame, BT_minpage, amt, NULL) )
break;
else
Sleep (1);
while( --cnt );
if( !cnt || (*amt < BT_minpage) )
{
bt_close (bt);
return NULL;
}
bt->page_bits = bt->frame->bits;
if ( cacheblk && (cacheblk < (4U << bt->page_bits)) )
cacheblk = 4U << bt->page_bits;
if( cacheblk )
bt->hashmask = (cacheblk >> bt->page_bits) - 1;
size >>= bt->page_bits;
if( size & bt->hashmask )
bt->hashmask = 0;
if ( lockmode == BtLockWrite )
if ( bt_unlockpage(bt, ALLOC_page, lockmode) )
{
bt_close (bt);
return NULL;
}
return bt;
}
#endif
// set page-cache mapping size in pages
if ( cacheblk && (cacheblk < (4U << bt->page_bits)) )
cacheblk = 4U << bt->page_bits;
if( cacheblk )
bt->hashmask = (cacheblk >> bt->page_bits) - 1;
// initializes an empty b-tree with root page and page of leaves
memset (bt->alloc, 0, (1 << bt->page_bits));
bt->alloc->bits = bt->page_bits;
bt_putid(bt->alloc->right, MIN_lvl+1);
#ifdef unix
if( write (bt->idx, bt->alloc, 1U << bt->page_bits) < (1U << bt->page_bits) )
bt->err = BTERR_wrt;
#else
WriteFile (bt->idx, (char *)bt->alloc, (1 << bt->page_bits), amt, NULL);
if( *amt < (1U << bt->page_bits) )
bt->err = BTERR_wrt;
#endif
memset (bt->frame, 0, (1 << bt->page_bits));
bt->frame->bits = bt->page_bits;
for( lvl=MIN_lvl; lvl--; ) {
bt_putid(bt->frame->keys[0].id, MIN_lvl - lvl); // current page number
bt->frame->keys[0].off = (1 << bt->page_bits) - 3;
bt->frame->keys[1].off = (1 << bt->page_bits) - 3;
bt_putid(bt->frame->keys[1].id, lvl ? MIN_lvl - lvl + 1 : 0); // next(lower) page number
bt->frame->lvl = lvl;
bt->frame->cnt = 1;
key = (BtKey)((unsigned char *)bt->frame + (1 << bt->page_bits) - 3);
key->len = 2; // create stopper key
key->key[0] = 0xff;
key->key[1] = 0xff;
#ifdef unix
if( write (bt->idx, bt->frame, 1 << bt->page_bits) < (1 << bt->page_bits) )
bt->err = BTERR_wrt;
#else
WriteFile (bt->idx, (char *)bt->frame, (1 << bt->page_bits), amt, NULL);
if( *amt < (1U << bt->page_bits) )
bt->err = BTERR_wrt;
#endif
}
// create empty page area by writing last page of first
// cache area (other pages are zeroed by O/S)
if( bt->hashmask ) {
memset(bt->frame, 0, 1 << bt->page_bits);
#ifdef unix
pwrite(bt->idx, bt->frame, 1 << bt->page_bits, bt->hashmask << bt->page_bits);
#else
SetFilePointer (bt->idx, bt->hashmask << bt->page_bits, NULL, FILE_BEGIN);
WriteFile (bt->idx, (char *)bt->frame, (1 << bt->page_bits), amt, NULL);
if( *amt < (1U << bt->page_bits) )
bt->err = BTERR_wrt;
#endif
}
bt_unlockpage(bt, ALLOC_page, lockmode);
if( bt->err )
return bt_close (bt), bt=NULL;
return bt;
}
// compare a stored key with a raw key of len2 bytes
// the common prefix decides first; when it is equal the shorter key
// sorts lower.  returns > 0, = 0, or < 0
int keycmp (BtKey key1, unsigned char *key2, uint len2)
{
	uint len1 = key1->len;
	uint common = len1 < len2 ? len1 : len2;
	int diff = memcmp (key1->key, key2, common);

	if( diff != 0 )
		return diff;

	if( len1 == len2 )
		return 0;

	return len1 > len2 ? 1 : -1;
}
// write a modified page back to the index file at page_no
// when the index is memory mapped (bt->hashmask != 0) the page was
// changed in place inside the mapping, so nothing is written here.
// returns 0 on success, otherwise BTERR_wrt (also stored in bt->err)
BTERR bt_update (BtDb *bt, BtPage page, uid page_no)
{
	off64_t off = page_no << bt->page_bits;
#ifndef unix
	uint amt[1];
#endif

	if( bt->hashmask )
		return 0;

#ifdef unix
	if( pwrite(bt->idx, page, 1 << bt->page_bits, off) == (1 << bt->page_bits) )
		return 0;

	return bt->err = BTERR_wrt;
#else
	SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
	WriteFile (bt->idx, (char *)page, (1 << bt->page_bits), amt, NULL);

	if( *amt >= (1U << bt->page_bits) )
		return 0;

	return GetLastError(), bt->err = BTERR_wrt;
#endif
}
// look up the cache entry whose mapped block contains page_no
// returns the entry, or NULL when the block is not cached
BtHash *bt_findhash(BtDb *bt, uid page_no)
{
	// first page of the cache block and the bucket it hashes to
	uid base = page_no & ~bt->hashmask;
	uint bucket = (uint)(base * BT_hashprime % BT_hashsize);
	BtHash *entry;

	if( !bt->cache[bucket] )
		return NULL;

	// walk the bucket's chain
	for( entry = bt->nodes + bt->cache[bucket]; entry; entry = entry->hashnext )
		if( entry->page_no == base )
			return entry;

	return NULL;
}
// insert node at the head of the hash chain for page_no's cache block
// node->hashnext is only written when the bucket already has an entry
void bt_linkhash(BtDb *bt, BtHash *node, uid page_no)
{
	uid base = page_no & ~bt->hashmask;
	uint bucket = (uint)(base * BT_hashprime % BT_hashsize);
	ushort head = bt->cache[bucket];

	if( head ) {
		BtHash *first = bt->nodes + head;

		node->hashnext = first;
		first->hashprev = node;
	}

	node->hashprev = NULL;
	bt->cache[bucket] = (ushort)(node - bt->nodes);
}
// detach node from the hash chain of its cache block
void bt_unlinkhash(BtDb *bt, BtHash *node)
{
	uint bucket = (uint)((node->page_no & ~bt->hashmask) * BT_hashprime % BT_hashsize);
	BtHash *before = node->hashprev;
	BtHash *after = node->hashnext;

	if( before )
		before->hashnext = after;
	else if( after )
		// node was the chain head: its successor takes over the bucket
		bt->cache[bucket] = (ushort)(after - bt->nodes);
	else
		bt->cache[bucket] = 0;

	if( after )
		after->hashprev = before;
}
// map the cache block containing page_no and add it to the page cache
//
// bt       open index (memory mapping enabled)
// hash     cache entry to (re)use; it is cleared first
// page_no  page whose enclosing block of (hashmask+1) pages is mapped
//
// returns the address of page_no inside the new mapping, or NULL with
// bt->err = BTERR_map when mapping fails.  The entry is entered into
// the hash index and the lru list only after the mapping succeeded, so a
// failed mapping never leaves an unusable entry in the cache (the entry
// is then left cleared and unlinked).
BtPage bt_linklru(BtDb *bt, BtHash *hash, uid page_no)
{
	uid base = page_no & ~bt->hashmask;
	off64_t off = base << bt->page_bits;
	size_t maplen = (size_t)(bt->hashmask + 1) << bt->page_bits;
	BtHash *node;
#ifdef unix
	void *map;
	int flag;
#else
	off64_t limit = off + maplen;
	int flag;
#endif

	memset(hash, 0, sizeof(BtHash));
	hash->page_no = base;

#ifdef unix
	flag = PROT_READ | ( bt->mode == BT_ro ? 0 : PROT_WRITE );
	map = mmap (0, maplen, flag, MAP_SHARED, bt->idx, off);

	if( map == MAP_FAILED )
		return bt->err = BTERR_map, (BtPage)NULL;

	hash->page = (BtPage)map;
#else
	// the mapping object must reach to the end of the block
	flag = ( bt->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
	hash->hmap = CreateFileMapping(bt->idx, NULL, flag, (DWORD)(limit >> 32), (DWORD)limit, NULL);

	if( !hash->hmap )
		return bt->err = BTERR_map, NULL;

	flag = ( bt->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
	hash->page = MapViewOfFile(hash->hmap, flag, (DWORD)(off >> 32), (DWORD)off, maplen);

	if( !hash->page ) {
		// do not leak the mapping object when the view cannot be created
		CloseHandle(hash->hmap);
		hash->hmap = NULL;
		return bt->err = BTERR_map, NULL;
	}
#endif

	// the block is mapped: enter it in the hash index
	bt_linkhash(bt, hash, page_no);

	// and make it the most recently used entry
	if( (node = hash->lrunext = bt->lrufirst) )
		node->lruprev = hash;
	else
		bt->lrulast = hash;

	bt->lrufirst = hash;

	return (BtPage)((char*)hash->page + ((uint)(page_no & bt->hashmask) << bt->page_bits));
}
// return the memory address of page_no inside the page cache, mapping
// its cache block first when necessary.
// a cached block is moved to the front of the lru list; when the cache
// is full the least recently used block is unmapped and its entry reused.
// returns NULL with bt->err set on failure
BtPage bt_hashpage(BtDb *bt, uid page_no)
{
	// byte offset of the page within its cache block
	uint within = (uint)(page_no & bt->hashmask) << bt->page_bits;
	BtHash *hit = bt_findhash(bt, page_no);
	BtHash *prev, *next, *head, *victim;

	if( hit ) {
		prev = hit->lruprev;

		// already first in the lru list when there is no predecessor
		if( prev ) {
			// take the entry out of its current position
			next = hit->lrunext;
			prev->lrunext = next;

			if( next )
				next->lruprev = prev;
			else
				bt->lrulast = prev;

			// and put it in front of the current head
			head = bt->lrufirst;
			hit->lrunext = head;

			if( !head )
				return bt->err = BTERR_hash, (BtPage)NULL;

			head->lruprev = hit;
			hit->lruprev = NULL;
			bt->lrufirst = hit;
		}

		return (BtPage)((char*)hit->page + within);
	}

	// not cached: use a fresh cache entry while some are left
	if( bt->nodecnt < BT_hashmax ) {
		bt->nodecnt++;
		return bt_linklru(bt, bt->nodes + bt->nodecnt, page_no);
	}

	// hash table is already full, replace last lru entry from the cache
	victim = bt->lrulast;

	if( !victim )
		return bt->err = BTERR_hash, (BtPage)NULL;

	// unlink from lru list
	prev = victim->lruprev;
	bt->lrulast = prev;

	if( !prev )
		return bt->err = BTERR_hash, (BtPage)NULL;

	prev->lrunext = NULL;

#ifdef unix
	munmap (victim->page, (bt->hashmask+1) << bt->page_bits);
#else
	UnmapViewOfFile(victim->page);
	CloseHandle(victim->hmap);
#endif

	// unlink from hash table, then map and add to cache
	bt_unlinkhash(bt, victim);
	return bt_linklru(bt, victim, page_no);
}
// make page_no available through *page
// with memory mapping, *page is pointed at the page inside the page
// cache; otherwise the page is read from the file into the buffer *page
// already points to.  no lock is taken here.
// returns 0 on success, otherwise the error stored in bt->err
BTERR bt_mappage (BtDb *bt, BtPage *page, uid page_no)
{
	off64_t off = page_no << bt->page_bits;
#ifndef unix
	int amt[1], cnt = 500;
#endif

	if( bt->hashmask != 0 ) {
		bt->err = 0;
		*page = bt_hashpage(bt, page_no);
		return bt->err;
	}

#ifdef unix
	if( pread(bt->idx, *page, 1 << bt->page_bits, off) >= (1 << bt->page_bits) )
		return 0;

	return bt->err = BTERR_map;
#else
	SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);

	// retry a failing read, pausing 1ms between attempts
	do {
		if( ReadFile(bt->idx, *page, 1 << bt->page_bits, amt, NULL) )
			break;

		Sleep (1);
	} while( --cnt );

	if( cnt && *amt >= (1 << bt->page_bits) )
		return 0;

	return bt->err = BTERR_map;
#endif
}
// binary search of the current page for the first slot whose key is
// greater than or equal to the given key
// returns that slot (1..cnt), or zero when every key on the page is lower
int bt_findslot (BtDb *bt, unsigned char *key, uint len)
{
	// low is the next candidate; high is one past the last candidate
	// until some slot has tested .ge. the key
	uint low = 1, high = (uint)bt->page->cnt + 1;
	int found = 0;

	while( low != high ) {
		uint mid = low + ((high - low) >> 1);

		if( keycmp (keyptr(mid), key, len) < 0 )
			low = mid + 1;
		else {
			high = mid;
			found = 1;
		}
	}

	return found ? (int)high : 0;
}
// find and load page at given level for given key
// leave page rd or wr locked as requested
//
// bt    open index
// key   key bytes to search for, len bytes long
// lvl   level of the page wanted (0 = leaf)
// lock  BtLockRead or BtLockWrite: lock left on the page that is returned;
//       pages above the requested level are only read locked while passing
//
// returns the slot of the first key >= key on the loaded page (bt->page,
// bt->page_no), or 0 with bt->err set on failure
int bt_loadpage (BtDb *bt, unsigned char *key, uint len, uint lvl, uint lock)
{
	uint drill = 0xff, slot;
	uid page_no = ROOT_page;
	uint mode = 0;

	// start at root of btree and drill down
	do {
		// determine lock mode of drill level
		mode = (lock == BtLockWrite) && (drill == lvl) ? BtLockWrite : BtLockRead;

		bt->page_no = page_no;

		if( bt_lockpage(bt, bt->page_no, mode) )
			return 0;

		if( bt_mappage (bt, &bt->page, page_no) )
			return 0;

		// re-read and re-lock root after determining actual level of root
		if( bt->page->lvl != drill ) {
			if( bt->page_no != ROOT_page )
				return bt->err = BTERR_struct, 0;

			drill = bt->page->lvl;

			// the root itself is the wanted page: swap the read lock
			// for a write lock by starting this round over
			if( lock == BtLockWrite && drill == lvl ) {
				if( bt_unlockpage(bt, page_no, mode) )
					return 0;

				continue;
			}
		}

		// find key on page at this level
		// and descend to requested level
		slot = bt_findslot (bt, key, len);

		if( slot ) {
			if( drill <= lvl )
				return slot;

			// page numbers are 48 bits wide: keep all of them
			page_no = bt_getid(bt->page->keys[slot].id);
			drill--;
		} else
			// or slide right into next page
			page_no = bt_getid(bt->page->right);

		// release lock
		if( bt_unlockpage(bt, bt->page_no, mode) )
			return 0;

		// break on end of right chain and return error
		if( !page_no )
			break;
	} while( 1 );

	bt->err = BTERR_struct;
	return 0;	// return error
}
// delete a key by setting the dead bit of its leaf slot
// a key that is absent or already dead is left alone.
// returns 0 on success, otherwise the error stored in bt->err
// (the page stays write locked when the page update fails)
BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len)
{
	uint slot = bt_loadpage (bt, key, len, 0, BtLockWrite);
	BtSlot *entry;

	if( !slot )
		return bt->err;

	entry = bt->page->keys + slot;

	// only an exact, still live match needs the page rewritten
	if( !keycmp (keyptr(slot), key, len) && entry->dead == 0 ) {
		entry->dead = 1;

		if( bt_update(bt, bt->page, bt->page_no) )
			return bt->err;
	}

	return bt_unlockpage(bt, bt->page_no, BtLockWrite);
}
// find key in leaf level
//
// bt   open index
// key  key bytes to look for, len bytes long
//
// the leaf page is copied into the cursor frame, so on success the
// returned slot can be passed to bt_key, bt_uid and bt_tod.
// returns the slot of the key, or 0 when the key is absent, deleted,
// or an error occurred (see bt->err)
uint bt_findkey (BtDb *bt, unsigned char *key, uint len)
{
	uint slot = bt_loadpage (bt, key, len, 0, BtLockRead);

	if( !slot )
		return 0;

	// the slot holds the first key >= the search key; it is a hit only
	// when both text and length agree (a longer key that merely starts
	// with the search key is a different key)
	if( bt->page->keys[slot].dead || keycmp (keyptr(slot), key, len) )
		slot = 0;

	memcpy(bt->cursor, bt->page, (1 << bt->page_bits));

	if( bt_unlockpage(bt, bt->page_no, BtLockRead) )
		return 0;

	return slot;
}
// compact the current page in place
// the page is copied to bt->frame and rebuilt from that copy without its
// dead keys; the last key is always kept, dead or not
void bt_cleanpage(BtDb *bt)
{
	uint pagesize = (1 << bt->page_bits);
	BtPage page = bt->page;
	BtPage old = bt->frame;
	uint total = page->cnt;
	uint off = pagesize;
	uint src, dst = 0;
	BtSlot *from, *to;
	BtKey key;

	memcpy (old, page, pagesize);

	// skip page info and set rest of page to zero
	memset ((unsigned char *)page + sizeof(*page), 0, pagesize - sizeof(*page));

	for( src = 1; src <= total; src++ ) {
		from = old->keys + src;

		if( src < total && from->dead )
			continue;

		// key text is packed downwards from the end of the page
		key = (BtKey)((unsigned char *)old + from->off);
		off -= key->len + 1;
		memcpy ((unsigned char *)page + off, key, key->len + 1);

		to = page->keys + ++dst;
		to->dead = from->dead;
		memcpy(to->id, from->id, sizeof(from->id));
		to->tod = from->tod;
		to->off = off;
	}

	// slot zero keeps the page's id and records the lowest used offset
	memcpy(page->keys[0].id, old->keys[0].id, sizeof(old->keys->id));
	page->keys[0].off = off;
	page->cnt = dst;
}
// allocate a new page and write page into it
// the next free page number is taken from the right field of the
// allocation page (page 0), which is write locked for the duration.
// page's slot zero id is set to the new page number before it is written.
// returns the new page number, or 0 on error; on error the lock on the
// allocation page is not released.
uid bt_newpage(BtDb *bt, BtPage page)
{
uid new_page;
char *pmap;
// lock page zero and allocate empty page
if ( bt_lockpage(bt, ALLOC_page, BtLockWrite) )
return 0;
// when memory mapping is enabled this repoints bt->alloc into the mapping
if( bt_mappage (bt, &bt->alloc, ALLOC_page) )
return 0;
// take the next free page number and advance the counter
new_page = bt_getid(bt->alloc->right);
bt_putid(bt->alloc->right, new_page+1);
// without memory mapping both pages are written through bt_update
if( !bt->hashmask ) {
bt_putid(page->keys[0].id, new_page);
if( bt_update(bt, bt->alloc, ALLOC_page) )
return 0; // don't unlock on error
if( bt_update(bt, page, new_page) )
return 0; //don't unlock on error
// unlock page zero
if ( bt_unlockpage(bt, ALLOC_page, BtLockWrite) )
return 0;
return new_page;
}
// memory mapped: the allocation page was updated in place above
bt_putid(page->keys[0].id, new_page);
#ifdef unix
if ( pwrite(bt->idx, page, 1 << bt->page_bits, new_page << bt->page_bits) < (1 << bt->page_bits) )
return bt->err = BTERR_wrt, 0;
// write last page in the block
// (only when new_page is the first page of a cache block, so the file
// grows by a whole block at once)
if ( (new_page & bt->hashmask) == 0 )
{
// use ALLOC_page buffer to write zeros
memset((bt->mem + 3 * BT_maxpage), 0, 1 << bt->page_bits);
if ( pwrite(bt->idx,bt->mem + 3 * BT_maxpage, 1 << bt->page_bits, (new_page | bt->hashmask) << bt->page_bits) < (1 << bt->page_bits) )
return bt->err = BTERR_wrt, 0;
}
#else
// bring end of file area into page-cache
if( !(pmap = (char*)bt_hashpage(bt, new_page & ~bt->hashmask)) )
return 0;
// copy the page into its place inside the mapped block
memcpy(pmap+((new_page & bt->hashmask) << bt->page_bits), page, 1 << bt->page_bits);
// first page of a block: also clear the block's last page
if ( (new_page & bt->hashmask) == 0 )
memset(pmap+(bt->hashmask << bt->page_bits), 0, 1 << bt->page_bits);
#endif
// unlock page zero
if ( bt_unlockpage(bt, ALLOC_page, BtLockWrite) )
return 0;
return new_page;
}
// split the root and raise the level of the btree
// the current root contents (already holding only the lower keys) move to
// a newly allocated page, and the root page is rebuilt one level higher
// with two keys: newkey -> the new page, oldkey -> page_no2.
// newkey and oldkey are a length byte followed by the key bytes.
// returns 0 after updating and unlocking the root, else bt->err
BTERR bt_splitroot(BtDb *bt, unsigned char *newkey, unsigned char *oldkey, uid page_no2)
{
	BtPage root = bt->page;
	unsigned char *base = (unsigned char *)root;
	uint pagesize = (1 << bt->page_bits);
	uid new_page;
	uint off;

	// the copy of the old root links right to the upper half
	bt_putid(root->right, page_no2);
	new_page = bt_newpage(bt, root);

	if( !new_page )
		return bt->err;

	// preserve the page info and set rest to zero
	memset(base + sizeof(*root), 0, pagesize - sizeof(*root));

	// insert first key on newroot page (length byte plus key bytes)
	off = pagesize - *newkey - 1;
	memcpy (base + off, newkey, *newkey + 1);
	bt_putid(root->keys[1].id, new_page);
	root->keys[1].off = off;

	// insert second key on newroot page
	off -= *oldkey + 1;
	memcpy (base + off, oldkey, *oldkey + 1);
	bt_putid(root->keys[2].id, page_no2);
	root->keys[2].off = off;

	root->keys[0].off = off;	// reset lowest used offset
	root->cnt = 2;
	root->lvl++;
	bt_putid(root->right, 0);

	// update and release root (bt->page)
	if( bt_update(bt, root, bt->page_no) )
		return bt->err;

	return bt_unlockpage(bt, bt->page_no, BtLockWrite);
}
// repair/split full block
//
// bt   open index; bt->page / bt->page_no is the write locked full page
// len  length of the key the caller wants to insert
//
// a leaf page is first cleaned of dead keys; when that frees enough room
// the page is written back and unlocked without splitting.  Otherwise the
// upper half of the keys moves to a newly allocated page, the lower half
// stays, and the two separator keys are inserted one level up (or the
// root is split).  The dead flag of every key is carried over, so a
// deleted last key kept by bt_cleanpage stays deleted after the split.
// returns 0 on success (page unlocked), else bt->err
BTERR bt_splitpage (BtDb *bt, uint len)
{
	unsigned char oldkey[256], lowerkey[256];
	uint cnt = 0, idx = 0, max, off;
	uid page_no = bt->page_no;
	BtPage page = bt->page;
	uint lvl = page->lvl;
	uid new_page;
	uint tod = 0;
	BtKey key;

	if( lvl == 0 )
	{
		bt_cleanpage(bt);

		// cleaning made enough room: no split needed
		if( page->keys[0].off >= page->cnt * sizeof(BtSlot) + sizeof(*page) + len + 1)
		{
			if( bt_update(bt, bt->page, bt->page_no) )
				return bt->err;

			return bt_unlockpage(bt, page_no, BtLockWrite);
		}
	}

	// split higher half of keys to bt->frame
	tod = (uint)time(NULL);

	memset (bt->frame, 0, (1 << bt->page_bits));
	off = (1 << bt->page_bits);
	max = (int)page->cnt;
	cnt = max / 2;
	idx = 0;

	while( cnt++ < max ) {
		key = (BtKey)((char*)page + page->keys[cnt].off);
		off -= key->len + 1;
		memcpy ((unsigned char *)bt->frame + off, key, key->len + 1);
		memcpy(bt->frame->keys[++idx].id, page->keys[cnt].id, sizeof(page->keys->id));
		bt->frame->keys[idx].dead = page->keys[cnt].dead;
		bt->frame->keys[idx].tod = page->keys[cnt].tod;
		bt->frame->keys[idx].off = off;
	}

	// remember existing page key for new page
	memcpy (oldkey, key, key->len + 1);

	bt->frame->keys[0].off = off;
	bt->frame->cnt = idx;
	bt->frame->lvl = lvl;

	// the new page inherits the right link (a root has none)
	if( page_no > ROOT_page )
		memcpy(bt->frame->right, page->right, sizeof(page->right));

	// get new free page
	if( !(new_page = bt_newpage(bt, bt->frame)) )
		return bt->err;

	// update lower keys to continue in old page
	memcpy (bt->frame, page, (1 << bt->page_bits));
	memset (page+1, 0, (1 << bt->page_bits) - sizeof(*page));
	off = (1 << bt->page_bits);
	cnt = 0;
	idx = 0;

	// assemble page of smaller keys
	while( cnt++ < max / 2 ) {
		key = (BtKey)((char*)bt->frame + bt->frame->keys[cnt].off);
		off -= key->len + 1;
		memcpy ((unsigned char *)page + off, key, key->len + 1);
		memcpy(page->keys[++idx].id, bt->frame->keys[cnt].id, sizeof(bt->frame->keys->id));
		page->keys[idx].dead = bt->frame->keys[cnt].dead;
		page->keys[idx].tod = bt->frame->keys[cnt].tod;
		page->keys[idx].off = off;
	}

	bt_putid(page->keys[0].id, page_no);
	bt_putid(page->right, new_page);
	page->keys[0].off = off;
	page->cnt = idx;

	// the highest key left on the old page becomes its separator
	key = (BtKey)((char*)page + off);
	memcpy(lowerkey+1, key->key, key->len);
	*lowerkey = key->len;

	// split the root page, if current page is the root page
	if( page_no == ROOT_page )
		return bt_splitroot (bt, lowerkey, oldkey, new_page);

	if( bt_update(bt, bt->page, bt->page_no) )
		return bt->err;

	// insert key for reformulated lower block
	if( bt_insertkey (bt, lowerkey+1, *lowerkey, lvl + 1, page_no, tod) )
		return bt->err;

	// release wr lock on lower page
	if( bt_unlockpage (bt, page_no, BtLockWrite) )
		return bt->err;

	// fix key for newly allocated higher block page
	return bt_insertkey (bt, oldkey+1, *oldkey, lvl + 1, new_page, tod);
}
// insert new key into the btree at requested level
// level zero pages are leaf pages
// an existing key is revived if dead and gets the new id and time-stamp;
// a full page is cleaned/split and the search repeated.
// returns 0 on success (page updated and unlocked), else bt->err
BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod)
{
	uint off, slot, idx;
	BtSlot *entry;
	BtPage page;

	for( ;; ) {
		slot = bt_loadpage (bt, key, len, lvl, BtLockWrite);

		if( !slot ) {
			if( !bt->err )
				bt->err = BTERR_ovflw;

			return bt->err;
		}

		page = bt->page;

		// if key already exists, update id
		if( keycmp (keyptr(slot), key, len) == 0 ) {
			entry = page->keys + slot;
			entry->dead = 0;
			bt_putid(entry->id, id);
			entry->tod = tod;

			if( bt_update(bt, bt->page, bt->page_no) )
				return bt->err;

			return bt_unlockpage(bt, bt->page_no, BtLockWrite);
		}

		// check if page has enough space
		if( page->keys[0].off >= page->cnt * sizeof(BtSlot) + sizeof(*page) + len + 1 )
			break;

		if( bt_splitpage (bt, len) )
			return bt->err;
	}

	// copy the key text just below the lowest used offset
	off = page->keys[0].off - len - 1;
	page->keys[0].off = off;
	((unsigned char *)page)[off] = len;
	memcpy ((unsigned char *)page + off + 1, key, len);

	// open a gap in the slot array at slot
	idx = (int)++page->cnt;

	while( idx > slot ) {
		page->keys[idx] = page->keys[idx - 1];
		idx--;
	}

	entry = page->keys + slot;
	entry->off = off;
	entry->dead = 0;
	bt_putid(entry->id, id);
	entry->tod = tod;

	if( bt_update(bt, bt->page, bt->page_no) )
		return bt->err;

	return bt_unlockpage(bt, bt->page_no, BtLockWrite);
}
// cache page of keys into cursor and return starting slot for given key
//
// bt   open index
// key  key bytes to position on, len bytes long
//
// returns the slot of the first key >= key on the leaf page copied into
// the cursor, or 0 on failure with the cause left in bt->err
uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
{
	uint slot = bt_loadpage (bt, key, len, 0, BtLockRead);
	int err;

	if( !slot ) {
		// the page lock is still released as before, but a successful
		// unlock resets bt->err, so keep the error that made
		// bt_loadpage fail
		err = bt->err;
		bt_unlockpage(bt, bt->page_no, BtLockRead);
		bt->err = err;
		return 0;
	}

	// cache page for retrieval
	memcpy (bt->cursor, bt->page, (1 << bt->page_bits));

	if( bt_unlockpage(bt, bt->page_no, BtLockRead) )
		return 0;

	return slot;
}
// return next slot for cursor page
// or slide cursor right into next page
// dead keys are skipped, and the last key of the rightmost page is never
// returned.  returns 0 at the end of the index or on error
uint bt_nextkey (BtDb *bt, uint slot)
{
	off64_t right;

	for( ;; ) {
		right = bt_getid(bt->cursor->right);

		// look for the next live key on the cursor page
		while( slot < bt->cursor->cnt ) {
			slot++;

			if( bt->cursor->keys[slot].dead )
				continue;

			// last key of the page with nothing to its right: stop
			if( !right && slot >= bt->cursor->cnt )
				break;

			return slot;
		}

		if( !right )
			return 0;

		// copy the page to the right into the cursor and start over
		if( bt_lockpage(bt, right, BtLockRead) )
			return 0;

		if( bt_mappage (bt, &bt->page, right) )
			return 0;

		memcpy (bt->cursor, bt->page, (1 << bt->page_bits));

		if( bt_unlockpage(bt, right, BtLockRead) )
			return 0;

		slot = 0;
	}
}
// return the key stored at the given slot of the cursor page
BtKey bt_key(BtDb *bt, uint slot)
{
	BtSlot *entry = bt->cursor->keys + slot;

	return (BtKey)((unsigned char*)bt->cursor + entry->off);
}
// return the id stored at the given slot of the cursor page
uid bt_uid(BtDb *bt, uint slot)
{
	BtSlot *entry = bt->cursor->keys + slot;

	return bt_getid(entry->id);
}
// return the time-stamp stored at the given slot of the cursor page
uint bt_tod(BtDb *bt, uint slot)
{
	BtSlot *entry = bt->cursor->keys + slot;

	return entry->tod;
}
#ifdef STANDALONE
// standalone program to index file of keys
// then list them onto std-out
//
// arguments: index_file src_file command [page_bits line_off]
// command is selected by its first letter: w inserts every line of
// src_file as a key, d deletes every line, s lists all keys; any other
// letter only counts the keys.  A non-zero line_off appends the line
// number plus line_off, as 8 digits, to each key.
int main (int argc, char **argv)
{
	uint slot, line = 0, off = 0;
	unsigned char key[256];
	int ch, cnt = 0;
	uint done, start;
	int bits = 12;
	uint scan = 0;
	uint len = 0;
	int cmd;
	BtKey nxt;
	BtDb *bt;
	FILE *in;

	// the command in argv[3] is required
	if( argc < 4 ) {
		printf ("Usage: %s index_file src_file read/write/scan [page_bits line_off]", argv[0]);
		exit(0);
	}

	start = (uint)time(NULL);

	if( argc > 4 )
		bits = atoi(argv[4]);

	if( argc > 5 )
		off = atoi(argv[5]);

	// use memory mapping in 64K chunks
	bt = bt_open ((argv[1]), BT_rw, bits, 65536);

	if( !bt ) {
		fprintf (stderr, "Unable to open index file %s\n", argv[1]);
		exit(1);
	}

	cmd = argv[3][0] | 0x20;

	switch( cmd )
	{
	case 'w':
	case 'd':
		if( cmd == 'w' )
			printf("started indexing for %s at: %u\n", argv[2], start);
		else
			printf("started deleting keys for %s at: %u\n", argv[2], start);

		in = fopen (argv[2], "rb");

		if( !in )
			fprintf (stderr, "Unable to open source file %s\n", argv[2]);
		else {
			while( (ch = getc(in)) != EOF ) {
				// collect the line; bytes past 245 are dropped
				if( ch != '\n' ) {
					if( len < 245 )
						key[len++] = (unsigned char)ch;

					continue;
				}

				// end of line: optionally make the key unique
				if( off ) {
					snprintf((char *)key+len, sizeof(key) - len, "%.8u", line + off);
					len += 8;
				}

				line++;

				if( cmd == 'w' ? bt_insertkey (bt, key, len, 0, line, start) : bt_deletekey (bt, key, len) ) {
					printf("Error %d Line: %u\n", bt->err, line);
					exit(1);
				}

				len = 0;
			}

			fclose (in);
		}

		if( cmd == 'w' )
			printf("finished adding keys, %u \n", line);
		else
			printf("finished deleting keys, %u \n", line);

		break;

	case 's':
		scan++;
		break;

	default:
		break;
	}

	// read every key back, starting from the empty key
	cnt = len = 0;

	printf("started reading, %u \n", line);

	slot = bt_startkey (bt, key, len);

	if( !slot ) {
		printf("Error %d in StartKey. Line: %u\n", bt->err, line);
		exit(1);
	}

	// bt_nextkey advances before it looks, so step back onto the start slot
	slot--;

	while( (slot = bt_nextkey (bt, slot)) ) {
		cnt++;

		if( scan ) {
			nxt = bt_key(bt, slot);
			fwrite (nxt->key, nxt->len, 1, stdout);
			fputc ('\n', stdout);
		}
	}

	done = (uint)time(NULL);
	printf("Done at : %u \n Time to complete: %u seconds\n", done, done - start);
	printf(" Total keys read %d \n", cnt);

	bt_close (bt);
	return 0;
}
#endif	//STANDALONE