博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
leveldb学习:DBimpl
阅读量:6462 次
发布时间:2019-06-23

本文共 14394 字,大约阅读时间需要 47 分钟。

leveldb将数据库的有关操作都定义在了DB类,它负责整个系统功能组件的连接和调用。是整个系统的脊柱。

level::DB是一个接口类,真正的实如今DBimpl类。

作者在文档impl.html中描写叙述了leveldb的实现。当中包含文件组织、compaction和recovery等等。

DBimpl的成员变量包含:字符比較器internal_comparator_、配置类options_、bool型状态量、string型DB库名、cache对象、memtable对象、versionset对象等等前面所说的组件。

前面的解说组件部分时。分散地介绍过leveldb的文件系统。这里以下来统一说明下创建一个DB,会在硬盘里生成一些什么样的文件,以下翻译自impl.html:

1 dbname/[0-9]+.log:
log文件包含了最新的db更新。每个entry更新都以append的方式追加到文件结尾。

2 dbname/[0-9]+.sst:db的sstable文件
Leveldb把sstable文件通过level的方式组织起来,从log文件里生成的sstable被放在level 0。

当level 0的sstable文件个数超过设置时,leveldb就把全部的level 0文件,以及有重合的level 1文件merge起来,组织成一个新的level 1文件。

3 dbname/MANIFEST-[0-9]+:DB元信息文件
它记录的是leveldb的元信息。比方DB使用的Comparator名,以及各SSTable文件的管理信息:如Level层数、文件名称、最小key和最大key等等。
4 dbname/CURRENT:记录当前正在使用的Manifest文件
它的内容就是当前的manifest文件名称;由于在LevleDb的执行过程中,随着Compaction的进行。新的SSTable文件被产生。老的文件被废弃。并生成新的Manifest文件来记载sstable的变动,而CURRENT则用来记录我们关心的Manifest文件。

5 dbname/log:系统的执行日志,和options_.info_log有关,记录系统的执行信息或者错误日志。

主要函数:

Options SanitizeOptions(const std::string& dbname,                        const InternalKeyComparator* icmp,                        const InternalFilterPolicy* ipolicy,                        const Options& src)

option修正函数,将用户定义的option做一定的检查和修正,返回规范的option对象。

主要就是设置字符比較器。检查一些參数的设置(比方最大文件大小、写缓冲区的大小,sstable的block大小是否在规定值范围内)、建立log文件等等。

Status DBImpl::NewDB() {  VersionEdit new_db;  new_db.SetComparatorName(user_comparator()->Name());  new_db.SetLogNumber(0);  new_db.SetNextFile(2);  new_db.SetLastSequence(0);  const std::string manifest = DescriptorFileName(dbname_, 1);  WritableFile* file;  Status s = env_->NewWritableFile(manifest, &file);  if (!s.ok()) {    return s;  }  {    log::Writer log(file);    std::string record;    new_db.EncodeTo(&record);    s = log.AddRecord(record);    if (s.ok()) {      s = file->Close();    }  }  delete file;  if (s.ok()) {    // Make "CURRENT" file that points to the new manifest file.    s = SetCurrentFile(env_, dbname_, 1);  } else {    env_->DeleteFile(manifest);  }  return s;}

初始化一个新的DB对象,主要创建一个manfest文件,并调用versionedit::encodeto写入新db的信息(如comparator,lognumder,nextfilenumber,sstable信息),此函数在open()操作中被调用,完毕创建DB的一步。

void DBImpl::DeleteObsoleteFiles()

依据i节点删除db中的文件,会对文件的类型和内容做一个推断,首先。正在compact的sstable不删,versionset中各个版本号下的sstable文件不删。当前的log和manfest文件不删。调用env_->DeleteFile删除文件。

Status DBImpl::Recover(VersionEdit* edit)

DB恢复函数。基于前面介绍的文件系统

1.recover首先找到当前数据库dbname_路径下的current文件,參考函数CurrentFileName(dbname_)。文件错误或者不存在,恢复都无法继续进行),2.然后调用versionset::recover()。读取manfest文件,通过一个versionedit对象中间过渡,恢复出新的version。
3.遍历dbname_文件下的文件,对照当前版本号集合versions_中记录的sstable。假设缺失,输出缺失的文件i节点,recover失败。否则
恢复log文件(參考RecoverLogFile函数)

Status DBImpl::RecoverLogFile(uint64_t log_number,                              VersionEdit* edit,                              SequenceNumber* max_sequence)

从log文件里逐条恢复entry,并写入新建立的memtable。并在合适的条件下(memtable大小大于写缓存下限:mem->ApproximateMemoryUsage() > options_.write_buffer_size)。写入level_0的sstable中(參考函数WriteLevel0Table)

Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,                                Version* base)

将memtable dump到磁盘,也就是level-0的sstable中。

1.首先产生一个新文件。并记录在文件描写叙述结构FileMetaData中
2.利用memtable的迭代器Iterator遍历memtable中的KV数据,构造sstable(參考函数BuildTable,还记得前面介绍table和block么,要对memtable的kv做进一步的打包。才干形成kv的磁盘形式)
3.把新的文件变化信息存储进versionedit,并记录这次compact的信息,主要是耗时和写入的sstable大小。

注:PickLevelForMemTableOutput函数,新的sstable定级。不能和同级的sstable有overlap。也不能和上级的sstable overlap太多(> kMaxGrandParentOverlapBytes)
WriteLevel0Table是函数CompactMemTable的核心。

leveldb中有且仅仅有一个进程单独做compact,当主线程触发compact。调用void DBImpl::MaybeScheduleCompaction()。假设compact正在执行或者DB正在退出。直接返回。检查version中是否存在须要compact。有则触发后台调度env_->schedele(…)

void DBImpl::MaybeScheduleCompaction() {  mutex_.AssertHeld();  if (bg_compaction_scheduled_) {    // Already scheduled  } else if (shutting_down_.Acquire_Load()) {    // DB is being deleted; no more background compactions  } else if (!bg_error_.ok()) {    // Already got an error; no more changes  } else if (imm_ == NULL &&             manual_compaction_ == NULL &&             !versions_->NeedsCompaction()) {    // No work to be done  } else {    bg_compaction_scheduled_ = true;    env_->Schedule(&DBImpl::BGWork, this);  }}

schedele把compact处理程序函数指针和db对象指针传入后台任务队列,BGWork 是compact处理函数。Schedule函数例如以下:

void PosixEnv::Schedule(void (*function)(void*), void* arg) {  PthreadCall("lock", pthread_mutex_lock(&mu_));  // Start background thread if necessary  if (!started_bgthread_) {    started_bgthread_ = true;    PthreadCall(        "create thread",        pthread_create(&bgthread_, NULL,  &PosixEnv::BGThreadWrapper, this));  }  // If the queue is currently empty, the background thread may currently be  // waiting.  if (queue_.empty()) {    PthreadCall("signal", pthread_cond_signal(&bgsignal_));  }  // Add to priority queue  queue_.push_back(BGItem());  queue_.back().function = function;  queue_.back().arg = arg;  PthreadCall("unlock", pthread_mutex_unlock(&mu_));}

将处理函数放入任务队列中,后台进程就能够不断地从queue_中取出任务函数,并执行。

实际compact处理进程是BackgroundCall和BackgroundCompaction。BackgroundCall完毕一些推断,条件符合则调用BackgroundCompaction,compact完毕后再次触发compact,反复上述过程。

void DBImpl::BackgroundCall() {  MutexLock l(&mutex_);  assert(bg_compaction_scheduled_);  if (shutting_down_.Acquire_Load()) {    // No more background work when shutting down.  } else if (!bg_error_.ok()) {    // No more background work after a background error.  } else {    BackgroundCompaction();  }  bg_compaction_scheduled_ = false;  // Previous compaction may have produced too many files in a level,  // so reschedule another compaction if needed.  MaybeScheduleCompaction();  bg_cv_.SignalAll();}

实际compact流程:

void DBImpl::BackgroundCompaction() {  mutex_.AssertHeld();  //immutable先compact  if (imm_ != NULL) {    CompactMemTable();    return;  }  //针对人为指定compact的key-range  Compaction* c;  bool is_manual = (manual_compaction_ != NULL);  InternalKey manual_end;  if (is_manual) {    ManualCompaction* m = manual_compaction_;    c = versions_->CompactRange(m->level, m->begin, m->end);    m->done = (c == NULL);    if (c != NULL) {      manual_end = c->input(0, c->num_input_files(0) - 1)->largest;    }    Log(options_.info_log,        "Manual compaction at level-%d from %s .. %s; will stop at %s\n",        m->level,        (m->begin ? m->begin->DebugString().c_str() : "(begin)"),        (m->end ?

m->end->DebugString().c_str() : "(end)"), (m->done ? "(end)" : manual_end.DebugString().c_str())); } else { //确定须要compact的level-n和sstable c = versions_->PickCompaction(); } Status status; if (c == NULL) { // Nothing to do } else if (!is_manual && c->IsTrivialMove()) { // Move file to next level assert(c->num_input_files(0) == 1); FileMetaData* f = c->input(0, 0); c->edit()->DeleteFile(c->level(), f->number); c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest, f->largest); status = versions_->LogAndApply(c->edit(), &mutex_); if (!status.ok()) { RecordBackgroundError(status); } VersionSet::LevelSummaryStorage tmp; Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n", static_cast<unsigned long long>(f->number), c->level() + 1, static_cast<unsigned long long>(f->file_size), status.ToString().c_str(), versions_->LevelSummary(&tmp)); } else { CompactionState* compact = new CompactionState(c); status = DoCompactionWork(compact); if (!status.ok()) { RecordBackgroundError(status); } CleanupCompaction(compact); c->ReleaseInputs(); DeleteObsoleteFiles(); } delete c; if (status.ok()) { // Done } else if (shutting_down_.Acquire_Load()) { // Ignore compaction errors found during shutting down } else { Log(options_.info_log, "Compaction error: %s", status.ToString().c_str()); } if (is_manual) { ManualCompaction* m = manual_compaction_; if (!status.ok()) { m->done = true; } if (!m->done) { // We only compacted part of the requested range. Update *m // to the range that is left to be compacted. m->tmp_storage = manual_end; m->begin = &m->tmp_storage; } manual_compaction_ = NULL; } }

1.假设存在immutable memtable。将其dump成sstable,完毕返回。

2.假设是外部触发的compact,依据manual_compaction指定的level/start_key/end_key,选出compaction(VersionSet::CompactRange())
3.假设不是manual compact。则依据db当前状态,选出compaction(VersionSet::PickCompaction()),考虑到level sstable的均衡性,提高查找效率。

class compaction用于记录compact信息,包含compact的level和输入sstable文件等等,參见version_set.h。

4.对于非manual compact而且选出的sstable都处于level-n且不会造成过多的GrandparentOverrlap(Compaction::IsTrivialMove()),简单处理,将这些sstable推到level-n+1,更新db元信息就可以(VersionSet::LogAndApply())。
5.其它情况,则一律依据确定出的Compaction,做详细的compact处理(DBImpl::DoCompactionWork()),最后做异常情况的清理(DBImpl::CleanupCompaction())。

DBimpl::DoCompactionWork()。实际的compact过程就是对多个已经排序的sstable做一次merge排序。丢弃掉同样的Key以及删除的数据。

Status DBImpl::DoCompactionWork(CompactionState* compact) {  const uint64_t start_micros = env_->NowMicros();  //immutable compact时计时用  int64_t imm_micros = 0;  // Micros spent doing imm_ compactions  Log(options_.info_log,  "Compacting %d@%d + %d@%d files",      compact->compaction->num_input_files(0),      compact->compaction->level(),      compact->compaction->num_input_files(1),      compact->compaction->level() + 1);  assert(versions_->NumLevelFiles(compact->compaction->level()) > 0);  assert(compact->builder == NULL);  assert(compact->outfile == NULL);  if (snapshots_.empty()) {    compact->smallest_snapshot = versions_->LastSequence();  } else {    compact->smallest_snapshot = snapshots_.oldest()->number_;  }  // Release mutex while we're actually doing the compaction work  mutex_.Unlock();  //将选出的compaction中的sstable构造MergingIterator  //对于level-0做归并排序。其它level的sstable做一个连接他们的iterator  Iterator* input = versions_->MakeInputIterator(compact->compaction);  //定位到每个sstable的first,后面将遍历input sstable的entry  input->SeekToFirst();  Status status;  ParsedInternalKey ikey;  std::string current_user_key;  bool has_current_user_key = false;  SequenceNumber last_sequence_for_key = kMaxSequenceNumber;  for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {    // Prioritize immutable compaction work    //优先完毕immutable的compact    if (has_imm_.NoBarrier_Load() != NULL) {      const uint64_t imm_start = env_->NowMicros();      mutex_.Lock();      if (imm_ != NULL) {        CompactMemTable();        bg_cv_.SignalAll();  // Wakeup MakeRoomForWrite() if necessary      }      mutex_.Unlock();      imm_micros += (env_->NowMicros() - imm_start);    }    Slice key = input->key();    //假设当前于grandparent层产生overlap的size超过阈值,马上结束当前写入的table的构造。写入磁盘    if (compact->compaction->ShouldStopBefore(key) &&        compact->builder != NULL) {      status = FinishCompactionOutputFile(compact, input);      if (!status.ok()) {        break;      }    }    // Handle key/value, add to state, etc.    //key舍弃标志位    bool drop = false;    //key解析错误,放弃    if (!ParseInternalKey(key, &ikey)) {      // Do not hide error keys      current_user_key.clear();      has_current_user_key = false;      last_sequence_for_key = kMaxSequenceNumber;    } else {      //key与前面的key反复。丢弃      if (!has_current_user_key ||          user_comparator()->Compare(ikey.user_key,                                     Slice(current_user_key)) != 0) {        // First occurrence of this user key        current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());        has_current_user_key = true;        last_sequence_for_key = kMaxSequenceNumber;      }      //key是删除类型,丢弃      if (last_sequence_for_key <= compact->smallest_snapshot) {        // Hidden by an newer entry for same user key        drop = true;    // (A)      } else if (ikey.type == kTypeDeletion &&                 ikey.sequence <= compact->smallest_snapshot &&                 compact->compaction->IsBaseLevelForKey(ikey.user_key)) {        // For this user key:        // (1) there is no data in higher levels        // (2) data in lower levels will have larger sequence numbers        // (3) data in layers that are being compacted here and have        //     smaller sequence numbers will be dropped in the next        //     few iterations of this loop (by rule (A) above).        // Therefore this deletion marker is obsolete and can be dropped.        drop = true;      }      last_sequence_for_key = ikey.sequence;    }#if 0    Log(options_.info_log,        "  Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, "        "%d smallest_snapshot: %d",        ikey.user_key.ToString().c_str(),        (int)ikey.sequence, ikey.type, kTypeValue, drop,        compact->compaction->IsBaseLevelForKey(ikey.user_key),        (int)last_sequence_for_key, (int)compact->smallest_snapshot);#endif    if (!drop) {      //假设output sstable未生成。构造新的tablebuilder      // Open output file if necessary      if (compact->builder == NULL) {        status = OpenCompactionOutputFile(compact);        if (!status.ok()) {          break;        }      }      //第一次写入的key作为output的smallest key      if (compact->builder->NumEntries() == 0) {        compact->current_output()->smallest.DecodeFrom(key);      }      //新的key写入时,更新largest key,并add进table      compact->current_output()->largest.DecodeFrom(key);      compact->builder->Add(key, input->value());      // Close output file if it is big enough      //当前sstable太大了就结束table构造      if (compact->builder->FileSize() >=          compact->compaction->MaxOutputFileSize()) {        status = FinishCompactionOutputFile(compact, input);        if (!status.ok()) {          break;        }      }    }    //下一个key    input->Next();  }  if (status.ok() && shutting_down_.Acquire_Load()) {    status = Status::IOError("Deleting DB during compaction");  }  if (status.ok() && compact->builder != NULL) {    status = FinishCompactionOutputFile(compact, input);  }  if (status.ok()) {    status = input->status();  }  delete input;  input = NULL;  //将此次compact的信息增加dbimpl::status_  CompactionStats stats;  stats.micros = env_->NowMicros() - start_micros - imm_micros;  for (int which = 0; which < 2; which++) {    for (int i = 0; i < compact->compaction->num_input_files(which); i++) {      stats.bytes_read += compact->compaction->input(which, i)->file_size;    }  }  for (size_t i = 0; i < compact->outputs.size(); i++) {    stats.bytes_written += compact->outputs[i].file_size;  }  mutex_.Lock();  stats_[compact->compaction->level() + 1].Add(stats);  if (status.ok()) {    status = InstallCompactionResults(compact);  }  if (!status.ok()) {    RecordBackgroundError(status);  }  VersionSet::LevelSummaryStorage tmp;  Log(options_.info_log,      "compacted to: %s", versions_->LevelSummary(&tmp));  return status;}

转载地址:http://tahzo.baihongyu.com/

你可能感兴趣的文章
Umap2:开源USB host安全评估工具
查看>>
使用Jmeter对API进行性能测试
查看>>
基于顺序表哈夫曼树
查看>>
thinkphp-条件判断-范围判断-notin
查看>>
CDH4/5集群正确启动和停止顺序
查看>>
CSS对IE7,IE6,FireFox和其它不同浏览器的控制(转)
查看>>
lvs调优之ipvsadm --persistent 与--set
查看>>
C语言之作用域
查看>>
Oracle专家调优秘密
查看>>
Ubuntu学习 cp
查看>>
使用PowerShell管理Office 365资源邮箱
查看>>
ps证件照快速换背景方法
查看>>
服务器CPU使用率过高的处理
查看>>
mysql 数据库 知识体系
查看>>
错误: 必须限制口令文件读取访问: /usr/local/jdk1.6/jre/lib/management/snmp.acl
查看>>
1----apache工作流程和报文详解
查看>>
Web调用安卓,苹果手机摄像头,本地图片和文件
查看>>
WindowsXP VOL,VLK,FPP,RTM的含义
查看>>
在Mac系统中遇到.DS_Store 文件怎么彻底删除
查看>>
centos下搭建svn+apache服务器
查看>>