1#ifndef STORAGE_LSM_TREE_H
2#define STORAGE_LSM_TREE_H
4#include <vkdb/concepts.h>
5#include <vkdb/sstable.h>
6#include <vkdb/mem_table.h>
7#include <vkdb/write_ahead_log.h>
8#include <vkdb/lru_cache.h>
9#include <vkdb/wal_lsm.h>
22using TimeSeriesKeyFilter = std::function<bool(
const TimeSeriesKey&)>;
28static const TimeSeriesKeyFilter TRUE_TIME_SERIES_KEY_FILTER =
29 [](
const TimeSeriesKey&) {
return true; };
36template <ArithmeticNoCVRefQuals TValue>
39 using key_type = TimeSeriesKey;
40 using mapped_type = std::optional<TValue>;
41 using value_type = std::pair<const key_type, mapped_type>;
42 using size_type = uint64_t;
43 using table_type =
typename MemTable<TValue>::table_type;
45 static constexpr size_type LAYER_COUNT{8};
61 , path_{std::move(path)}
63 , ck_layers_{LAYER_COUNT}
65 std::filesystem::create_directories(path_);
108 void put(const
key_type& key, const TValue& value,
bool log = true) {
110 wal_.append({WALRecordType::PUT, {key, value}});
112 mem_table_.put(key, value);
113 dirty_table_[key] =
true;
130 wal_.append({WALRecordType::REMOVE, {key, std::nullopt}});
132 mem_table_.put(key, std::nullopt);
133 dirty_table_[key] =
true;
145 [[nodiscard]] mapped_type
get(
const key_type& key)
const noexcept {
146 if (!dirty_table_[key] && cache_.
contains(key)) {
147 return cache_.
get(key);
151 return search_memtable(key);
152 }
catch (
const std::invalid_argument& e) {
153 const auto c0_value{iterative_layer_search(0, key)};
154 if (c0_value.has_value()) {
158 for (size_type k{1}; k < LAYER_COUNT; ++k) {
159 const auto ck_value{binary_layer_search(k, key)};
160 if (ck_value.has_value()) {
182 TimeSeriesKeyFilter&& filter
184 table_type entry_table;
185 for (size_type k{LAYER_COUNT - 1}; k > 0; --k) {
186 binary_layer_search_range(k, start, end, filter, entry_table);
188 for (
const auto& sstable : ck_layers_[0]) {
189 update_entries_with_sstable_range(
190 sstable, start, end, filter, entry_table
193 for (
const auto& [key, value] : mem_table_.getRange(start, end)) {
194 update_entries_with_filtered_key(key, value, filter, entry_table);
196 return {entry_table.begin(), entry_table.end()};
213 for (
const auto& ck_layer : ck_layers_) {
214 for (
const auto& sstable : ck_layer) {
215 std::filesystem::remove(sstable.path());
216 std::filesystem::remove(sstable.metadataPath());
219 std::filesystem::remove(wal_.path());
221 ck_layers_ = CkLayers{LAYER_COUNT};
223 sstable_id_ = std::array<size_type, LAYER_COUNT>{0};
225 dirty_table_.clear();
233 [[nodiscard]] std::string
str() const noexcept {
234 std::stringstream ss;
235 ss << mem_table_.str();
236 for (
const auto& ck_layer : ck_layers_) {
237 for (
const auto& sstable : ck_layer) {
251 for (
const auto& ck_layer : ck_layers_) {
252 count += ck_layer.size();
266 if (k < 0 || k >= LAYER_COUNT) {
267 throw std::out_of_range{
268 "LSMTree::sstableCount(): Layer index out of range."
271 return ck_layers_[k].size();
280 [[nodiscard]]
bool empty() const noexcept {
281 auto empty{mem_table_.empty()};
282 for (
const auto& ck_layer : ck_layers_) {
296 using SSTables = std::deque<SSTable<TValue>>;
302 using CkLayer = SSTables;
308 using CkLayers = std::vector<CkLayer>;
326 using DirtyTable = std::unordered_map<key_type, bool>;
332 using TimeWindowToEntriesMap = std::map<TimeWindow, table_type, std::greater<>>;
347 static constexpr std::array<size_type, LAYER_COUNT> CK_LAYER_WINDOW_SIZE{
362 static constexpr std::array<size_type, LAYER_COUNT> CK_LAYER_TABLE_COUNT{
379 [[nodiscard]] FilePath get_next_file_path(size_type k)
noexcept {
380 return path_ / (
"sstable_l" + std::to_string(k) +
"_"
381 + std::to_string(sstable_id_[k]++) +
".sst");
389 void load_sstables() {
390 std::array<std::set<FilePath>, LAYER_COUNT> sstable_files;
391 for (
const auto& file : std::filesystem::directory_iterator(path_)) {
392 if (!file.is_regular_file() || file.path().extension() !=
".sst") {
395 const auto l_pos{file.path().filename().string().find(
"_l")};
396 const auto layer_idx{
397 std::stoull(file.path().filename().string().substr(l_pos + 2))
399 sstable_files[layer_idx].insert(file.path());
401 std::stoull(file.path().filename().string().substr(l_pos + 4))
403 sstable_id_[layer_idx] = std::max(
404 sstable_id_[layer_idx],
405 static_cast<size_type
>(
id + 1)
408 for (size_type k{0}; k < LAYER_COUNT; ++k) {
409 for (
const auto& sstable_file : sstable_files[k]) {
410 ck_layers_[k].emplace_back(std::move(sstable_file));
422 void validate_layer_index(size_type k)
const {
423 if (k < 0 || k >= LAYER_COUNT) {
424 throw std::out_of_range{
425 "LSMTree::validate_layer_index(): Layer index out of range."
437 void reset_layer(size_type k)
noexcept {
438 validate_layer_index(k);
439 ck_layers_[k].clear();
449 FilePath sstable_file_path{get_next_file_path(0)};
450 ck_layers_[0].emplace_back(
452 std::move(mem_table_),
468 void update_entries_with_filtered_key(
470 const mapped_type& value,
471 const TimeSeriesKeyFilter& filter,
472 table_type& entry_table
477 if (!value.has_value()) {
478 entry_table.erase(key);
481 entry_table[key] = value;
493 void update_entries_with_sstable_range(
494 const SSTable<TValue>& sstable,
495 const key_type& start,
497 const TimeSeriesKeyFilter& filter,
498 table_type& entry_table
500 for (
const auto& [key, value] : sstable.getRange(start, end)) {
501 update_entries_with_filtered_key(key, value, filter, entry_table);
512 mapped_type search_memtable(
const key_type& key)
const {
513 const auto mem_table_value{mem_table_.get(key)};
514 cache_.
put(key, mem_table_value);
515 dirty_table_[key] =
false;
516 return mem_table_value;
528 mapped_type iterative_layer_search(
532 validate_layer_index(k);
533 for (
const auto& sstable : ck_layers_[k] | std::views::reverse) {
534 const auto sstable_value{sstable.get(key)};
535 if (!sstable_value.has_value()) {
538 cache_.
put(key, sstable_value);
539 dirty_table_[key] =
false;
540 return sstable_value;
554 mapped_type binary_layer_search(
558 const auto timestamp{key.timestamp()};
559 const auto& ck_layer{ck_layers_[k]};
560 auto sstable_it{std::lower_bound(
564 [](
const auto& sstable,
const auto target_time) {
565 return target_time < sstable.timeRange().lower();
570 sstable_it != ck_layer.end() &&
571 sstable_it->timeRange().lower() <= timestamp &&
572 timestamp <= sstable_it->timeRange().upper()
574 const auto sstable_value{sstable_it->get(key)};
575 if (sstable_value.has_value()) {
576 cache_.
put(key, sstable_value);
577 dirty_table_[key] =
false;
578 return sstable_value;
598 void binary_layer_search_range(
600 const key_type& start,
602 const TimeSeriesKeyFilter& filter,
603 table_type& entry_table
605 const auto& ck_layer{ck_layers_[k]};
606 auto start_it{std::lower_bound(
610 [](
const auto& sstable,
const auto target_time) {
611 return sstable.timeRange().upper() < target_time;
615 auto end_it{std::upper_bound(
619 [](
const auto target_time,
const auto& sstable) {
620 return target_time < sstable.timeRange().lower();
625 const auto& sstable :
626 std::ranges::subrange(start_it, end_it) | std::views::reverse
628 update_entries_with_sstable_range(
629 sstable, start, end, filter, entry_table
642 compact_layer_if_needed(0);
652 void compact_layer_if_needed(size_type k) {
653 validate_layer_index(k);
654 if (k == LAYER_COUNT - 1) {
657 if (ck_layers_[k].size() <= CK_LAYER_TABLE_COUNT[k]) {
661 compact_layer_if_needed(k + 1);
671 void compact_layer(size_type k) {
673 merge_all_in_layer(k);
676 while (ck_layers_[k].size() > CK_LAYER_TABLE_COUNT[k]) {
677 merge_oldest_in_layer(k);
691 void update_entries_with_sstable(
692 SSTable<TValue>&& sstable,
693 size_type window_size,
694 TimeWindowToEntriesMap& time_window_to_entries,
695 std::vector<FilePath>& files_to_remove
697 for (
const auto& [key, value] : sstable.entries()) {
698 const auto start{(key.timestamp() / window_size) * window_size};
699 const auto time_window{TimeWindow{start, start + window_size}};
700 time_window_to_entries[time_window][key] = value;
702 files_to_remove.push_back(sstable.path());
703 files_to_remove.push_back(sstable.metadataPath());
711 void merge_all_in_layer(size_type k) {
712 auto& curr_layer{ck_layers_[k]};
713 auto& next_layer{ck_layers_[k + 1]};
714 const auto window_size{CK_LAYER_WINDOW_SIZE[k + 1]};
716 TimeWindowToEntriesMap time_window_to_entries;
717 std::vector<FilePath> files_to_remove;
719 for (
auto&& sstable : next_layer) {
720 update_entries_with_sstable(
721 std::move(sstable), window_size,
722 time_window_to_entries,
728 for (
auto&& sstable : curr_layer) {
729 update_entries_with_sstable(
730 std::move(sstable), window_size,
731 time_window_to_entries,
737 for (
const auto& file : files_to_remove) {
738 if (!std::filesystem::remove(file)) {
739 throw std::runtime_error{
740 "LSMTree::merge_all_in_layer(): Failed to remove file: '"
741 + std::string{file} +
"' after compaction."
746 for (
auto& [time_window, entries] : time_window_to_entries) {
747 auto sstable{merge_entries(std::move(entries), k)};
748 next_layer.push_back(std::move(sstable));
759 void merge_oldest_in_layer(size_type k) {
760 auto& curr_layer{ck_layers_[k]};
761 auto& next_layer{ck_layers_[k + 1]};
762 const auto window_size{CK_LAYER_WINDOW_SIZE[k + 1]};
764 TimeWindowToEntriesMap time_window_to_entries;
765 std::vector<FilePath> files_to_remove;
767 for (
auto&& sstable : next_layer) {
768 update_entries_with_sstable(
769 std::move(sstable), window_size,
770 time_window_to_entries,
776 while (curr_layer.size() > CK_LAYER_TABLE_COUNT[k]) {
777 auto& oldest_sstable{curr_layer.front()};
778 update_entries_with_sstable(
779 std::move(oldest_sstable), window_size,
780 time_window_to_entries,
783 curr_layer.pop_front();
786 for (
const auto& file : files_to_remove) {
787 if (!std::filesystem::remove(file)) {
788 throw std::runtime_error{
789 "LSMTree::merge_oldest_in_layer(): Failed to remove file: '"
790 + std::string{file} +
"' after compaction."
795 for (
auto& [time_window, entries] : time_window_to_entries) {
796 auto sstable{merge_entries(std::move(entries), k)};
797 next_layer.push_back(std::move(sstable));
808 SSTable<TValue> merge_entries(table_type&& entries, size_type k) {
809 auto memtable{MemTable<TValue>{std::move(entries)}};
810 const auto memtable_size{memtable.size()};
811 auto sstable{SSTable<TValue>{
812 get_next_file_path(k + 1),
823 MemTable<TValue> mem_table_;
835 WriteAheadLog<TValue> wal_;
847 std::array<size_type, LAYER_COUNT> sstable_id_;
853 mutable Cache cache_;
859 mutable DirtyTable dirty_table_;
void put(K &&key, V &&value)
Put a key-value pair into the cache.
bool contains(K &&key) const noexcept
Check if the cache contains a key.
mapped_type get(K &&key)
Get a value from the cache.
void clear() noexcept
Clear the cache.
LSM tree on TimeSeriesKey.
std::vector< value_type > getRange(const key_type &start, const key_type &end, TimeSeriesKeyFilter &&filter) const
Get a filtered set of entries in a timestamp range.
void put(const key_type &key, const TValue &value, bool log=true)
Put a key-value pair into the LSM tree.
size_type sstableCount() const noexcept
Get the number of SSTables in the LSM tree.
mapped_type get(const key_type &key) const noexcept
Get a value from the LSM tree.
void remove(const key_type &key, bool log=true)
Remove a key from the LSM tree.
LSMTree(LSMTree &&) noexcept=default
Move-construct an LSMTree object.
void clear() noexcept
Clear the LSM tree.
LSMTree(FilePath path) noexcept
Construct a new LSMTree object.
static constexpr size_type CACHE_CAPACITY
Max number of entries in the cache.
void replayWAL()
Replay the write-ahead log.
std::string str() const noexcept
Convert the LSM tree to a string.
bool empty() const noexcept
Check if the LSM tree is empty.
size_type sstableCount(size_type k) const
Get the number of SSTables in a Ck layer.
In-memory table for storing key-value pairs.
Represents a key in vkdb.