diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a17817f..c5d7f3b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -59,8 +59,8 @@ jobs: cd ${{github.workspace}}/build ninja src/all ccache --show-stats - GLOG_logtostderr=1 GLOG_vmodule=rdb_load=1 ctest -V -R rdb_test + GLOG_logtostderr=1 GLOG_vmodule=rdb_load=1,rdb_save=1 ctest -V -R rdb_test echo Run ctest -V -L DFLY #GLOG_logtostderr=1 GLOG_vmodule=transaction=1,engine_shard_set=1 - GLOG_logtostderr=1 GLOG_vmodule=rdb_load=1 ctest -V -L DFLY + GLOG_logtostderr=1 GLOG_vmodule=rdb_load=1,rdb_save=1 ctest -V -L DFLY # GLOG_logtostderr=1 GLOG_vmodule=transaction=1,engine_shard_set=1 CTEST_OUTPUT_ON_FAILURE=1 ninja server/test diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index f71138d..91c0e3e 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -434,7 +434,7 @@ error_code RdbSerializer::WriteRaw(const io::Bytes& buf) { } else { if (sink_) { iovec v[2] = {{.iov_base = const_cast(ib.data()), .iov_len = ib.size()}, - {.iov_base = const_cast(buf.data()), .iov_len = buf.size()}}; + {.iov_base = const_cast(buf.data()), .iov_len = buf.size()}}; RETURN_ON_ERR(sink_->Write(v, ABSL_ARRAYSIZE(v))); } else { RETURN_ON_ERR(aligned_buf_->Write(ib)); @@ -608,12 +608,6 @@ error_code RdbSaver::SaveBody(RdbTypeFreqMap* freq_map) { size_t num_written = 0; SliceSnapshot::DbRecord record; -#if 1 - -#else - vector vals; - vector ivec; -#endif uint8_t buf[16]; DbIndex last_db_index = kInvalidDbId; @@ -624,6 +618,7 @@ error_code RdbSaver::SaveBody(RdbTypeFreqMap* freq_map) { // we can not exit on io-error since we spawn fibers that push data. // TODO: we may signal them to stop processing and exit asap in case of the error. + size_t channel_bytes = 0; while (channel.Pop(record)) { if (io_error) @@ -634,49 +629,27 @@ error_code RdbSaver::SaveBody(RdbTypeFreqMap* freq_map) { unsigned enclen = SerializeLen(record.db_index, buf + 1); char* str = (char*)buf; -#if 1 io_error = aligned_buf_.Write(string_view{str, enclen + 1}); if (io_error) break; -#else - vals.emplace_back(string(str, enclen + 1)); -#endif last_db_index = record.db_index; } -#if 1 + channel_bytes += record.value.size(); io_error = aligned_buf_.Write(record.value); record.value.clear(); -#else - vals.emplace_back(std::move(record.value)); -#endif } while (!io_error && channel.TryPop(record)); -#if 1 - -#else - ivec.resize(vals.size()); - for (size_t i = 0; i < ivec.size(); ++i) { - ivec[i].iov_base = vals[i].data(); - ivec[i].iov_len = vals[i].size(); - } - RETURN_ON_ERR(sink_->Write(ivec.data(), ivec.size())); - - num_written += vals.size(); - vals.clear(); -#endif } // while (channel.pop) - /*if (buf_offs_) { - size_t len = (buf_offs_ + kAmask) & (~kAmask); - ivec.iov_len = len; - RETURN_ON_ERR(sink_->Write(&ivec, 1)); - }*/ + size_t pushed_bytes = 0; for (auto& ptr : impl_->shard_snapshots) { ptr->Join(); + pushed_bytes += ptr->channel_bytes(); } - VLOG(1) << "Blobs written " << num_written; + VLOG(1) << "Blobs written " << num_written << " pulled bytes: " << channel_bytes + << " pushed bytes: " << pushed_bytes; if (io_error) return io_error; diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index 709fcab..a3e9ab3 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -74,6 +74,7 @@ void SliceSnapshot::SerializeSingleEntry(DbIndex db_indx, const PrimeKey& pk, ec = rdb_serializer_->FlushMem(); CHECK(!ec && !sfile_->val.empty()); string tmp = std::move(sfile_->val); + channel_bytes_ += tmp.size(); dest_->Push(DbRecord{db_indx, std::move(tmp)}); } } @@ -145,6 +146,7 @@ bool SliceSnapshot::FlushSfile(bool force) { VLOG(2) << "FlushSfile " << sfile_->val.size() << " bytes"; string tmp = std::move(sfile_->val); // important to move before pushing! + channel_bytes_ += tmp.size(); dest_->Push(DbRecord{savecb_current_db_, std::move(tmp)}); return true; diff --git a/src/server/snapshot.h b/src/server/snapshot.h index 47785b3..2f01597 100644 --- a/src/server/snapshot.h +++ b/src/server/snapshot.h @@ -38,6 +38,9 @@ class SliceSnapshot { } RdbSerializer* serializer() { return rdb_serializer_.get(); } + + size_t channel_bytes() const { return channel_bytes_;} + private: void FiberFunc(); bool FlushSfile(bool force); @@ -63,6 +66,7 @@ class SliceSnapshot { DbIndex savecb_current_db_; // used by SaveCb RecordChannel* dest_; + size_t channel_bytes_ = 0; size_t serialized_ = 0, skipped_ = 0, side_saved_ = 0, savecb_calls_ = 0; };