Some clean-ups in rdb_save code. Add verbosity printings for CI build (#47)
This commit is contained in:
parent
e5107c2047
commit
6dd1552506
|
@ -59,8 +59,8 @@ jobs:
|
|||
cd ${{github.workspace}}/build
|
||||
ninja src/all
|
||||
ccache --show-stats
|
||||
GLOG_logtostderr=1 GLOG_vmodule=rdb_load=1 ctest -V -R rdb_test
|
||||
GLOG_logtostderr=1 GLOG_vmodule=rdb_load=1,rdb_save=1 ctest -V -R rdb_test
|
||||
echo Run ctest -V -L DFLY
|
||||
#GLOG_logtostderr=1 GLOG_vmodule=transaction=1,engine_shard_set=1
|
||||
GLOG_logtostderr=1 GLOG_vmodule=rdb_load=1 ctest -V -L DFLY
|
||||
GLOG_logtostderr=1 GLOG_vmodule=rdb_load=1,rdb_save=1 ctest -V -L DFLY
|
||||
# GLOG_logtostderr=1 GLOG_vmodule=transaction=1,engine_shard_set=1 CTEST_OUTPUT_ON_FAILURE=1 ninja server/test
|
||||
|
|
|
@ -434,7 +434,7 @@ error_code RdbSerializer::WriteRaw(const io::Bytes& buf) {
|
|||
} else {
|
||||
if (sink_) {
|
||||
iovec v[2] = {{.iov_base = const_cast<uint8_t*>(ib.data()), .iov_len = ib.size()},
|
||||
{.iov_base = const_cast<uint8_t*>(buf.data()), .iov_len = buf.size()}};
|
||||
{.iov_base = const_cast<uint8_t*>(buf.data()), .iov_len = buf.size()}};
|
||||
RETURN_ON_ERR(sink_->Write(v, ABSL_ARRAYSIZE(v)));
|
||||
} else {
|
||||
RETURN_ON_ERR(aligned_buf_->Write(ib));
|
||||
|
@ -608,12 +608,6 @@ error_code RdbSaver::SaveBody(RdbTypeFreqMap* freq_map) {
|
|||
|
||||
size_t num_written = 0;
|
||||
SliceSnapshot::DbRecord record;
|
||||
#if 1
|
||||
|
||||
#else
|
||||
vector<string> vals;
|
||||
vector<iovec> ivec;
|
||||
#endif
|
||||
|
||||
uint8_t buf[16];
|
||||
DbIndex last_db_index = kInvalidDbId;
|
||||
|
@ -624,6 +618,7 @@ error_code RdbSaver::SaveBody(RdbTypeFreqMap* freq_map) {
|
|||
|
||||
// we can not exit on io-error since we spawn fibers that push data.
|
||||
// TODO: we may signal them to stop processing and exit asap in case of the error.
|
||||
size_t channel_bytes = 0;
|
||||
|
||||
while (channel.Pop(record)) {
|
||||
if (io_error)
|
||||
|
@ -634,49 +629,27 @@ error_code RdbSaver::SaveBody(RdbTypeFreqMap* freq_map) {
|
|||
unsigned enclen = SerializeLen(record.db_index, buf + 1);
|
||||
char* str = (char*)buf;
|
||||
|
||||
#if 1
|
||||
io_error = aligned_buf_.Write(string_view{str, enclen + 1});
|
||||
if (io_error)
|
||||
break;
|
||||
#else
|
||||
vals.emplace_back(string(str, enclen + 1));
|
||||
#endif
|
||||
last_db_index = record.db_index;
|
||||
}
|
||||
|
||||
#if 1
|
||||
channel_bytes += record.value.size();
|
||||
io_error = aligned_buf_.Write(record.value);
|
||||
record.value.clear();
|
||||
#else
|
||||
vals.emplace_back(std::move(record.value));
|
||||
#endif
|
||||
} while (!io_error && channel.TryPop(record));
|
||||
|
||||
#if 1
|
||||
|
||||
#else
|
||||
ivec.resize(vals.size());
|
||||
for (size_t i = 0; i < ivec.size(); ++i) {
|
||||
ivec[i].iov_base = vals[i].data();
|
||||
ivec[i].iov_len = vals[i].size();
|
||||
}
|
||||
RETURN_ON_ERR(sink_->Write(ivec.data(), ivec.size()));
|
||||
|
||||
num_written += vals.size();
|
||||
vals.clear();
|
||||
#endif
|
||||
} // while (channel.pop)
|
||||
|
||||
/*if (buf_offs_) {
|
||||
size_t len = (buf_offs_ + kAmask) & (~kAmask);
|
||||
ivec.iov_len = len;
|
||||
RETURN_ON_ERR(sink_->Write(&ivec, 1));
|
||||
}*/
|
||||
size_t pushed_bytes = 0;
|
||||
for (auto& ptr : impl_->shard_snapshots) {
|
||||
ptr->Join();
|
||||
pushed_bytes += ptr->channel_bytes();
|
||||
}
|
||||
|
||||
VLOG(1) << "Blobs written " << num_written;
|
||||
VLOG(1) << "Blobs written " << num_written << " pulled bytes: " << channel_bytes
|
||||
<< " pushed bytes: " << pushed_bytes;
|
||||
if (io_error)
|
||||
return io_error;
|
||||
|
||||
|
|
|
@ -74,6 +74,7 @@ void SliceSnapshot::SerializeSingleEntry(DbIndex db_indx, const PrimeKey& pk,
|
|||
ec = rdb_serializer_->FlushMem();
|
||||
CHECK(!ec && !sfile_->val.empty());
|
||||
string tmp = std::move(sfile_->val);
|
||||
channel_bytes_ += tmp.size();
|
||||
dest_->Push(DbRecord{db_indx, std::move(tmp)});
|
||||
}
|
||||
}
|
||||
|
@ -145,6 +146,7 @@ bool SliceSnapshot::FlushSfile(bool force) {
|
|||
VLOG(2) << "FlushSfile " << sfile_->val.size() << " bytes";
|
||||
|
||||
string tmp = std::move(sfile_->val); // important to move before pushing!
|
||||
channel_bytes_ += tmp.size();
|
||||
dest_->Push(DbRecord{savecb_current_db_, std::move(tmp)});
|
||||
|
||||
return true;
|
||||
|
|
|
@ -38,6 +38,9 @@ class SliceSnapshot {
|
|||
}
|
||||
|
||||
RdbSerializer* serializer() { return rdb_serializer_.get(); }
|
||||
|
||||
size_t channel_bytes() const { return channel_bytes_;}
|
||||
|
||||
private:
|
||||
void FiberFunc();
|
||||
bool FlushSfile(bool force);
|
||||
|
@ -63,6 +66,7 @@ class SliceSnapshot {
|
|||
DbIndex savecb_current_db_; // used by SaveCb
|
||||
RecordChannel* dest_;
|
||||
|
||||
size_t channel_bytes_ = 0;
|
||||
size_t serialized_ = 0, skipped_ = 0, side_saved_ = 0, savecb_calls_ = 0;
|
||||
};
|
||||
|
||||
|
|
Loading…
Reference in New Issue