vkdb
A time series database engine in C++.
Loading...
Searching...
No Matches
lsm_tree.h
1#ifndef STORAGE_LSM_TREE_H
2#define STORAGE_LSM_TREE_H
3
4#include <vkdb/concepts.h>
5#include <vkdb/sstable.h>
6#include <vkdb/mem_table.h>
7#include <vkdb/write_ahead_log.h>
8#include <vkdb/lru_cache.h>
9#include <vkdb/wal_lsm.h>
10
11#include <algorithm>
12#include <ranges>
13#include <deque>
14#include <set>
15#include <functional>
16
17namespace vkdb {
22using TimeSeriesKeyFilter = std::function<bool(const TimeSeriesKey&)>;
23
28static const TimeSeriesKeyFilter TRUE_TIME_SERIES_KEY_FILTER =
29 [](const TimeSeriesKey&) { return true; };
30
36template <ArithmeticNoCVRefQuals TValue>
37class LSMTree {
38public:
39 using key_type = TimeSeriesKey;
40 using mapped_type = std::optional<TValue>;
41 using value_type = std::pair<const key_type, mapped_type>;
42 using size_type = uint64_t;
43 using table_type = typename MemTable<TValue>::table_type;
44
45 static constexpr size_type LAYER_COUNT{8};
46
51 static constexpr size_type CACHE_CAPACITY{10'000};
52
59 explicit LSMTree(FilePath path) noexcept
60 : wal_{path}
61 , path_{std::move(path)}
62 , sstable_id_{0}
63 , ck_layers_{LAYER_COUNT}
64 , cache_{CACHE_CAPACITY} {
65 std::filesystem::create_directories(path_);
66 load_sstables();
67 }
68
73 LSMTree(LSMTree&&) noexcept = default;
74
79 LSMTree& operator=(LSMTree&&) noexcept = default;
80
85 LSMTree(const LSMTree&) = delete;
86
91 LSMTree& operator=(const LSMTree&) = delete;
92
97 ~LSMTree() noexcept = default;
98
108 void put(const key_type& key, const TValue& value, bool log = true) {
109 if (log) {
110 wal_.append({WALRecordType::PUT, {key, value}});
111 }
112 mem_table_.put(key, value);
113 dirty_table_[key] = true;
114 if (mem_table_.size() == MemTable<TValue>::C0_LAYER_SSTABLE_MAX_ENTRIES) {
115 flush();
116 }
117 }
118
128 void remove(const key_type& key, bool log = true) {
129 if (log) {
130 wal_.append({WALRecordType::REMOVE, {key, std::nullopt}});
131 }
132 mem_table_.put(key, std::nullopt);
133 dirty_table_[key] = true;
134 if (mem_table_.size() == MemTable<TValue>::C0_LAYER_SSTABLE_MAX_ENTRIES) {
135 flush();
136 }
137 }
138
145 [[nodiscard]] mapped_type get(const key_type& key) const noexcept {
146 if (!dirty_table_[key] && cache_.contains(key)) {
147 return cache_.get(key);
148 }
149
150 try {
151 return search_memtable(key);
152 } catch (const std::invalid_argument& e) {
153 const auto c0_value{iterative_layer_search(0, key)};
154 if (c0_value.has_value()) {
155 return c0_value;
156 }
157
158 for (size_type k{1}; k < LAYER_COUNT; ++k) {
159 const auto ck_value{binary_layer_search(k, key)};
160 if (ck_value.has_value()) {
161 return ck_value;
162 }
163 }
164
165 return std::nullopt;
166 }
167 }
168
179 [[nodiscard]] std::vector<value_type> getRange(
180 const key_type& start,
181 const key_type& end,
182 TimeSeriesKeyFilter&& filter
183 ) const {
184 table_type entry_table;
185 for (size_type k{LAYER_COUNT - 1}; k > 0; --k) {
186 binary_layer_search_range(k, start, end, filter, entry_table);
187 }
188 for (const auto& sstable : ck_layers_[0]) {
189 update_entries_with_sstable_range(
190 sstable, start, end, filter, entry_table
191 );
192 }
193 for (const auto& [key, value] : mem_table_.getRange(start, end)) {
194 update_entries_with_filtered_key(key, value, filter, entry_table);
195 }
196 return {entry_table.begin(), entry_table.end()};
197 }
198
203 void replayWAL() {
204 wal_.replay(*this);
205 }
206
212 void clear() noexcept {
213 for (const auto& ck_layer : ck_layers_) {
214 for (const auto& sstable : ck_layer) {
215 std::filesystem::remove(sstable.path());
216 std::filesystem::remove(sstable.metadataPath());
217 }
218 }
219 std::filesystem::remove(wal_.path());
220 mem_table_.clear();
221 ck_layers_ = CkLayers{LAYER_COUNT};
222 wal_.clear();
223 sstable_id_ = std::array<size_type, LAYER_COUNT>{0};
224 cache_.clear();
225 dirty_table_.clear();
226 }
227
233 [[nodiscard]] std::string str() const noexcept {
234 std::stringstream ss;
235 ss << mem_table_.str();
236 for (const auto& ck_layer : ck_layers_) {
237 for (const auto& sstable : ck_layer) {
238 ss << sstable.str();
239 }
240 }
241 return ss.str();
242 }
243
249 [[nodiscard]] size_type sstableCount() const noexcept {
250 size_type count{0};
251 for (const auto& ck_layer : ck_layers_) {
252 count += ck_layer.size();
253 }
254 return count;
255 }
256
265 [[nodiscard]] size_type sstableCount(size_type k) const {
266 if (k < 0 || k >= LAYER_COUNT) {
267 throw std::out_of_range{
268 "LSMTree::sstableCount(): Layer index out of range."
269 };
270 }
271 return ck_layers_[k].size();
272 }
273
280 [[nodiscard]] bool empty() const noexcept {
281 auto empty{mem_table_.empty()};
282 for (const auto& ck_layer : ck_layers_) {
283 empty = empty && ck_layer.empty();
284 if (!empty) {
285 break;
286 }
287 }
288 return empty;
289 }
290
291private:
296 using SSTables = std::deque<SSTable<TValue>>;
297
302 using CkLayer = SSTables;
303
308 using CkLayers = std::vector<CkLayer>;
309
314 using TimeWindow = TimeRange;
315
320 using Cache = LRUCache<key_type, TValue>;
321
326 using DirtyTable = std::unordered_map<key_type, bool>;
327
332 using TimeWindowToEntriesMap = std::map<TimeWindow, table_type, std::greater<>>;
333
347 static constexpr std::array<size_type, LAYER_COUNT> CK_LAYER_WINDOW_SIZE{
348 0,
349 86'400,
350 604'800,
351 2'592'000,
352 7'776'000,
353 15'552'000,
354 31'536'000,
355 94'608'000
356 };
357
362 static constexpr std::array<size_type, LAYER_COUNT> CK_LAYER_TABLE_COUNT{
363 32,
364 2'048,
365 1'024,
366 512,
367 256,
368 128,
369 64,
370 32
371 };
372
379 [[nodiscard]] FilePath get_next_file_path(size_type k) noexcept {
380 return path_ / ("sstable_l" + std::to_string(k) + "_"
381 + std::to_string(sstable_id_[k]++) + ".sst");
382 }
383
389 void load_sstables() {
390 std::array<std::set<FilePath>, LAYER_COUNT> sstable_files;
391 for (const auto& file : std::filesystem::directory_iterator(path_)) {
392 if (!file.is_regular_file() || file.path().extension() != ".sst") {
393 continue;
394 }
395 const auto l_pos{file.path().filename().string().find("_l")};
396 const auto layer_idx{
397 std::stoull(file.path().filename().string().substr(l_pos + 2))
398 };
399 sstable_files[layer_idx].insert(file.path());
400 const auto id{
401 std::stoull(file.path().filename().string().substr(l_pos + 4))
402 };
403 sstable_id_[layer_idx] = std::max(
404 sstable_id_[layer_idx],
405 static_cast<size_type>(id + 1)
406 );
407 }
408 for (size_type k{0}; k < LAYER_COUNT; ++k) {
409 for (const auto& sstable_file : sstable_files[k]) {
410 ck_layers_[k].emplace_back(std::move(sstable_file));
411 }
412 }
413 compact();
414 }
415
422 void validate_layer_index(size_type k) const {
423 if (k < 0 || k >= LAYER_COUNT) {
424 throw std::out_of_range{
425 "LSMTree::validate_layer_index(): Layer index out of range."
426 };
427 }
428 }
429
437 void reset_layer(size_type k) noexcept {
438 validate_layer_index(k);
439 ck_layers_[k].clear();
440 sstable_id_[k] = 0;
441 }
442
448 void flush() {
449 FilePath sstable_file_path{get_next_file_path(0)};
450 ck_layers_[0].emplace_back(
451 sstable_file_path,
452 std::move(mem_table_),
453 mem_table_.size()
454 );
455 compact();
456 mem_table_.clear();
457 wal_.clear();
458 }
459
468 void update_entries_with_filtered_key(
469 const key_type& key,
470 const mapped_type& value,
471 const TimeSeriesKeyFilter& filter,
472 table_type& entry_table
473 ) const noexcept {
474 if (!filter(key)) {
475 return;
476 }
477 if (!value.has_value()) {
478 entry_table.erase(key);
479 return;
480 }
481 entry_table[key] = value;
482 }
483
493 void update_entries_with_sstable_range(
494 const SSTable<TValue>& sstable,
495 const key_type& start,
496 const key_type& end,
497 const TimeSeriesKeyFilter& filter,
498 table_type& entry_table
499 ) const noexcept {
500 for (const auto& [key, value] : sstable.getRange(start, end)) {
501 update_entries_with_filtered_key(key, value, filter, entry_table);
502 }
503 }
504
512 mapped_type search_memtable(const key_type& key) const {
513 const auto mem_table_value{mem_table_.get(key)};
514 cache_.put(key, mem_table_value);
515 dirty_table_[key] = false;
516 return mem_table_value;
517 }
518
528 mapped_type iterative_layer_search(
529 size_type k,
530 const key_type& key
531 ) const {
532 validate_layer_index(k);
533 for (const auto& sstable : ck_layers_[k] | std::views::reverse) {
534 const auto sstable_value{sstable.get(key)};
535 if (!sstable_value.has_value()) {
536 continue;
537 }
538 cache_.put(key, sstable_value);
539 dirty_table_[key] = false;
540 return sstable_value;
541 }
542 return std::nullopt;
543 }
544
554 mapped_type binary_layer_search(
555 size_type k,
556 const key_type& key
557 ) const {
558 const auto timestamp{key.timestamp()};
559 const auto& ck_layer{ck_layers_[k]};
560 auto sstable_it{std::lower_bound(
561 ck_layer.begin(),
562 ck_layer.end(),
563 timestamp,
564 [](const auto& sstable, const auto target_time) {
565 return target_time < sstable.timeRange().lower();
566 }
567 )};
568
569 if (
570 sstable_it != ck_layer.end() &&
571 sstable_it->timeRange().lower() <= timestamp &&
572 timestamp <= sstable_it->timeRange().upper()
573 ) {
574 const auto sstable_value{sstable_it->get(key)};
575 if (sstable_value.has_value()) {
576 cache_.put(key, sstable_value);
577 dirty_table_[key] = false;
578 return sstable_value;
579 }
580 return std::nullopt;
581 }
582 return std::nullopt;
583 }
584
585
598 void binary_layer_search_range(
599 size_type k,
600 const key_type& start,
601 const key_type& end,
602 const TimeSeriesKeyFilter& filter,
603 table_type& entry_table
604 ) const {
605 const auto& ck_layer{ck_layers_[k]};
606 auto start_it{std::lower_bound(
607 ck_layer.begin(),
608 ck_layer.end(),
609 start.timestamp(),
610 [](const auto& sstable, const auto target_time) {
611 return sstable.timeRange().upper() < target_time;
612 }
613 )};
614
615 auto end_it{std::upper_bound(
616 start_it,
617 ck_layer.end(),
618 end.timestamp(),
619 [](const auto target_time, const auto& sstable) {
620 return target_time < sstable.timeRange().lower();
621 }
622 )};
623
624 for (
625 const auto& sstable :
626 std::ranges::subrange(start_it, end_it) | std::views::reverse
627 ) {
628 update_entries_with_sstable_range(
629 sstable, start, end, filter, entry_table
630 );
631 }
632 }
633
641 void compact() {
642 compact_layer_if_needed(0);
643 }
644
652 void compact_layer_if_needed(size_type k) {
653 validate_layer_index(k);
654 if (k == LAYER_COUNT - 1) {
655 return;
656 }
657 if (ck_layers_[k].size() <= CK_LAYER_TABLE_COUNT[k]) {
658 return;
659 }
660 compact_layer(k);
661 compact_layer_if_needed(k + 1);
662 }
663
671 void compact_layer(size_type k) {
672 if (k == 0) {
673 merge_all_in_layer(k);
674 return;
675 }
676 while (ck_layers_[k].size() > CK_LAYER_TABLE_COUNT[k]) {
677 merge_oldest_in_layer(k);
678 }
679 }
680
691 void update_entries_with_sstable(
692 SSTable<TValue>&& sstable,
693 size_type window_size,
694 TimeWindowToEntriesMap& time_window_to_entries,
695 std::vector<FilePath>& files_to_remove
696 ) const noexcept {
697 for (const auto& [key, value] : sstable.entries()) {
698 const auto start{(key.timestamp() / window_size) * window_size};
699 const auto time_window{TimeWindow{start, start + window_size}};
700 time_window_to_entries[time_window][key] = value;
701 }
702 files_to_remove.push_back(sstable.path());
703 files_to_remove.push_back(sstable.metadataPath());
704 }
705
711 void merge_all_in_layer(size_type k) {
712 auto& curr_layer{ck_layers_[k]};
713 auto& next_layer{ck_layers_[k + 1]};
714 const auto window_size{CK_LAYER_WINDOW_SIZE[k + 1]};
715
716 TimeWindowToEntriesMap time_window_to_entries;
717 std::vector<FilePath> files_to_remove;
718
719 for (auto&& sstable : next_layer) {
720 update_entries_with_sstable(
721 std::move(sstable), window_size,
722 time_window_to_entries,
723 files_to_remove
724 );
725 }
726 reset_layer(k + 1);
727
728 for (auto&& sstable : curr_layer) {
729 update_entries_with_sstable(
730 std::move(sstable), window_size,
731 time_window_to_entries,
732 files_to_remove
733 );
734 }
735 reset_layer(k);
736
737 for (const auto& file : files_to_remove) {
738 if (!std::filesystem::remove(file)) {
739 throw std::runtime_error{
740 "LSMTree::merge_all_in_layer(): Failed to remove file: '"
741 + std::string{file} + "' after compaction."
742 };
743 }
744 }
745
746 for (auto& [time_window, entries] : time_window_to_entries) {
747 auto sstable{merge_entries(std::move(entries), k)};
748 next_layer.push_back(std::move(sstable));
749 }
750 }
751
759 void merge_oldest_in_layer(size_type k) {
760 auto& curr_layer{ck_layers_[k]};
761 auto& next_layer{ck_layers_[k + 1]};
762 const auto window_size{CK_LAYER_WINDOW_SIZE[k + 1]};
763
764 TimeWindowToEntriesMap time_window_to_entries;
765 std::vector<FilePath> files_to_remove;
766
767 for (auto&& sstable : next_layer) {
768 update_entries_with_sstable(
769 std::move(sstable), window_size,
770 time_window_to_entries,
771 files_to_remove
772 );
773 }
774 reset_layer(k + 1);
775
776 while (curr_layer.size() > CK_LAYER_TABLE_COUNT[k]) {
777 auto& oldest_sstable{curr_layer.front()};
778 update_entries_with_sstable(
779 std::move(oldest_sstable), window_size,
780 time_window_to_entries,
781 files_to_remove
782 );
783 curr_layer.pop_front();
784 }
785
786 for (const auto& file : files_to_remove) {
787 if (!std::filesystem::remove(file)) {
788 throw std::runtime_error{
789 "LSMTree::merge_oldest_in_layer(): Failed to remove file: '"
790 + std::string{file} + "' after compaction."
791 };
792 }
793 }
794
795 for (auto& [time_window, entries] : time_window_to_entries) {
796 auto sstable{merge_entries(std::move(entries), k)};
797 next_layer.push_back(std::move(sstable));
798 }
799 }
800
808 SSTable<TValue> merge_entries(table_type&& entries, size_type k) {
809 auto memtable{MemTable<TValue>{std::move(entries)}};
810 const auto memtable_size{memtable.size()};
811 auto sstable{SSTable<TValue>{
812 get_next_file_path(k + 1),
813 std::move(memtable),
814 memtable_size
815 }};
816 return sstable;
817 }
818
823 MemTable<TValue> mem_table_;
824
829 CkLayers ck_layers_;
830
835 WriteAheadLog<TValue> wal_;
836
841 FilePath path_;
842
847 std::array<size_type, LAYER_COUNT> sstable_id_;
848
853 mutable Cache cache_;
854
859 mutable DirtyTable dirty_table_;
860};
861} // namespace vkdb
862
863#endif // STORAGE_LSM_TREE_H
A thread-safe LRU cache.
Definition lru_cache.h:20
void put(K &&key, V &&value)
Put a key-value pair into the cache.
Definition lru_cache.h:93
bool contains(K &&key) const noexcept
Check if the cache contains a key.
Definition lru_cache.h:134
mapped_type get(K &&key)
Get a value from the cache.
Definition lru_cache.h:115
void clear() noexcept
Clear the cache.
Definition lru_cache.h:164
LSM tree on TimeSeriesKey.
Definition wal_lsm.h:8
std::vector< value_type > getRange(const key_type &start, const key_type &end, TimeSeriesKeyFilter &&filter) const
Get a filtered set of entries in a timestamp range.
Definition lsm_tree.h:179
void put(const key_type &key, const TValue &value, bool log=true)
Put a key-value pair into the LSM tree.
Definition lsm_tree.h:108
size_type sstableCount() const noexcept
Get the number of SSTables in the LSM tree.
Definition lsm_tree.h:249
mapped_type get(const key_type &key) const noexcept
Get a value from the LSM tree.
Definition lsm_tree.h:145
void remove(const key_type &key, bool log=true)
Remove a key from the LSM tree.
Definition lsm_tree.h:128
LSMTree(LSMTree &&) noexcept=default
Move-construct an LSMTree object.
void clear() noexcept
Clear the LSM tree.
Definition lsm_tree.h:212
LSMTree(FilePath path) noexcept
Construct a new LSMTree object.
Definition lsm_tree.h:59
static constexpr size_type CACHE_CAPACITY
Max number of entries in the cache.
Definition lsm_tree.h:51
void replayWAL()
Replay the write-ahead log.
Definition lsm_tree.h:203
std::string str() const noexcept
Convert the LSM tree to a string.
Definition lsm_tree.h:233
bool empty() const noexcept
Check if the LSM tree is empty.
Definition lsm_tree.h:280
size_type sstableCount(size_type k) const
Get the number of SSTables in a Ck layer.
Definition lsm_tree.h:265
In-memory table for storing key-value pairs.
Definition mem_table.h:17
Represents a key in vkdb.