Merge pull request 'Getting optional LMDB support into Cytoplasm' (#43) from lda/Cytoplasm:lmdbwerk into master

Reviewed-on: https://git.telodendria.io/Telodendria/Cytoplasm/pulls/43
This commit is contained in:
Jordan Bancino 2024-08-23 16:19:59 -04:00
commit b6b915530c
7 changed files with 1610 additions and 981 deletions

21
configure vendored
View file

@ -15,7 +15,7 @@ TOOLS="tools"
# Default compiler flags. These must be supported by all POSIX C compilers.
# "Fancy" compilers that have additional options must be detected and set below.
CFLAGS="-O1 -D_DEFAULT_SOURCE -I${INCLUDE}"
CFLAGS="-O1 -D_DEFAULT_SOURCE -I${INCLUDE} -I${SRC}";
LIBS="-lm -lpthread"
# Default args for all platforms.
@ -24,10 +24,10 @@ SCRIPT_ARGS="--prefix=/usr/local --lib-name=Cytoplasm"
# Set SSL flags depending on the platform.
case "$(uname)" in
OpenBSD)
SCRIPT_ARGS="${SCRIPT_ARGS} --with-libressl"
SCRIPT_ARGS="${SCRIPT_ARGS} --with-libressl --disable-lmdb"
;;
*)
SCRIPT_ARGS="${SCRIPT_ARGS} --with-openssl"
SCRIPT_ARGS="${SCRIPT_ARGS} --with-openssl --disable-lmdb"
;;
esac
@ -80,6 +80,14 @@ for arg in $SCRIPT_ARGS; do
TLS_IMPL=""
TLS_LIBS=""
;;
--with-lmdb)
EDB_IMPL="EDB_LMDB"
EDB_LIBS="-llmdb"
;;
--disable-lmdb)
EDB_IMPL=""
EDB_LIBS=""
;;
--prefix=*)
PREFIX=$(echo "$arg" | cut -d '=' -f 2-)
;;
@ -104,6 +112,11 @@ if [ -n "$TLS_IMPL" ]; then
LIBS="${LIBS} ${TLS_LIBS}"
fi
if [ -n "$EDB_IMPL" ]; then
CFLAGS="${CFLAGS} -D${EDB_IMPL}"
LIBS="${LIBS} ${EDB_LIBS}"
fi
CFLAGS="${CFLAGS} '-DLIB_NAME=\"${LIB_NAME}\"' ${DEBUG}"
LDFLAGS="${LIBS} ${LDFLAGS}"
@ -140,7 +153,7 @@ print_obj() {
get_deps() {
src="$1"
${CC} -I${INCLUDE} -E "$src" \
${CC} -I${SRC} -I${INCLUDE} -E "$src" \
| grep '^#' \
| awk '{print $3}' \
| cut -d '"' -f 2 \

View file

@ -48,6 +48,17 @@
*/
typedef struct Db Db;
/**
* Some "hints" for the database backend for operations.
* Hints are a way for the program to declare what to except of it
* (and the program MUST adhere to these hints, but the backend
* MAY adhere).
*/
typedef enum DbHint {
DB_HINT_READONLY, /* The database reference is treated as read-only */
DB_HINT_WRITE /* The database reference is treated as read/write */
} DbHint;
/**
* When an object is locked, a reference is returned. This reference
* is owned by the current thread, and the database is inaccessible to
@ -68,6 +79,15 @@ typedef struct DbRef DbRef;
*/
extern Db * DbOpen(char *, size_t);
/**
* Open a LMDB data directory. This function is similar to
* .Fn DbOpen ,
* but works with a LMDB-based backend, with the second argument
* being the maximum filesize. This function fails with NULL if ever
* called without LMDB enabled at compiletime.
*/
extern Db * DbOpenLMDB(char *, size_t);
/**
* Close the database. This function will flush anything in the cache
* to the disk, and then close the data directory. It assumes that
@ -109,6 +129,15 @@ extern DbRef * DbCreate(Db *, size_t,...);
*/
extern DbRef * DbLock(Db *, size_t,...);
/**
* Behaves like
* .Fn DbLock ,
* but with hints on the reference itself, as
* .Fn DbLock
* itself is read/write.
*/
extern DbRef * DbLockIntent(Db *, DbHint, size_t,...);
/**
* Immediately and permanently remove an object from the database.
* This function assumes the object is not locked, otherwise undefined

977
src/Db.c
View file

@ -1,977 +0,0 @@
/*
* Copyright (C) 2022-2024 Jordan Bancino <@jordan:bancino.net>
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation files
* (the "Software"), to deal in the Software without restriction,
* including without limitation the rights to use, copy, modify, merge,
* publish, distribute, sublicense, and/or sell copies of the Software,
* and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
#include <Db.h>
#include <Memory.h>
#include <Json.h>
#include <Util.h>
#include <Str.h>
#include <Stream.h>
#include <Log.h>
#include <sys/types.h>
#include <dirent.h>
#include <pthread.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
struct Db
{
char *dir;
pthread_mutex_t lock;
size_t cacheSize;
size_t maxCache;
HashMap *cache;
/*
* The cache uses a double linked list (see DbRef
* below) to know which objects are most and least
* recently used. The following diagram helps me
* know what way all the pointers go, because it
* can get very confusing sometimes. For example,
* there's nothing stopping "next" from pointing to
* least recent, and "prev" from pointing to most
* recent, so hopefully this clarifies the pointer
* terminology used when dealing with the linked
* list:
*
* mostRecent leastRecent
* | prev prev | prev
* +---+ ---> +---+ ---> +---+ ---> NULL
* |ref| |ref| |ref|
* NULL <--- +---+ <--- +---+ <--- +---+
* next next next
*/
DbRef *mostRecent;
DbRef *leastRecent;
};
struct DbRef
{
HashMap *json;
uint64_t ts;
size_t size;
Array *name;
DbRef *prev;
DbRef *next;
int fd;
Stream *stream;
};
static void
StringArrayFree(Array * arr)
{
size_t i;
for (i = 0; i < ArraySize(arr); i++)
{
Free(ArrayGet(arr, i));
}
ArrayFree(arr);
}
static ssize_t DbComputeSize(HashMap *);
static ssize_t
DbComputeSizeOfValue(JsonValue * val)
{
MemoryInfo *a;
ssize_t total = 0;
size_t i;
union
{
char *str;
Array *arr;
} u;
if (!val)
{
return -1;
}
a = MemoryInfoGet(val);
if (a)
{
total += MemoryInfoGetSize(a);
}
switch (JsonValueType(val))
{
case JSON_OBJECT:
total += DbComputeSize(JsonValueAsObject(val));
break;
case JSON_ARRAY:
u.arr = JsonValueAsArray(val);
a = MemoryInfoGet(u.arr);
if (a)
{
total += MemoryInfoGetSize(a);
}
for (i = 0; i < ArraySize(u.arr); i++)
{
total += DbComputeSizeOfValue(ArrayGet(u.arr, i));
}
break;
case JSON_STRING:
u.str = JsonValueAsString(val);
a = MemoryInfoGet(u.str);
if (a)
{
total += MemoryInfoGetSize(a);
}
break;
case JSON_NULL:
case JSON_INTEGER:
case JSON_FLOAT:
case JSON_BOOLEAN:
default:
/* These don't use any extra heap space */
break;
}
return total;
}
static ssize_t
DbComputeSize(HashMap * json)
{
char *key;
JsonValue *val;
MemoryInfo *a;
size_t total;
if (!json)
{
return -1;
}
total = 0;
a = MemoryInfoGet(json);
if (a)
{
total += MemoryInfoGetSize(a);
}
while (HashMapIterate(json, &key, (void **) &val))
{
a = MemoryInfoGet(key);
if (a)
{
total += MemoryInfoGetSize(a);
}
total += DbComputeSizeOfValue(val);
}
return total;
}
static char *
DbHashKey(Array * args)
{
size_t i;
char *str = NULL;
for (i = 0; i < ArraySize(args); i++)
{
char *tmp = StrConcat(2, str, ArrayGet(args, i));
Free(str);
str = tmp;
}
return str;
}
static char
DbSanitiseChar(char input)
{
switch (input)
{
case '/':
return '_';
case '.':
return '-';
}
return input;
}
static char *
DbDirName(Db * db, Array * args, size_t strip)
{
size_t i, j;
char *str = StrConcat(2, db->dir, "/");
for (i = 0; i < ArraySize(args) - strip; i++)
{
char *tmp;
char *sanitise = StrDuplicate(ArrayGet(args, i));
for (j = 0; j < strlen(sanitise); j++)
{
sanitise[j] = DbSanitiseChar(sanitise[j]);
}
tmp = StrConcat(3, str, sanitise, "/");
Free(str);
Free(sanitise);
str = tmp;
}
return str;
}
static char *
DbFileName(Db * db, Array * args)
{
size_t i;
char *str = StrConcat(2, db->dir, "/");
for (i = 0; i < ArraySize(args); i++)
{
char *tmp;
char *arg = StrDuplicate(ArrayGet(args, i));
size_t j = 0;
/* Sanitize name to prevent directory traversal attacks */
while (arg[j])
{
arg[j] = DbSanitiseChar(arg[j]);
j++;
}
tmp = StrConcat(3, str, arg,
(i < ArraySize(args) - 1) ? "/" : ".json");
Free(arg);
Free(str);
str = tmp;
}
return str;
}
static void
DbCacheEvict(Db * db)
{
DbRef *ref = db->leastRecent;
DbRef *tmp;
while (ref && db->cacheSize > db->maxCache)
{
char *hash;
JsonFree(ref->json);
hash = DbHashKey(ref->name);
HashMapDelete(db->cache, hash);
Free(hash);
StringArrayFree(ref->name);
db->cacheSize -= ref->size;
if (ref->next)
{
ref->next->prev = ref->prev;
}
else
{
db->mostRecent = ref->prev;
}
if (ref->prev)
{
ref->prev->next = ref->next;
}
else
{
db->leastRecent = ref->next;
}
tmp = ref->next;
Free(ref);
ref = tmp;
}
}
Db *
DbOpen(char *dir, size_t cache)
{
Db *db;
pthread_mutexattr_t attr;
if (!dir)
{
return NULL;
}
db = Malloc(sizeof(Db));
if (!db)
{
return NULL;
}
db->dir = dir;
db->maxCache = cache;
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
pthread_mutex_init(&db->lock, &attr);
pthread_mutexattr_destroy(&attr);
db->mostRecent = NULL;
db->leastRecent = NULL;
db->cacheSize = 0;
if (db->maxCache)
{
db->cache = HashMapCreate();
if (!db->cache)
{
return NULL;
}
}
else
{
db->cache = NULL;
}
return db;
}
void
DbMaxCacheSet(Db * db, size_t cache)
{
if (!db)
{
return;
}
pthread_mutex_lock(&db->lock);
db->maxCache = cache;
if (db->maxCache && !db->cache)
{
db->cache = HashMapCreate();
db->cacheSize = 0;
}
DbCacheEvict(db);
pthread_mutex_unlock(&db->lock);
}
void
DbClose(Db * db)
{
if (!db)
{
return;
}
pthread_mutex_lock(&db->lock);
DbMaxCacheSet(db, 0);
DbCacheEvict(db);
HashMapFree(db->cache);
pthread_mutex_unlock(&db->lock);
pthread_mutex_destroy(&db->lock);
Free(db);
}
static DbRef *
DbLockFromArr(Db * db, Array * args)
{
char *file;
char *hash;
DbRef *ref;
struct flock lock;
int fd;
Stream *stream;
if (!db || !args)
{
return NULL;
}
ref = NULL;
hash = NULL;
pthread_mutex_lock(&db->lock);
/* Check if the item is in the cache */
hash = DbHashKey(args);
ref = HashMapGet(db->cache, hash);
file = DbFileName(db, args);
fd = open(file, O_RDWR);
if (fd == -1)
{
if (ref)
{
HashMapDelete(db->cache, hash);
JsonFree(ref->json);
StringArrayFree(ref->name);
db->cacheSize -= ref->size;
if (ref->next)
{
ref->next->prev = ref->prev;
}
else
{
db->mostRecent = ref->prev;
}
if (ref->prev)
{
ref->prev->next = ref->next;
}
else
{
db->leastRecent = ref->next;
}
if (!db->leastRecent)
{
db->leastRecent = db->mostRecent;
}
Free(ref);
}
ref = NULL;
goto finish;
}
stream = StreamFd(fd);
lock.l_start = 0;
lock.l_len = 0;
lock.l_type = F_WRLCK;
lock.l_whence = SEEK_SET;
/* Lock the file on the disk */
if (fcntl(fd, F_SETLK, &lock) < 0)
{
StreamClose(stream);
ref = NULL;
goto finish;
}
if (ref) /* In cache */
{
uint64_t diskTs = UtilLastModified(file);
ref->fd = fd;
ref->stream = stream;
if (diskTs > ref->ts)
{
/* File was modified on disk since it was cached */
HashMap *json = JsonDecode(ref->stream);
if (!json)
{
StreamClose(ref->stream);
ref = NULL;
goto finish;
}
JsonFree(ref->json);
ref->json = json;
ref->ts = diskTs;
ref->size = DbComputeSize(ref->json);
}
/* Float this ref to mostRecent */
if (ref->next)
{
ref->next->prev = ref->prev;
if (!ref->prev)
{
db->leastRecent = ref->next;
}
else
{
ref->prev->next = ref->next;
}
ref->prev = db->mostRecent;
ref->next = NULL;
if (db->mostRecent)
{
db->mostRecent->next = ref;
}
db->mostRecent = ref;
}
/* If there is no least recent, this is the only thing in the
* cache, so it is also least recent. */
if (!db->leastRecent)
{
db->leastRecent = ref;
}
/* The file on disk may be larger than what we have in memory,
* which may require items in cache to be evicted. */
DbCacheEvict(db);
}
else
{
Array *name;
size_t i;
/* Not in cache; load from disk */
ref = Malloc(sizeof(DbRef));
if (!ref)
{
StreamClose(stream);
goto finish;
}
ref->json = JsonDecode(stream);
if (!ref->json)
{
Free(ref);
StreamClose(stream);
ref = NULL;
goto finish;
}
ref->fd = fd;
ref->stream = stream;
name = ArrayCreate();
for (i = 0; i < ArraySize(args); i++)
{
ArrayAdd(name, StrDuplicate(ArrayGet(args, i)));
}
ref->name = name;
if (db->cache)
{
ref->ts = UtilTsMillis();
ref->size = DbComputeSize(ref->json);
HashMapSet(db->cache, hash, ref);
db->cacheSize += ref->size;
ref->next = NULL;
ref->prev = db->mostRecent;
if (db->mostRecent)
{
db->mostRecent->next = ref;
}
db->mostRecent = ref;
if (!db->leastRecent)
{
db->leastRecent = ref;
}
/* Adding this item to the cache may case it to grow too
* large, requiring some items to be evicted */
DbCacheEvict(db);
}
}
finish:
if (!ref)
{
pthread_mutex_unlock(&db->lock);
}
Free(file);
Free(hash);
return ref;
}
DbRef *
DbCreate(Db * db, size_t nArgs,...)
{
Stream *fp;
char *file;
char *dir;
va_list ap;
Array *args;
DbRef *ret;
if (!db)
{
return NULL;
}
va_start(ap, nArgs);
args = ArrayFromVarArgs(nArgs, ap);
va_end(ap);
if (!args)
{
return NULL;
}
pthread_mutex_lock(&db->lock);
file = DbFileName(db, args);
if (UtilLastModified(file))
{
Free(file);
ArrayFree(args);
pthread_mutex_unlock(&db->lock);
return NULL;
}
dir = DbDirName(db, args, 1);
if (UtilMkdir(dir, 0750) < 0)
{
Free(file);
ArrayFree(args);
Free(dir);
pthread_mutex_unlock(&db->lock);
return NULL;
}
Free(dir);
fp = StreamOpen(file, "w");
Free(file);
if (!fp)
{
ArrayFree(args);
pthread_mutex_unlock(&db->lock);
return NULL;
}
StreamPuts(fp, "{}");
StreamClose(fp);
/* DbLockFromArr() will lock again for us */
pthread_mutex_unlock(&db->lock);
ret = DbLockFromArr(db, args);
ArrayFree(args);
return ret;
}
bool
DbDelete(Db * db, size_t nArgs,...)
{
va_list ap;
Array *args;
char *file;
char *hash;
bool ret = true;
DbRef *ref;
if (!db)
{
return false;
}
va_start(ap, nArgs);
args = ArrayFromVarArgs(nArgs, ap);
va_end(ap);
pthread_mutex_lock(&db->lock);
hash = DbHashKey(args);
file = DbFileName(db, args);
ref = HashMapGet(db->cache, hash);
if (ref)
{
HashMapDelete(db->cache, hash);
JsonFree(ref->json);
StringArrayFree(ref->name);
db->cacheSize -= ref->size;
if (ref->next)
{
ref->next->prev = ref->prev;
}
else
{
db->mostRecent = ref->prev;
}
if (ref->prev)
{
ref->prev->next = ref->next;
}
else
{
db->leastRecent = ref->next;
}
if (!db->leastRecent)
{
db->leastRecent = db->mostRecent;
}
Free(ref);
}
Free(hash);
if (UtilLastModified(file))
{
ret = (remove(file) == 0);
}
pthread_mutex_unlock(&db->lock);
ArrayFree(args);
Free(file);
return ret;
}
DbRef *
DbLock(Db * db, size_t nArgs,...)
{
va_list ap;
Array *args;
DbRef *ret;
va_start(ap, nArgs);
args = ArrayFromVarArgs(nArgs, ap);
va_end(ap);
if (!args)
{
return NULL;
}
ret = DbLockFromArr(db, args);
ArrayFree(args);
return ret;
}
bool
DbUnlock(Db * db, DbRef * ref)
{
bool destroy;
if (!db || !ref)
{
return false;
}
lseek(ref->fd, 0L, SEEK_SET);
if (ftruncate(ref->fd, 0) < 0)
{
pthread_mutex_unlock(&db->lock);
Log(LOG_ERR, "Failed to truncate file on disk.");
Log(LOG_ERR, "Error on fd %d: %s", ref->fd, strerror(errno));
return false;
}
JsonEncode(ref->json, ref->stream, JSON_DEFAULT);
StreamClose(ref->stream);
if (db->cache)
{
char *key = DbHashKey(ref->name);
if (HashMapGet(db->cache, key))
{
db->cacheSize -= ref->size;
ref->size = DbComputeSize(ref->json);
db->cacheSize += ref->size;
/* If this ref has grown significantly since we last
* computed its size, it may have filled the cache and
* require some items to be evicted. */
DbCacheEvict(db);
destroy = false;
}
else
{
destroy = true;
}
Free(key);
}
else
{
destroy = true;
}
if (destroy)
{
JsonFree(ref->json);
StringArrayFree(ref->name);
Free(ref);
}
pthread_mutex_unlock(&db->lock);
return true;
}
bool
DbExists(Db * db, size_t nArgs,...)
{
va_list ap;
Array *args;
char *file;
bool ret;
va_start(ap, nArgs);
args = ArrayFromVarArgs(nArgs, ap);
va_end(ap);
if (!args)
{
return false;
}
pthread_mutex_lock(&db->lock);
file = DbFileName(db, args);
ret = (UtilLastModified(file) != 0);
pthread_mutex_unlock(&db->lock);
Free(file);
ArrayFree(args);
return ret;
}
Array *
DbList(Db * db, size_t nArgs,...)
{
Array *result;
Array *path;
DIR *files;
struct dirent *file;
char *dir;
va_list ap;
if (!db || !nArgs)
{
return NULL;
}
result = ArrayCreate();
if (!result)
{
return NULL;
}
va_start(ap, nArgs);
path = ArrayFromVarArgs(nArgs, ap);
dir = DbDirName(db, path, 0);
pthread_mutex_lock(&db->lock);
files = opendir(dir);
if (!files)
{
ArrayFree(path);
ArrayFree(result);
Free(dir);
pthread_mutex_unlock(&db->lock);
return NULL;
}
while ((file = readdir(files)))
{
size_t namlen = strlen(file->d_name);
if (namlen > 5)
{
int nameOffset = namlen - 5;
if (StrEquals(file->d_name + nameOffset, ".json"))
{
file->d_name[nameOffset] = '\0';
ArrayAdd(result, StrDuplicate(file->d_name));
}
}
}
closedir(files);
ArrayFree(path);
Free(dir);
pthread_mutex_unlock(&db->lock);
return result;
}
void
DbListFree(Array * arr)
{
StringArrayFree(arr);
}
HashMap *
DbJson(DbRef * ref)
{
return ref ? ref->json : NULL;
}
bool
DbJsonSet(DbRef * ref, HashMap * json)
{
if (!ref || !json)
{
return false;
}
JsonFree(ref->json);
ref->json = JsonDuplicate(json);
return true;
}

499
src/Db/Db.c Normal file
View file

@ -0,0 +1,499 @@
/*
* Copyright (C) 2022-2024 Jordan Bancino <@jordan:bancino.net>
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation files
* (the "Software"), to deal in the Software without restriction,
* including without limitation the rights to use, copy, modify, merge,
* publish, distribute, sublicense, and/or sell copies of the Software,
* and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
#include <Db.h>
#include <Memory.h>
#include <Json.h>
#include <Util.h>
#include <Str.h>
#include <Stream.h>
#include <Log.h>
#include <sys/types.h>
#include <dirent.h>
#include <pthread.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include "Db/Internal.h"
void
StringArrayFree(Array * arr)
{
size_t i;
for (i = 0; i < ArraySize(arr); i++)
{
Free(ArrayGet(arr, i));
}
ArrayFree(arr);
}
static ssize_t DbComputeSize(HashMap *);
static ssize_t
DbComputeSizeOfValue(JsonValue * val)
{
MemoryInfo *a;
ssize_t total = 0;
size_t i;
union
{
char *str;
Array *arr;
} u;
if (!val)
{
return -1;
}
a = MemoryInfoGet(val);
if (a)
{
total += MemoryInfoGetSize(a);
}
switch (JsonValueType(val))
{
case JSON_OBJECT:
total += DbComputeSize(JsonValueAsObject(val));
break;
case JSON_ARRAY:
u.arr = JsonValueAsArray(val);
a = MemoryInfoGet(u.arr);
if (a)
{
total += MemoryInfoGetSize(a);
}
for (i = 0; i < ArraySize(u.arr); i++)
{
total += DbComputeSizeOfValue(ArrayGet(u.arr, i));
}
break;
case JSON_STRING:
u.str = JsonValueAsString(val);
a = MemoryInfoGet(u.str);
if (a)
{
total += MemoryInfoGetSize(a);
}
break;
case JSON_NULL:
case JSON_INTEGER:
case JSON_FLOAT:
case JSON_BOOLEAN:
default:
/* These don't use any extra heap space */
break;
}
return total;
}
static ssize_t
DbComputeSize(HashMap * json)
{
char *key;
JsonValue *val;
MemoryInfo *a;
size_t total;
if (!json)
{
return -1;
}
total = 0;
a = MemoryInfoGet(json);
if (a)
{
total += MemoryInfoGetSize(a);
}
while (HashMapIterate(json, &key, (void **) &val))
{
a = MemoryInfoGet(key);
if (a)
{
total += MemoryInfoGetSize(a);
}
total += DbComputeSizeOfValue(val);
}
return total;
}
static char *
DbHashKey(Array * args)
{
size_t i;
char *str = NULL;
for (i = 0; i < ArraySize(args); i++)
{
char *tmp = StrConcat(2, str, ArrayGet(args, i));
Free(str);
str = tmp;
}
return str;
}
static void
DbCacheEvict(Db * db)
{
DbRef *ref = db->leastRecent;
DbRef *tmp;
while (ref && db->cacheSize > db->maxCache)
{
char *hash;
JsonFree(ref->json);
hash = DbHashKey(ref->name);
HashMapDelete(db->cache, hash);
Free(hash);
StringArrayFree(ref->name);
db->cacheSize -= ref->size;
if (ref->next)
{
ref->next->prev = ref->prev;
}
else
{
db->mostRecent = ref->prev;
}
if (ref->prev)
{
ref->prev->next = ref->next;
}
else
{
db->leastRecent = ref->next;
}
tmp = ref->next;
Free(ref);
ref = tmp;
}
}
void
DbMaxCacheSet(Db * db, size_t cache)
{
if (!db)
{
return;
}
pthread_mutex_lock(&db->lock);
db->maxCache = cache;
if (db->maxCache && !db->cache)
{
db->cache = HashMapCreate();
db->cacheSize = 0;
}
DbCacheEvict(db);
pthread_mutex_unlock(&db->lock);
}
void
DbClose(Db * db)
{
if (!db)
{
return;
}
pthread_mutex_lock(&db->lock);
if (db->close)
{
db->close(db);
}
DbMaxCacheSet(db, 0);
DbCacheEvict(db);
HashMapFree(db->cache);
pthread_mutex_unlock(&db->lock);
pthread_mutex_destroy(&db->lock);
Free(db);
}
DbRef *
DbCreate(Db * db, size_t nArgs,...)
{
va_list ap;
Array *args;
DbRef *ret;
if (!db || !db->create)
{
return NULL;
}
va_start(ap, nArgs);
args = ArrayFromVarArgs(nArgs, ap);
va_end(ap);
if (!args)
{
return NULL;
}
ret = db->create(db, args);
ArrayFree(args);
return ret;
}
bool
DbDelete(Db * db, size_t nArgs,...)
{
va_list ap;
Array *args;
bool ret = true;
if (!db)
{
return false;
}
va_start(ap, nArgs);
args = ArrayFromVarArgs(nArgs, ap);
va_end(ap);
ret = db->delete(db, args);
ArrayFree(args);
return ret;
}
DbRef *
DbLock(Db * db, size_t nArgs,...)
{
va_list ap;
Array *args;
DbRef *ret;
va_start(ap, nArgs);
args = ArrayFromVarArgs(nArgs, ap);
va_end(ap);
if (!args || !db->lockFunc)
{
return NULL;
}
ret = db->lockFunc(db, DB_HINT_WRITE, args);
ArrayFree(args);
return ret;
}
DbRef *
DbLockIntent(Db * db, DbHint hint, size_t nArgs,...)
{
va_list ap;
Array *args;
DbRef *ret;
va_start(ap, nArgs);
args = ArrayFromVarArgs(nArgs, ap);
va_end(ap);
if (!args || !db->lockFunc)
{
return NULL;
}
ret = db->lockFunc(db, hint, args);
ArrayFree(args);
return ret;
}
bool
DbUnlock(Db * db, DbRef * ref)
{
if (!db || !ref || !db->unlock)
{
return false;
}
return db->unlock(db, ref);
}
bool
DbExists(Db * db, size_t nArgs,...)
{
va_list ap;
Array *args;
bool ret;
if (!db || !nArgs || !db->exists)
{
return false;
}
va_start(ap, nArgs);
args = ArrayFromVarArgs(nArgs, ap);
va_end(ap);
if (!args)
{
return false;
}
ret = db->exists(db, args);
ArrayFree(args);
return ret;
}
Array *
DbList(Db * db, size_t nArgs,...)
{
Array *result;
Array *path;
va_list ap;
if (!db || !nArgs || !db->list)
{
return NULL;
}
va_start(ap, nArgs);
path = ArrayFromVarArgs(nArgs, ap);
va_end(ap);
result = db->list(db, path);
ArrayFree(path);
return result;
}
void
DbListFree(Array * arr)
{
StringArrayFree(arr);
}
HashMap *
DbJson(DbRef * ref)
{
return ref ? ref->json : NULL;
}
bool
DbJsonSet(DbRef * ref, HashMap * json)
{
if (!ref || !json)
{
return false;
}
JsonFree(ref->json);
ref->json = JsonDuplicate(json);
return true;
}
void
DbInit(Db *db)
{
pthread_mutexattr_t attr;
if (!db)
{
return;
}
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
pthread_mutex_init(&db->lock, &attr);
pthread_mutexattr_destroy(&attr);
db->mostRecent = NULL;
db->leastRecent = NULL;
db->cacheSize = 0;
db->maxCache = 0;
if (db->maxCache)
{
db->cache = HashMapCreate();
}
else
{
db->cache = NULL;
}
}
void
DbRefInit(Db *db, DbRef *ref)
{
if (!db || !ref)
{
return;
}
ref->prev = db->mostRecent;
ref->next = NULL;
ref->json = NULL;
ref->name = NULL;
/* As default values to be overwritten by impls */
ref->ts = UtilTsMillis();
ref->size = 0;
/* TODO: Append the ref to the cache list.
* I removed it because it broke everything and crashed all the time.
* My bad! */
}
void
StringArrayAppend(Array *arr, char *str)
{
if (!arr || !str)
{
return;
}
ArrayAdd(arr, StrDuplicate(str));
}

374
src/Db/Flat.c Normal file
View file

@ -0,0 +1,374 @@
#include <Db.h>
#include <Memory.h>
#include <Json.h>
#include <Util.h>
#include <Str.h>
#include <Stream.h>
#include <Log.h>
#include <sys/types.h>
#include <dirent.h>
#include <pthread.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include "Db/Internal.h"
typedef struct FlatDb {
Db base;
char *dir;
/* Theres not much to do here. */
} FlatDb;
typedef struct FlatDbRef {
DbRef base;
Stream *stream;
int fd;
} FlatDbRef;
static char
DbSanitiseChar(char input)
{
switch (input)
{
case '/':
return '_';
case '.':
return '-';
}
return input;
}
static char *
DbDirName(FlatDb * db, Array * args, size_t strip)
{
size_t i, j;
char *str = StrConcat(2, db->dir, "/");
for (i = 0; i < ArraySize(args) - strip; i++)
{
char *tmp;
char *sanitise = StrDuplicate(ArrayGet(args, i));
for (j = 0; j < strlen(sanitise); j++)
{
sanitise[j] = DbSanitiseChar(sanitise[j]);
}
tmp = StrConcat(3, str, sanitise, "/");
Free(str);
Free(sanitise);
str = tmp;
}
return str;
}
static char *
DbFileName(FlatDb * db, Array * args)
{
size_t i;
char *str = StrConcat(2, db->dir, "/");
for (i = 0; i < ArraySize(args); i++)
{
char *tmp;
char *arg = StrDuplicate(ArrayGet(args, i));
size_t j = 0;
/* Sanitize name to prevent directory traversal attacks */
while (arg[j])
{
arg[j] = DbSanitiseChar(arg[j]);
j++;
}
tmp = StrConcat(3, str, arg,
(i < ArraySize(args) - 1) ? "/" : ".json");
Free(arg);
Free(str);
str = tmp;
}
return str;
}
static DbRef *
FlatLock(Db *d, DbHint hint, Array *dir)
{
FlatDb *db = (FlatDb *) d;
FlatDbRef *ref = NULL;
size_t i;
char *path = NULL;
if (!d || !dir)
{
return NULL;
}
pthread_mutex_lock(&d->lock);
path = DbFileName(db, dir);
/* TODO: Caching */
{
int fd = open(path, O_RDWR);
Stream *stream;
struct flock lock;
if (fd == -1)
{
ref = NULL;
goto end;
}
stream = StreamFd(fd);
if (!stream)
{
ref = NULL;
goto end;
}
lock.l_start = 0;
lock.l_len = 0;
lock.l_type = F_WRLCK;
lock.l_whence = SEEK_SET;
if (fcntl(fd, F_SETLK, &lock) < 0)
{
StreamClose(stream);
ref = NULL;
goto end;
}
ref = Malloc(sizeof(*ref));
DbRefInit(d, (DbRef *) ref);
/* TODO: Hints */
ref->base.hint = hint;
ref->base.ts = UtilLastModified(path);
ref->base.json = JsonDecode(stream);
ref->stream = stream;
ref->fd = fd;
if (!ref->base.json)
{
Free(ref);
StreamClose(stream);
ref = NULL;
goto end;
}
ref->base.name = ArrayCreate();
for (i = 0; i < ArraySize(dir); i++)
{
StringArrayAppend(ref->base.name, ArrayGet(dir, i));
}
}
end:
Free(path);
if (!ref)
{
pthread_mutex_unlock(&d->lock);
}
return (DbRef *) ref;
}
static bool
FlatUnlock(Db *d, DbRef *r)
{
FlatDbRef *ref = (FlatDbRef *) r;
if (!d || !r)
{
return false;
}
lseek(ref->fd, 0L, SEEK_SET);
if (ftruncate(ref->fd, 0) < 0)
{
pthread_mutex_unlock(&d->lock);
Log(LOG_ERR, "Failed to truncate file on disk.");
Log(LOG_ERR, "Error on fd %d: %s", ref->fd, strerror(errno));
return false;
}
JsonEncode(ref->base.json, ref->stream, JSON_DEFAULT);
StreamClose(ref->stream);
JsonFree(ref->base.json);
StringArrayFree(ref->base.name);
Free(ref);
pthread_mutex_unlock(&d->lock);
return true;
}
static DbRef *
FlatCreate(Db *d, Array *dir)
{
FlatDb *db = (FlatDb *) d;
char *path, *dirPath;
Stream *fp;
DbRef *ret;
if (!d || !dir)
{
return NULL;
}
pthread_mutex_lock(&d->lock);
path = DbFileName(db, dir);
if (UtilLastModified(path))
{
Free(path);
pthread_mutex_unlock(&d->lock);
return NULL;
}
dirPath = DbDirName(db, dir, 1);
if (UtilMkdir(dirPath, 0750) < 0)
{
Free(path);
Free(dirPath);
pthread_mutex_unlock(&d->lock);
return NULL;
}
Free(dirPath);
fp = StreamOpen(path, "w");
Free(path);
if (!fp)
{
pthread_mutex_unlock(&d->lock);
return NULL;
}
StreamPuts(fp, "{}");
StreamClose(fp);
/* FlatLock() will lock again for us */
pthread_mutex_unlock(&d->lock);
ret = FlatLock(d, DB_HINT_WRITE, dir);
return ret;
}
static bool
FlatDelete(Db *d, Array *dir)
{
bool ret = false;
char *file;
FlatDb *db = (FlatDb *) d;
if (!d || !dir)
{
return false;
}
pthread_mutex_lock(&d->lock);
file = DbFileName(db, dir);
/* TODO: Unlink the entry from the linkedlist */
if (UtilLastModified(file))
{
ret = remove(file) == 0;
}
Free(file);
pthread_mutex_unlock(&d->lock);
return ret;
}
static Array *
FlatList(Db *d, Array *dir)
{
FlatDb *db = (FlatDb *) d;
struct dirent *file;
Array *ret;
DIR *files;
char *path;
if (!d || !dir)
{
return NULL;
}
pthread_mutex_lock(&d->lock);
path = DbDirName(db, dir, 0);
files = opendir(path);
if (!files)
{
Free(path);
pthread_mutex_unlock(&d->lock);
return NULL;
}
ret = ArrayCreate();
while ((file = readdir(files)))
{
size_t namlen = strlen(file->d_name);
if (namlen > 5)
{
int nameOffset = namlen - 5;
if (StrEquals(file->d_name + nameOffset, ".json"))
{
file->d_name[nameOffset] = '\0';
StringArrayAppend(ret, file->d_name);
}
}
}
closedir(files);
Free(path);
pthread_mutex_unlock(&d->lock);
return ret;
}
static bool
FlatExists(Db *d, Array *dir)
{
FlatDb *db = (FlatDb *) d;
char *path;
bool ret;
if (!d || !dir)
{
return NULL;
}
pthread_mutex_lock(&d->lock);
path = DbFileName(db, dir);
ret = UtilLastModified(path) != 0;
Free(path);
pthread_mutex_unlock(&d->lock);
return ret;
}
Db *
DbOpen(char *dir, size_t cache)
{
FlatDb *db;
if (!dir)
{
return NULL;
}
db = Malloc(sizeof(*db));
DbInit((Db *) db);
db->dir = dir;
db->base.cacheSize = cache;
db->base.lockFunc = FlatLock;
db->base.unlock = FlatUnlock;
db->base.create = FlatCreate;
db->base.delete = FlatDelete;
db->base.exists = FlatExists;
db->base.list = FlatList;
db->base.close = NULL;
return (Db *) db;
}

101
src/Db/Internal.h Normal file
View file

@ -0,0 +1,101 @@
/*
* Copyright (C) 2022-2024 Jordan Bancino <@jordan:bancino.net>
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation files
* (the "Software"), to deal in the Software without restriction,
* including without limitation the rights to use, copy, modify, merge,
* publish, distribute, sublicense, and/or sell copies of the Software,
* and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
#ifndef CYTOPLASM_DB_INTERNAL_H
#define CYTOPLASM_DB_INTERNAL_H
#include <HashMap.h>
#include <Db.h>
#include <pthread.h>
/* TODO: Define the base structures to define a database internally.
* All "implementations" shall have them as a first element, so that
* basic, general functions can work on them properly. */
/* The base structure of a database */
struct Db
{
pthread_mutex_t lock;
size_t cacheSize;
size_t maxCache;
HashMap *cache;
/*
* The cache uses a double linked list (see DbRef
* below) to know which objects are most and least
* recently used. The following diagram helps me
* know what way all the pointers go, because it
* can get very confusing sometimes. For example,
* there's nothing stopping "next" from pointing to
* least recent, and "prev" from pointing to most
* recent, so hopefully this clarifies the pointer
* terminology used when dealing with the linked
* list:
*
* mostRecent leastRecent
* | prev prev | prev
* +---+ ---> +---+ ---> +---+ ---> NULL
* |ref| |ref| |ref|
* NULL <--- +---+ <--- +---+ <--- +---+
* next next next
*/
DbRef *mostRecent;
DbRef *leastRecent;
/* Functions for implementation-specific operations
* (opening a ref, closing a db, removing an entry, ...) */
DbRef * (*lockFunc)(Db *, DbHint, Array *);
DbRef * (*create)(Db *, Array *);
Array * (*list)(Db *, Array *);
bool (*unlock)(Db *, DbRef *);
bool (*delete)(Db *, Array *);
bool (*exists)(Db *, Array *);
void (*close)(Db *);
/* Implementation-specific constructs */
};
struct DbRef
{
HashMap *json;
uint64_t ts;
size_t size;
Array *name;
DbRef *prev;
DbRef *next;
DbHint hint;
/* Implementation-specific constructs */
};
extern void DbInit(Db *);
extern void DbRefInit(Db *, DbRef *);
extern void StringArrayFree(Array *);
extern void StringArrayAppend(Array *, char *);
#endif

590
src/Db/LMDB.c Normal file
View file

@ -0,0 +1,590 @@
#include <Memory.h>
#include <Json.h>
#include <Log.h>
#include <Db.h>
#include "Db/Internal.h"
#include <stdint.h>
#include <string.h>
#include <stdlib.h>
#ifdef EDB_LMDB
#include <lmdb.h>
typedef struct LMDB {
Db base; /* The base implementation required to pass */
MDB_env *environ;
MDB_dbi dbi;
} LMDB;
typedef struct LMDBRef {
DbRef base;
/* TODO: LMDB-dependent stuff */
MDB_txn *transaction;
MDB_dbi dbi;
} LMDBRef;
/* Helper functions */
static MDB_val
LMDBTranslateKey(Array *key)
{
MDB_val ret = { 0 };
char *data = NULL;
size_t length = 0;
size_t i;
if (!key || ArraySize(key) > 255)
{
return ret;
}
data = Realloc(data, ++length);
data[0] = ArraySize(key);
/* Now, let's push every item */
for (i = 0; i < ArraySize(key); i++)
{
char *entry = ArrayGet(key, i);
size_t offset = length;
data = Realloc(data, (length += strlen(entry) + 1));
memcpy(data + offset, entry, strlen(entry));
data[length - 1] = '\0';
}
/* We now have every key */
ret.mv_size = length;
ret.mv_data = data;
return ret;
}
static void
LMDBKillKey(MDB_val key)
{
if (!key.mv_data || !key.mv_size)
{
return;
}
Free(key.mv_data);
}
static HashMap *
LMDBDecode(MDB_val val)
{
FILE *fakefile;
Stream *fakestream;
HashMap *ret;
if (!val.mv_data || !val.mv_size)
{
return NULL;
}
fakefile = fmemopen(val.mv_data, val.mv_size, "r");
fakestream = StreamFile(fakefile);
ret = JsonDecode(fakestream);
StreamClose(fakestream);
return ret;
}
static bool
LMDBKeyStartsWith(MDB_val key, MDB_val starts)
{
size_t i;
if (!key.mv_size || !starts.mv_size)
{
return false;
}
if (key.mv_size < starts.mv_size)
{
return false;
}
for (i = 0; i < starts.mv_size; i++)
{
char keyC = ((char *) key.mv_data)[i];
char startC = ((char *) starts.mv_data)[i];
if (keyC != startC)
{
return false;
}
}
return true;
}
static char *
LMDBKeyHead(MDB_val key)
{
char *end;
if (!key.mv_size || !key.mv_data)
{
return NULL;
}
/* -2 because we have a NULL byte in there */
end = ((char *) key.mv_data) + key.mv_size - 1;
if ((void *) end < key.mv_data)
{
/* Uh oh. */
return NULL;
}
while ((void *) (end - 1) >= key.mv_data && *(end - 1))
{
end--;
}
return end;
}
static DbRef *
LMDBLock(Db *d, DbHint hint, Array *k)
{
LMDB *db = (LMDB *) d;
MDB_txn *transaction;
LMDBRef *ret = NULL;
MDB_val key, json_val;
int code, flags;
if (!d || !k)
{
return NULL;
}
pthread_mutex_lock(&d->lock);
key = LMDBTranslateKey(k);
/* Create a transaction, honoring hints. */
/* TODO: Do we want to create a "main" transaction that everyone inherits
* from? */
flags = hint == DB_HINT_READONLY ? MDB_RDONLY : 0;
if ((code = mdb_txn_begin(db->environ, NULL, flags, &transaction)) != 0)
{
/* Very bad! */
Log(LOG_ERR,
"%s: could not begin transaction: %s",
__func__, mdb_strerror(code)
);
goto end;
}
json_val.mv_size = 0;
json_val.mv_data = NULL;
code = mdb_get(transaction, db->dbi, &key, &json_val);
if (code == MDB_NOTFOUND)
{
mdb_txn_abort(transaction);
goto end;
}
else if (code != 0)
{
Log(LOG_ERR,
"%s: mdb_get failure: %s",
__func__, mdb_strerror(code)
);
mdb_txn_abort(transaction);
goto end;
}
ret = Malloc(sizeof(*ret));
DbRefInit(d, (DbRef *) ret);
/* TODO: Timestamp */
{
size_t i;
ret->base.name = ArrayCreate();
for (i = 0; i < ArraySize(k); i++)
{
char *ent = ArrayGet(k, i);
StringArrayAppend(ret->base.name, ent);
}
}
ret->base.json = LMDBDecode(json_val);
ret->base.hint = hint;
ret->transaction = NULL;
if (hint == DB_HINT_WRITE)
{
ret->transaction = transaction;
}
else
{
mdb_txn_abort(transaction);
}
end:
if (!ret || hint == DB_HINT_READONLY)
{
pthread_mutex_unlock(&d->lock);
}
LMDBKillKey(key);
return (DbRef *) ret;
}
static bool
LMDBExists(Db *d, Array *k)
{
MDB_val key, empty;
LMDB *db = (LMDB *) d;
MDB_txn *transaction;
int code;
bool ret = false;
if (!d || !k)
{
return false;
}
pthread_mutex_lock(&d->lock);
key = LMDBTranslateKey(k);
/* create a txn */
if ((code = mdb_txn_begin(db->environ, NULL, 0, &transaction)) != 0)
{
/* Very bad! */
Log(LOG_ERR,
"%s: could not begin transaction: %s",
__func__, mdb_strerror(code)
);
goto end;
}
ret = mdb_get(transaction, db->dbi, &key, &empty) == 0;
mdb_txn_abort(transaction);
end:
LMDBKillKey(key);
pthread_mutex_unlock(&d->lock);
return ret;
}
static bool
LMDBDelete(Db *d, Array *k)
{
MDB_val key, empty;
LMDB *db = (LMDB *) d;
MDB_txn *transaction;
int code;
bool ret = false;
if (!d || !k)
{
return false;
}
pthread_mutex_lock(&d->lock);
key = LMDBTranslateKey(k);
/* create a txn */
if ((code = mdb_txn_begin(db->environ, NULL, 0, &transaction)) != 0)
{
/* Very bad! */
Log(LOG_ERR,
"%s: could not begin transaction: %s",
__func__, mdb_strerror(code)
);
goto end;
}
ret = mdb_del(transaction, db->dbi, &key, &empty) == 0;
mdb_txn_commit(transaction);
end:
LMDBKillKey(key);
pthread_mutex_unlock(&d->lock);
return ret;
}
static bool
LMDBUnlock(Db *d, DbRef *r)
{
LMDBRef *ref = (LMDBRef *) r;
LMDB *db = (LMDB *) d;
FILE *fakestream;
Stream *stream;
MDB_val key, val;
bool ret = true;
if (!d || !r)
{
return false;
}
val.mv_data = NULL;
val.mv_size = 0;
if (ref->transaction && r->hint == DB_HINT_WRITE)
{
key = LMDBTranslateKey(r->name);
fakestream = open_memstream((char **) &val.mv_data, &val.mv_size);
stream = StreamFile(fakestream);
JsonEncode(r->json, stream, JSON_DEFAULT);
StreamFlush(stream);
StreamClose(stream);
ret = mdb_put(ref->transaction, db->dbi, &key, &val, 0) == 0;
mdb_txn_commit(ref->transaction);
LMDBKillKey(key);
}
StringArrayFree(ref->base.name);
JsonFree(ref->base.json);
Free(ref);
if (val.mv_data)
{
free(val.mv_data);
}
if (ret && r->hint == DB_HINT_WRITE)
{
pthread_mutex_unlock(&d->lock);
}
return ret;
}
static DbRef *
LMDBCreate(Db *d, Array *k)
{
LMDB *db = (LMDB *) d;
MDB_txn *transaction;
LMDBRef *ret = NULL;
MDB_val key, empty_json;
int code;
if (!d || !k)
{
return NULL;
}
pthread_mutex_lock(&d->lock);
key = LMDBTranslateKey(k);
/* create a txn */
if ((code = mdb_txn_begin(db->environ, NULL, 0, &transaction)) != 0)
{
/* Very bad! */
Log(LOG_ERR,
"%s: could not begin transaction: %s",
__func__, mdb_strerror(code)
);
goto end;
}
empty_json.mv_size = 2;
empty_json.mv_data = "{}";
/* put data in it */
code = mdb_put(transaction, db->dbi, &key, &empty_json, MDB_NOOVERWRITE);
if (code == MDB_KEYEXIST)
{
mdb_txn_abort(transaction);
goto end;
}
else if (code == MDB_MAP_FULL)
{
Log(LOG_ERR, "%s: db is full", __func__);
mdb_txn_abort(transaction);
goto end;
}
else if (code != 0)
{
Log(LOG_ERR, "%s: mdb_put failure: %s", __func__, mdb_strerror(code));
mdb_txn_abort(transaction);
goto end;
}
ret = Malloc(sizeof(*ret));
DbRefInit(d, (DbRef *) ret);
/* TODO: Timestamp */
{
size_t i;
ret->base.name = ArrayCreate();
for (i = 0; i < ArraySize(k); i++)
{
char *ent = ArrayGet(k, i);
StringArrayAppend(ret->base.name, ent);
}
}
ret->base.hint = DB_HINT_WRITE;
ret->base.json = HashMapCreate();
ret->transaction = transaction;
end:
if (!ret)
{
pthread_mutex_unlock(&d->lock);
}
LMDBKillKey(key);
return (DbRef *) ret;
}
static Array *
LMDBList(Db *d, Array *k)
{
LMDB *db = (LMDB *) d;
MDB_val key = { 0 };
MDB_val subKey;
MDB_val val;
Array *ret = NULL;
MDB_cursor *cursor;
MDB_cursor_op op = MDB_SET_RANGE;
MDB_txn *txn;
int code;
if (!d || !k)
{
return NULL;
}
pthread_mutex_lock(&d->lock);
/* Marked as read-only, as we just don't need to write anything
* when listing */
if ((code = mdb_txn_begin(db->environ, NULL, MDB_RDONLY, &txn)) != 0)
{
/* Very bad! */
Log(LOG_ERR,
"%s: could not begin transaction: %s",
__func__, mdb_strerror(code)
);
goto end;
}
if ((code = mdb_cursor_open(txn, db->dbi, &cursor)) != 0)
{
Log(LOG_ERR,
"%s: could not get cursor: %s",
__func__, mdb_strerror(code)
);
mdb_txn_abort(txn);
goto end;
}
key = LMDBTranslateKey(k);
/* Small hack to get it to list subitems */
((char *) key.mv_data)[0]++;
ret = ArrayCreate();
subKey = key;
while (mdb_cursor_get(cursor, &subKey, &val, op) == 0)
{
/* This searches by *increasing* order. The problem is that it may
* extend to unwanted points. Since the values are sorted, we can
* just exit if the subkey is incorrect. */
char *head = LMDBKeyHead(subKey);
if (!LMDBKeyStartsWith(subKey, key))
{
break;
}
StringArrayAppend(ret, head);
op = MDB_NEXT;
}
mdb_cursor_close(cursor);
mdb_txn_abort(txn);
end:
LMDBKillKey(key);
pthread_mutex_unlock(&d->lock);
return ret;
}
/* Implementation functions */
static void
LMDBClose(Db *d)
{
LMDB *db = (LMDB *) d;
if (!d)
{
return;
}
mdb_dbi_close(db->environ, db->dbi);
mdb_env_close(db->environ);
}
Db *
DbOpenLMDB(char *dir, size_t size)
{
/* TODO */
MDB_env *env = NULL;
MDB_txn *txn;
MDB_dbi dbi;
int code;
LMDB *db;
if (!dir || !size)
{
return NULL;
}
/* Try initialising LMDB */
if ((code = mdb_env_create(&env)) != 0)
{
Log(LOG_ERR,
"%s: could not create LMDB env: %s",
__func__, mdb_strerror(code)
);
return NULL;
}
if ((code = mdb_env_set_mapsize(env, size) != 0))
{
Log(LOG_ERR,
"%s: could not set mapsize to %lu: %s",
__func__, (unsigned long) size,
mdb_strerror(code)
);
mdb_env_close(env);
return NULL;
}
mdb_env_set_maxdbs(env, 4);
if ((code = mdb_env_open(env, dir, MDB_NOTLS, 0644)) != 0)
{
Log(LOG_ERR,
"%s: could not open LMDB env: %s",
__func__, mdb_strerror(code)
);
mdb_env_close(env);
return NULL;
}
/* Initialise a DBI */
{
if ((code = mdb_txn_begin(env, NULL, 0, &txn)) != 0)
{
Log(LOG_ERR,
"%s: could not begin transaction: %s",
__func__, mdb_strerror(code)
);
mdb_env_close(env);
return NULL;
}
if ((code = mdb_dbi_open(txn, "db", MDB_CREATE, &dbi)) != 0)
{
Log(LOG_ERR,
"%s: could not get transaction dbi: %s",
__func__, mdb_strerror(code)
);
mdb_txn_abort(txn);
mdb_env_close(env);
return NULL;
}
mdb_txn_commit(txn);
}
db = Malloc(sizeof(*db));
DbInit((Db *) db);
db->environ = env;
db->dbi = dbi;
db->base.lockFunc = LMDBLock;
db->base.create = LMDBCreate;
db->base.unlock = LMDBUnlock;
db->base.delete = LMDBDelete;
db->base.exists = LMDBExists;
db->base.close = LMDBClose;
db->base.list = LMDBList;
return (Db *) db;
}
#else
Db *
DbOpenLMDB(char *dir, size_t size)
{
/* Unimplemented function */
Log(LOG_ERR,
"LMDB support is not enabled. Please compile with --use-lmdb"
);
(void) size;
(void) dir;
return NULL;
}
#endif