Migrate disk cache to verstable

This commit is contained in:
Kovid Goyal
2024-07-10 20:01:24 +05:30
parent b53264c099
commit 53eff7927a

View File

@@ -9,7 +9,6 @@
#include "disk-cache.h"
#include "safe-wrappers.h"
#include "kitty-uthash.h"
#include "simd-string.h"
#include "loop-utils.h"
#include "fast-file-copy.h"
@@ -21,17 +20,34 @@
#include <fcntl.h>
#include <time.h>
typedef struct CacheKey {
void *hash_key;
unsigned short hash_keylen;
} CacheKey;
typedef struct {
void *hash_key;
uint8_t *data;
size_t data_sz;
unsigned short hash_keylen;
bool written_to_disk;
off_t pos_in_cache_file;
uint8_t encryption_key[64];
UT_hash_handle hh;
} CacheEntry;
} CacheValue;
#define NAME cache_map
#define KEY_TY CacheKey
#define VAL_TY CacheValue*
static uint64_t key_hash(KEY_TY k);
#define HASH_FN key_hash
static bool keys_are_equal(KEY_TY a, KEY_TY b);
#define CMPR_FN keys_are_equal
static void free_cache_value(CacheValue *cv) { free(cv->data); cv->data = NULL; free(cv); }
static void free_cache_key(CacheKey cv) { free(cv.hash_key); cv.hash_key = NULL; }
#define KEY_DTOR_FN free_cache_key
#define VAL_DTOR_FN free_cache_value
#include "kitty-verstable.h"
static uint64_t key_hash(CacheKey k) { return vt_hash_bytes(k.hash_key, k.hash_keylen); }
static bool keys_are_equal(CacheKey a, CacheKey b) { return a.hash_keylen == b.hash_keylen && memcmp(a.hash_key, b.hash_key, a.hash_keylen) == 0; }
typedef struct Hole {
off_t pos, size;
@@ -52,18 +68,12 @@ typedef struct {
pthread_t write_thread;
bool thread_started, lock_inited, loop_data_inited, shutting_down, fully_initialized;
LoopData loop_data;
CacheEntry *entries, currently_writing;
struct { CacheValue val; CacheKey key; } currently_writing;
cache_map map;
Holes holes;
unsigned long long total_size;
} DiskCache;
void
free_cache_entry(CacheEntry *e) {
if (e->hash_key) { free(e->hash_key); e->hash_key = NULL; }
if (e->data) { free(e->data); e->data = NULL; }
free(e);
}
#define mutex(op) pthread_mutex_##op(&self->lock)
static PyObject*
@@ -125,12 +135,18 @@ disk_cache_size_on_disk(PyObject *self) {
}
typedef struct {
uint8_t hash_key[MAX_KEY_SIZE];
unsigned short hash_keylen;
CacheKey key;
off_t old_offset, new_offset;
size_t data_sz;
} DefragEntry;
static CacheKey
keydup(CacheKey k) {
CacheKey ans = {.hash_key=malloc(k.hash_keylen), .hash_keylen=k.hash_keylen};
if (ans.hash_key) memcpy(ans.hash_key, k.hash_key, k.hash_keylen);
return ans;
}
static void
defrag(DiskCache *self) {
int new_cache_file = -1;
@@ -140,7 +156,7 @@ defrag(DiskCache *self) {
off_t size_on_disk = size_of_cache_file(self);
if (size_on_disk <= 0) goto cleanup;
size_t num_entries = HASH_COUNT(self->entries);
size_t num_entries = vt_size(&self->map);
if (!num_entries) goto cleanup;
new_cache_file = open_cache_file(self->cache_dir);
if (new_cache_file < 0) {
@@ -150,15 +166,15 @@ defrag(DiskCache *self) {
defrag_entries = calloc(num_entries, sizeof(DefragEntry));
if (!defrag_entries) goto cleanup;
size_t total_data_size = 0, num_entries_to_defrag = 0;
CacheEntry *tmp, *s;
HASH_ITER(hh, self->entries, s, tmp) {
for (cache_map_itr i = vt_first(&self->map); !vt_is_end(i); i = vt_next(i)) {
CacheValue *s = i.data->val;
if (s->pos_in_cache_file > -1 && s->data_sz) {
total_data_size += s->data_sz;
DefragEntry *e = defrag_entries + num_entries_to_defrag++;
e->hash_keylen = s->hash_keylen;
e->old_offset = s->pos_in_cache_file;
e->data_sz = s->data_sz;
if (s->hash_key) memcpy(e->hash_key, s->hash_key, s->hash_keylen);
e->key = keydup(i.data->key); // have to dup the key as we release the mutex and another thread might free the underlying key.
if (!e->key.hash_key) { fprintf(stderr, "Failed to allocate space for keydup in defrag\n"); goto cleanup; }
}
}
if (ftruncate(new_cache_file, total_data_size) != 0) {
@@ -190,9 +206,9 @@ cleanup:
self->cache_file_fd = new_cache_file; new_cache_file = -1;
for (size_t i = 0; i < num_entries_to_defrag; i++) {
DefragEntry *e = defrag_entries + i;
s = NULL;
HASH_FIND(hh, self->entries, e->hash_key, e->hash_keylen, s);
if (s) s->pos_in_cache_file = e->new_offset;
cache_map_itr i = vt_get(&self->map, e->key);
if (!vt_is_end(i)) i.data->val->pos_in_cache_file = e->new_offset;
free(e->key.hash_key);
}
}
if (new_cache_file > -1) safe_close(new_cache_file, __FILE__, __LINE__);
@@ -208,7 +224,7 @@ find_hole_to_use(DiskCache *self, const off_t required_sz) {
Hole *h = self->holes.items + i;
if (!found && h->size >= required_sz) {
found = true;
self->currently_writing.pos_in_cache_file = h->pos;
self->currently_writing.val.pos_in_cache_file = h->pos;
bool was_largest_hole = h->size == self->holes.largest_hole_size;
h->size -= required_sz;
h->pos += required_sz;
@@ -255,7 +271,7 @@ add_hole(DiskCache *self, off_t pos, off_t size) {
}
static void
remove_from_disk(DiskCache *self, CacheEntry *s) {
remove_from_disk(DiskCache *self, CacheValue *s) {
if (s->written_to_disk) {
s->written_to_disk = false;
if (s->data_sz && s->pos_in_cache_file > -1) {
@@ -267,20 +283,20 @@ remove_from_disk(DiskCache *self, CacheEntry *s) {
static bool
find_cache_entry_to_write(DiskCache *self) {
CacheEntry *tmp, *s;
if (needs_defrag(self)) defrag(self);
HASH_ITER(hh, self->entries, s, tmp) {
for (cache_map_itr i = vt_first(&self->map); !vt_is_end(i); i = vt_next(i)) {
CacheValue *s = i.data->val;
if (!s->written_to_disk) {
if (s->data) {
if (self->currently_writing.data) free(self->currently_writing.data);
self->currently_writing.data = s->data;
if (self->currently_writing.val.data) free(self->currently_writing.val.data);
self->currently_writing.val.data = s->data;
s->data = NULL;
self->currently_writing.data_sz = s->data_sz;
self->currently_writing.pos_in_cache_file = -1;
xor_data64(s->encryption_key, self->currently_writing.data, s->data_sz);
self->currently_writing.hash_keylen = MIN(s->hash_keylen, MAX_KEY_SIZE);
memcpy(self->currently_writing.hash_key, s->hash_key, self->currently_writing.hash_keylen);
find_hole_to_use(self, self->currently_writing.data_sz);
self->currently_writing.val.data_sz = s->data_sz;
self->currently_writing.val.pos_in_cache_file = -1;
xor_data64(s->encryption_key, self->currently_writing.val.data, s->data_sz);
self->currently_writing.key.hash_keylen = MIN(i.data->key.hash_keylen, MAX_KEY_SIZE);
memcpy(self->currently_writing.key.hash_key, i.data->key.hash_key, self->currently_writing.key.hash_keylen);
find_hole_to_use(self, self->currently_writing.val.data_sz);
return true;
}
s->written_to_disk = true;
@@ -293,27 +309,27 @@ find_cache_entry_to_write(DiskCache *self) {
static bool
write_dirty_entry(DiskCache *self) {
size_t left = self->currently_writing.data_sz;
uint8_t *p = self->currently_writing.data;
if (self->currently_writing.pos_in_cache_file < 0) {
self->currently_writing.pos_in_cache_file = size_of_cache_file(self);
if (self->currently_writing.pos_in_cache_file < 0) {
size_t left = self->currently_writing.val.data_sz;
uint8_t *p = self->currently_writing.val.data;
if (self->currently_writing.val.pos_in_cache_file < 0) {
self->currently_writing.val.pos_in_cache_file = size_of_cache_file(self);
if (self->currently_writing.val.pos_in_cache_file < 0) {
perror("Failed to seek in disk cache file");
return false;
}
}
off_t offset = self->currently_writing.pos_in_cache_file;
off_t offset = self->currently_writing.val.pos_in_cache_file;
while (left > 0) {
ssize_t n = pwrite(self->cache_file_fd, p, left, offset);
if (n < 0) {
if (errno == EINTR || errno == EAGAIN) continue;
perror("Failed to write to disk-cache file");
self->currently_writing.pos_in_cache_file = -1;
self->currently_writing.val.pos_in_cache_file = -1;
return false;
}
if (n == 0) {
fprintf(stderr, "Failed to write to disk-cache file with zero return\n");
self->currently_writing.pos_in_cache_file = -1;
self->currently_writing.val.pos_in_cache_file = -1;
return false;
}
left -= n;
@@ -325,15 +341,14 @@ write_dirty_entry(DiskCache *self) {
static void
retire_currently_writing(DiskCache *self) {
CacheEntry *s = NULL;
HASH_FIND(hh, self->entries, self->currently_writing.hash_key, self->currently_writing.hash_keylen, s);
if (s) {
s->written_to_disk = true;
s->pos_in_cache_file = self->currently_writing.pos_in_cache_file;
cache_map_itr i = vt_get(&self->map, self->currently_writing.key);
if (!vt_is_end(i)) {
i.data->val->written_to_disk = true;
i.data->val->pos_in_cache_file = self->currently_writing.val.pos_in_cache_file;
}
free(self->currently_writing.data);
self->currently_writing.data = NULL;
self->currently_writing.data_sz = 0;
free(self->currently_writing.val.data);
self->currently_writing.val.data = NULL;
self->currently_writing.val.data_sz = 0;
}
static void*
@@ -348,7 +363,7 @@ write_loop(void *data) {
while (!self->shutting_down) {
mutex(lock);
found_dirty_entry = find_cache_entry_to_write(self);
size_t count = HASH_COUNT(self->entries);
size_t count = vt_size(&self->map);
mutex(unlock);
if (found_dirty_entry) {
write_dirty_entry(self);
@@ -380,9 +395,9 @@ ensure_state(DiskCache *self) {
if (!init_loop_data(&self->loop_data, 0)) { PyErr_SetFromErrno(PyExc_OSError); return false; }
self->loop_data_inited = true;
}
if (!self->currently_writing.hash_key) {
self->currently_writing.hash_key = malloc(MAX_KEY_SIZE);
if (!self->currently_writing.hash_key) { PyErr_NoMemory(); return false; }
if (!self->currently_writing.key.hash_key) {
self->currently_writing.key.hash_key = malloc(MAX_KEY_SIZE);
if (!self->currently_writing.key.hash_key) { PyErr_NoMemory(); return false; }
}
if (!self->lock_inited) {
@@ -424,7 +439,7 @@ ensure_state(DiskCache *self) {
return false;
}
}
vt_init(&self->map);
self->fully_initialized = true;
return true;
}
@@ -442,8 +457,8 @@ dealloc(DiskCache* self) {
pthread_join(self->write_thread, NULL);
self->thread_started = false;
}
if (self->currently_writing.hash_key) {
free(self->currently_writing.hash_key); self->currently_writing.hash_key = NULL;
if (self->currently_writing.key.hash_key) {
free(self->currently_writing.key.hash_key); self->currently_writing.key.hash_key = NULL;
}
if (self->lock_inited) {
pthread_mutex_destroy(&self->lock);
@@ -453,33 +468,22 @@ dealloc(DiskCache* self) {
free_loop_data(&self->loop_data);
self->loop_data_inited = false;
}
if (self->entries) {
CacheEntry *tmp, *s;
HASH_ITER(hh, self->entries, s, tmp) {
HASH_DEL(self->entries, s);
free_cache_entry(s); s = NULL;
}
self->entries = NULL;
}
vt_cleanup(&self->map);
if (self->cache_file_fd > -1) {
safe_close(self->cache_file_fd, __FILE__, __LINE__);
self->cache_file_fd = -1;
}
free(self->holes.items); zero_at_ptr(&self->holes);
if (self->currently_writing.data) free(self->currently_writing.data);
if (self->currently_writing.val.data) free(self->currently_writing.val.data);
free(self->cache_dir); self->cache_dir = NULL;
Py_TYPE(self)->tp_free((PyObject*)self);
}
static CacheEntry*
create_cache_entry(const void *key, const size_t key_sz) {
CacheEntry *s = calloc(1, sizeof(CacheEntry));
if (!s) return (CacheEntry*)PyErr_NoMemory();
static CacheValue*
create_cache_entry(void) {
CacheValue *s = calloc(1, sizeof(CacheValue));
if (!s) return (CacheValue*)PyErr_NoMemory();
if (!secure_random_bytes(s->encryption_key, sizeof(s->encryption_key))) { free(s); PyErr_SetFromErrno(PyExc_OSError); return NULL; }
s->hash_key = malloc(key_sz);
if (!s->hash_key) { free(s); PyErr_NoMemory(); return NULL; }
s->hash_keylen = key_sz;
memcpy(s->hash_key, key, key_sz);
s->pos_in_cache_file = -2;
return s;
}
@@ -489,17 +493,22 @@ add_to_disk_cache(PyObject *self_, const void *key, size_t key_sz, const void *d
DiskCache *self = (DiskCache*)self_;
if (!ensure_state(self)) return false;
if (key_sz > MAX_KEY_SIZE) { PyErr_SetString(PyExc_KeyError, "cache key is too long"); return false; }
CacheEntry *s = NULL;
RAII_ALLOC(uint8_t, copied_data, malloc(data_sz));
if (!copied_data) { PyErr_NoMemory(); return false; }
memcpy(copied_data, data, data_sz);
CacheKey k = {.hash_key=(void*)key, .hash_keylen=key_sz};
mutex(lock);
HASH_FIND(hh, self->entries, key, key_sz, s);
if (s == NULL) {
if (!(s = create_cache_entry(key, key_sz))) goto end;
HASH_ADD_KEYPTR(hh, self->entries, s->hash_key, s->hash_keylen, s);
cache_map_itr i = vt_get(&self->map, k);
CacheValue *s;
if (vt_is_end(i)) {
k.hash_key = malloc(key_sz);
if (!k.hash_key) { PyErr_NoMemory(); goto end; }
memcpy(k.hash_key, key, key_sz);
if (!(s = create_cache_entry())) goto end;
if (vt_is_end(vt_insert(&self->map, k, s))) { PyErr_NoMemory(); goto end; }
} else {
s = i.data->val;
remove_from_disk(self, s);
self->total_size -= MIN(self->total_size, s->data_sz);
if (s->data) free(s->data);
@@ -518,17 +527,18 @@ remove_from_disk_cache(PyObject *self_, const void *key, size_t key_sz) {
DiskCache *self = (DiskCache*)self_;
if (!ensure_state(self)) return false;
if (key_sz > MAX_KEY_SIZE) { PyErr_SetString(PyExc_KeyError, "cache key is too long"); return false; }
CacheEntry *s = NULL;
CacheValue *s = NULL;
CacheKey k = {.hash_key=(void*)key, .hash_keylen=key_sz};
bool removed = false;
mutex(lock);
HASH_FIND(hh, self->entries, key, key_sz, s);
if (s) {
cache_map_itr i = vt_get(&self->map, k);
if (!vt_is_end(i)) {
removed = true;
HASH_DEL(self->entries, s);
s = i.data->val;
remove_from_disk(self, s);
self->total_size = (self->total_size > s->data_sz) ? self->total_size - s->data_sz : 0;
free_cache_entry(s);
vt_erase_itr(&self->map, i);
}
mutex(unlock);
wakeup_write_loop(self);
@@ -539,12 +549,8 @@ void
clear_disk_cache(PyObject *self_) {
DiskCache *self = (DiskCache*)self_;
if (!ensure_state(self)) return;
CacheEntry *s, *tmp;
mutex(lock);
HASH_ITER(hh, self->entries, s, tmp) {
HASH_DEL(self->entries, s);
free_cache_entry(s);
}
vt_cleanup(&self->map);
self->total_size = 0;
self->holes.count = 0;
self->holes.largest_hole_size = 0;
@@ -577,7 +583,7 @@ read_from_cache_file(const DiskCache *self, off_t pos, size_t sz, void *dest) {
}
static void
read_from_cache_entry(const DiskCache *self, const CacheEntry *s, void *dest) {
read_from_cache_entry(const DiskCache *self, const CacheValue *s, void *dest) {
size_t sz = s->data_sz;
off_t pos = s->pos_in_cache_file;
if (pos < 0) {
@@ -593,17 +599,18 @@ read_from_disk_cache(PyObject *self_, const void *key, size_t key_sz, void*(allo
void *data = NULL;
if (!ensure_state(self)) return data;
if (key_sz > MAX_KEY_SIZE) { PyErr_SetString(PyExc_KeyError, "cache key is too long"); return data; }
CacheKey k = {.hash_key=(void*)key, .hash_keylen=key_sz};
mutex(lock);
CacheEntry *s = NULL;
HASH_FIND(hh, self->entries, key, key_sz, s);
if (!s) { PyErr_SetString(PyExc_KeyError, "No cached entry with specified key found"); goto end; }
cache_map_itr i = vt_get(&self->map, k);
if (vt_is_end(i)) { PyErr_SetString(PyExc_KeyError, "No cached entry with specified key found"); goto end; }
CacheValue *s = i.data->val;
data = allocator(allocator_data, s->data_sz);
if (!data) { PyErr_NoMemory(); goto end; }
if (s->data) { memcpy(data, s->data, s->data_sz); }
else if (self->currently_writing.data && self->currently_writing.hash_key && self->currently_writing.hash_keylen == key_sz && memcmp(self->currently_writing.hash_key, key, key_sz) == 0) {
memcpy(data, self->currently_writing.data, s->data_sz);
else if (self->currently_writing.val.data && self->currently_writing.key.hash_key && keys_are_equal(self->currently_writing.key, k)) {
memcpy(data, self->currently_writing.val.data, s->data_sz);
xor_data64(s->encryption_key, data, s->data_sz);
}
else {
@@ -627,9 +634,9 @@ disk_cache_clear_from_ram(PyObject *self_, bool(matches)(void*, void *key, unsig
size_t ans = 0;
if (!ensure_state(self)) return ans;
mutex(lock);
CacheEntry *s, *tmp;
HASH_ITER(hh, self->entries, s, tmp) {
if (s->written_to_disk && s->data && matches(data, s->hash_key, s->hash_keylen)) {
for (cache_map_itr i = vt_first(&self->map); !vt_is_end(i); i = vt_next(i)) {
CacheValue *s = i.data->val;
if (s->written_to_disk && s->data && matches(data, i.data->key.hash_key, i.data->key.hash_keylen)) {
free(s->data); s->data = NULL;
ans++;
}
@@ -646,9 +653,8 @@ disk_cache_wait_for_write(PyObject *self_, monotonic_t timeout) {
while (!timeout || monotonic() <= end_at) {
bool pending = false;
mutex(lock);
CacheEntry *s, *tmp;
HASH_ITER(hh, self->entries, s, tmp) {
if (!s->written_to_disk) {
for (cache_map_itr i = vt_first(&self->map); !vt_is_end(i); i = vt_next(i)) {
if (!i.data->val->written_to_disk) {
pending = true;
break;
}
@@ -671,9 +677,8 @@ disk_cache_num_cached_in_ram(PyObject *self_) {
unsigned long ans = 0;
if (ensure_state(self)) {
mutex(lock);
CacheEntry *tmp, *s;
HASH_ITER(hh, self->entries, s, tmp) {
if (s->written_to_disk && s->data) ans++;
for (cache_map_itr i = vt_first(&self->map); !vt_is_end(i); i = vt_next(i)) {
if (i.data->val->written_to_disk && i.data->val->data) ans++;
}
mutex(unlock);
}