feat: use multithreading to load;

fix type bugs in string index;

by zengli
This commit is contained in:
bookug 2017-06-06 18:56:46 +08:00
parent 886ca8512d
commit fa147526f1
7 changed files with 252 additions and 57 deletions

View File

@ -572,14 +572,30 @@ Database::load()
}
//TODO: acquire this arg from memory manager
//BETTER: get return value from subthread(using ref or file as hub)
unsigned vstree_cache = LRUCache::DEFAULT_CAPACITY;
bool flag = (this->vstree)->loadTree(vstree_cache);
if (!flag)
{
cout << "load tree error. @Database::load()" << endl;
return false;
}
thread vstree_thread(&Database::load_vstree, this, vstree_cache);
bool flag;
//flag = (this->vstree)->loadTree(vstree_cache);
//if (!flag)
//{
//cout << "load tree error. @Database::load()" << endl;
//return false;
//}
//(this->kvstore)->open();
int kv_mode = KVstore::READ_WRITE_MODE;
thread entity2id_thread(&Database::load_entity2id, this, kv_mode);
thread id2entity_thread(&Database::load_id2entity, this, kv_mode);
thread literal2id_thread(&Database::load_literal2id, this, kv_mode);
thread id2literal_thread(&Database::load_id2literal, this, kv_mode);
thread predicate2id_thread(&Database::load_predicate2id, this, kv_mode);
thread id2predicate_thread(&Database::load_id2predicate, this, kv_mode);
thread sub2values_thread(&Database::load_sub2values, this, kv_mode);
thread obj2values_thread(&Database::load_obj2values, this, kv_mode);
thread pre2values_thread(&Database::load_pre2values, this, kv_mode);
//this is very fast
flag = this->loadDBInfoFile();
if (!flag)
{
@ -587,18 +603,30 @@ Database::load()
return false;
}
(this->kvstore)->open();
//NOTICE: we should also run some heavy work in the main thread
this->stringindex->load();
this->readIDinfo();
pre2values_thread.join();
this->setPreMap();
id2entity_thread.join();
id2literal_thread.join();
//generate the string buffer for entity and literal, no need for predicate
//NOTICE:the total string size should not exceed 10G, assume that most strings length < 500
//too many empty between entity and literal, so divide them
this->setStringBuffer();
id2predicate_thread.join();
entity2id_thread.join();
literal2id_thread.join();
predicate2id_thread.join();
sub2values_thread.join();
obj2values_thread.join();
//wait for vstree thread
vstree_thread.join();
//warm up always as finishing build(), to utilize the system buffer
//this->warmUp();
//DEBUG:the warmUp() calls query(), which will also output results, this is not we want
@ -606,8 +634,109 @@ Database::load()
this->if_loaded = true;
cout << "finish load" << endl;
//TODO: for only-read application(like endpoint), 3 id2values trees can be closed now
//and we should load all trees on only READ mode
//HELP: just for checking infos(like kvstore)
check();
return true;
}
void
Database::load_entity2id(int _mode)
{
this->kvstore->open_entity2id(_mode);
}
void
Database::load_id2entity(int _mode)
{
this->kvstore->open_id2entity(_mode);
}
void
Database::load_literal2id(int _mode)
{
this->kvstore->open_literal2id(_mode);
}
void
Database::load_id2literal(int _mode)
{
this->kvstore->open_id2literal(_mode);
}
void
Database::load_predicate2id(int _mode)
{
this->kvstore->open_predicate2id(_mode);
}
void
Database::load_id2predicate(int _mode)
{
this->kvstore->open_id2predicate(_mode);
}
void
Database::load_sub2values(int _mode)
{
this->kvstore->open_subID2values(_mode);
}
void
Database::load_obj2values(int _mode)
{
this->kvstore->open_objID2values(_mode);
}
void
Database::load_pre2values(int _mode)
{
this->kvstore->open_preID2values(_mode);
}
void
Database::load_vstree(unsigned _vstree_size)
{
(this->vstree)->loadTree(_vstree_size);
cout<<"vstree loaded"<<endl;
}
void
Database::check()
{
//unsigned pid = this->kvstore->getIDByPredicate("<http://www.w3.org/2000/01/rdf-schema#label>");
//cout<<"check: pre "<<pid<<endl;
//unsigned sid = this->kvstore->getIDByEntity("<http://rdf.freebase.com/ns/american_football.football_player>");
//unsigned oid = this->kvstore->getIDByLiteral("\"Игроки в американский футбол\"@ru");
//unsigned* list = NULL; unsigned len = 0;
//this->kvstore->getobjIDlistBysubIDpreID(sid, pid, list, len);
//FILE* fp = fopen("kv.txt", "w");
//for(unsigned i = 0; i < len; ++i)
//{
//fprintf(fp, "%u\n", list[i]);
//string ts;
//if(Util::is_literal_ele(list[i]))
//ts = this->kvstore->getLiteralByID(list[i]);
//else
//ts = this->kvstore->getEntityByID(list[i]);
//if(ts == "")
//{
//fprintf(fp, "Error in id2string\n");
//}
//else
//{
//fprintf(fp, "%s\n", ts.c_str());
//}
//}
//string tstr;
//this->stringindex->randomAccess(2164939819, &tstr, true);
//cout<<"check: 2164939819 "<<tstr<<endl;
//this->stringindex->randomAccess(2164939818, &tstr, true);
//cout<<"check: 2164939818 "<<tstr<<endl;
//fclose(fp);
}
//NOTICE: we ensure that if the unload() exists normally, then all updates have already been written to disk
//So when accidents happens, we only have to restore the databases that are in load status(inlucidng that unload

View File

@ -131,6 +131,19 @@ private:
//BETTER:add a predicate buffer for ?p query
//However, I think this is not necessary because ?p is rare and the p2xx tree is small enough
void check();
//used for multiple threads
void load_vstree(unsigned _vstree_size);
void load_entity2id(int _mode);
void load_id2entity(int _mode);
void load_literal2id(int _mode);
void load_id2literal(int _mode);
void load_predicate2id(int _mode);
void load_id2predicate(int _mode);
void load_sub2values(int _mode);
void load_obj2values(int _mode);
void load_pre2values(int _mode);
//triple num per group for insert/delete
//can not be too high, otherwise the heap will over
static const int GROUP_SIZE = 1000;

View File

@ -10,7 +10,7 @@
using namespace std;
void StringIndexFile::setNum(int _num)
void StringIndexFile::setNum(unsigned _num)
{
this->num = _num;
}
@ -33,7 +33,7 @@ void StringIndexFile::save(KVstore &kv_store)
fwrite(&this->num, sizeof(int), 1, this->index_file);
long offset = 0;
for (int i = 0; i < this->num; i++)
for (unsigned i = 0; i < this->num; i++)
{
string str;
if (this->type == Entity)
@ -43,7 +43,7 @@ void StringIndexFile::save(KVstore &kv_store)
if (this->type == Predicate)
str = kv_store.getPredicateByID(i);
int length = str.length();
unsigned length = str.length();
fwrite(&offset, sizeof(long), 1, this->index_file);
fwrite(&length, sizeof(int), 1, this->index_file);
offset += length;
@ -74,7 +74,7 @@ void StringIndexFile::load()
fread(&this->num, sizeof(int), 1, this->index_file);
this->index_table.resize(this->num);
for (int i = 0; i < this->num; i++)
for (unsigned i = 0; i < this->num; i++)
{
fread(&this->index_table[i].offset, sizeof(long), 1, this->index_file);
fread(&this->index_table[i].length, sizeof(int), 1, this->index_file);
@ -82,13 +82,15 @@ void StringIndexFile::load()
}
}
bool StringIndexFile::randomAccess(int id, string *str)
bool StringIndexFile::randomAccess(unsigned id, string *str)
{
if (id < 0 || id >= this->num)
//DEBUG: int or unsigned here???
//if (id < 0 || id >= this->num)
if (id >= this->num)
return false;
long offset = this->index_table[id].offset;
int length = this->index_table[id].length;
unsigned length = this->index_table[id].length;
allocBuffer(length);
@ -97,11 +99,16 @@ bool StringIndexFile::randomAccess(int id, string *str)
this->buffer[length] = '\0';
*str = string(this->buffer);
//cout<<"check: read from string index - "<<id<<" "<<*str<<endl;
if(*str == "")
{
cout<<"ERROR in StringIndex - "<<id<<endl;
}
return true;
}
void StringIndexFile::addRequest(int id, std::string *str)
void StringIndexFile::addRequest(unsigned id, std::string *str)
{
this->request.push_back(AccessRequest(id, this->index_table[id].offset, this->index_table[id].length, str));
}
@ -109,7 +116,7 @@ void StringIndexFile::addRequest(int id, std::string *str)
void StringIndexFile::trySequenceAccess()
{
long max_end = 0;
for (int i = 0; i < (int)this->request.size(); i++)
for (unsigned i = 0; i < this->request.size(); i++)
max_end = max(max_end, this->request[i].offset + long(this->request[i].length));
if (this->type == Entity)
@ -124,7 +131,7 @@ void StringIndexFile::trySequenceAccess()
sort(this->request.begin(), this->request.end());
int pos = 0;
unsigned pos = 0;
fseek(this->value_file, 0, SEEK_SET);
char *block = new char[MAX_BLOCK_SIZE];
long current_block_begin = 0;
@ -145,6 +152,10 @@ void StringIndexFile::trySequenceAccess()
memcpy(this->buffer, &block[offset - current_block_begin], length);
this->buffer[length] = '\0';
*this->request[pos].str = string(this->buffer);
if(string(this->buffer) == "")
{
cout<<"Error in StringIndex"<<endl;
}
pos++;
}
else if (current_block_begin <= offset)
@ -154,6 +165,10 @@ void StringIndexFile::trySequenceAccess()
memcpy(this->buffer, &block[offset - current_block_begin], length);
this->buffer[length] = '\0';
*this->request[pos].str = string(this->buffer);
if(string(this->buffer) == "")
{
cout<<"Error in StringIndex"<<endl;
}
break;
}
else if (offset + length <= current_block_end)
@ -167,6 +182,10 @@ void StringIndexFile::trySequenceAccess()
while (pos < (int)this->request.size() && this->request[pos - 1].offset == this->request[pos].offset)
{
*this->request[pos].str = *this->request[pos - 1].str;
if(*this->request[pos].str == "")
{
cout<<"Error in StringIndex"<<endl;
}
pos++;
}
}
@ -177,6 +196,10 @@ void StringIndexFile::trySequenceAccess()
memcpy(this->buffer, block, length);
this->buffer[length] = '\0';
*this->request[pos].str += string(this->buffer);
if(*this->request[pos].str == "")
{
cout<<"Error in StringIndex"<<endl;
}
break;
}
}
@ -189,16 +212,23 @@ void StringIndexFile::trySequenceAccess()
{
cout << "random access." << endl;
for (int i = 0; i < (int)this->request.size(); i++)
for (unsigned i = 0; i < (int)this->request.size(); i++)
this->randomAccess(this->request[i].id, this->request[i].str);
}
this->request.clear();
}
void StringIndexFile::change(int id, KVstore &kv_store)
void StringIndexFile::change(unsigned id, KVstore &kv_store)
{
if (id < 0) return;
if(this->type == Predicate)
{
if (id < 0) return;
}
else
{
if (id == INVALID) return;
}
if (this->num <= id)
{
@ -237,9 +267,16 @@ void StringIndexFile::change(int id, KVstore &kv_store)
fwrite(str.c_str(), sizeof(char), this->index_table[id].length, this->value_file);
}
void StringIndexFile::disable(int id)
void StringIndexFile::disable(unsigned id)
{
if (id < 0 || id >= this->num) return ;
if(this->type == Predicate)
{
if (id < 0 || id >= this->num) return ;
}
else
{
if (id == INVALID) return;
}
this->index_table[id] = IndexInfo();
@ -249,7 +286,7 @@ void StringIndexFile::disable(int id)
}
//----------------------------------------------------------------------------------------------------------------------------------------------------
void StringIndex::setNum(StringIndexFile::StringIndexFileType _type, int _num)
void StringIndex::setNum(StringIndexFile::StringIndexFileType _type, unsigned _num)
{
if (_type == StringIndexFile::Entity)
this->entity.setNum(_num);
@ -274,7 +311,7 @@ void StringIndex::load()
}
bool
StringIndex::searchBuffer(int _id, string* _str)
StringIndex::searchBuffer(unsigned _id, string* _str)
{
if(_id < Util::LITERAL_FIRST_ID) //entity
{
@ -297,7 +334,7 @@ StringIndex::searchBuffer(int _id, string* _str)
}
}
bool StringIndex::randomAccess(int id, string *str, bool is_entity_or_literal)
bool StringIndex::randomAccess(unsigned id, string *str, bool is_entity_or_literal)
{
if(id < 0) return false;
@ -307,11 +344,18 @@ bool StringIndex::randomAccess(int id, string *str, bool is_entity_or_literal)
{
return true;
}
else
{
//cout<<"check: not found in string buffer - "<<id<<endl;
}
if (id < Util::LITERAL_FIRST_ID)
return this->entity.randomAccess(id, str);
else
{
//cout<<"check: to search literal "<<id-Util::LITERAL_FIRST_ID<<endl;
return this->literal.randomAccess(id - Util::LITERAL_FIRST_ID, str);
}
}
else
{
@ -319,7 +363,7 @@ bool StringIndex::randomAccess(int id, string *str, bool is_entity_or_literal)
}
}
void StringIndex::addRequest(int id, std::string *str, bool is_entity_or_literal)
void StringIndex::addRequest(unsigned id, std::string *str, bool is_entity_or_literal)
{
if (is_entity_or_literal)
{
@ -349,7 +393,7 @@ void StringIndex::change(std::vector<unsigned> &ids, KVstore &kv_store, bool is_
{
if (is_entity_or_literal)
{
for (int i = 0; i < (int)ids.size(); i++)
for (unsigned i = 0; i < ids.size(); i++)
{
if (ids[i] < Util::LITERAL_FIRST_ID)
this->entity.change(ids[i], kv_store);
@ -359,7 +403,7 @@ void StringIndex::change(std::vector<unsigned> &ids, KVstore &kv_store, bool is_
}
else
{
for (int i = 0; i < (int)ids.size(); i++)
for (unsigned i = 0; i < ids.size(); i++)
this->predicate.change(ids[i], kv_store);
}
}
@ -368,7 +412,7 @@ void StringIndex::disable(std::vector<unsigned> &ids, bool is_entity_or_literal)
{
if (is_entity_or_literal)
{
for (int i = 0; i < (int)ids.size(); i++)
for (unsigned i = 0; i < ids.size(); i++)
{
if (ids[i] < Util::LITERAL_FIRST_ID)
this->entity.disable(ids[i]);
@ -378,7 +422,7 @@ void StringIndex::disable(std::vector<unsigned> &ids, bool is_entity_or_literal)
}
else
{
for (int i = 0; i < (int)ids.size(); i++)
for (unsigned i = 0; i < ids.size(); i++)
this->predicate.disable(ids[i]);
}
}

View File

@ -22,30 +22,30 @@ class StringIndexFile
private:
StringIndexFileType type;
std::string loc;
int num;
unsigned num;
long empty_offset;
FILE *index_file, *value_file;
class IndexInfo
{
public:
IndexInfo(long _offset = 0, int _length = 0):offset(_offset), length(_length){}
IndexInfo(long _offset = 0, unsigned _length = 0):offset(_offset), length(_length){}
long offset;
int length;
unsigned length;
};
std::vector<IndexInfo> index_table;
int buffer_size;
unsigned buffer_size;
char *buffer;
class AccessRequest
{
public:
int id;
unsigned id;
long offset;
int length;
unsigned length;
std::string *str;
AccessRequest(int _id, long _offset, int _length, std::string *_str):
AccessRequest(unsigned _id, long _offset, unsigned _length, std::string *_str):
id(_id), offset(_offset), length(_length), str(_str){};
inline bool operator < (const AccessRequest &x) const
{
@ -55,7 +55,7 @@ class StringIndexFile
std::vector<AccessRequest> request;
public:
StringIndexFile(StringIndexFileType _type, std::string _dir, int _num):type(_type), num(_num), empty_offset(0), index_file(NULL), value_file(NULL), buffer_size(0), buffer(NULL)
StringIndexFile(StringIndexFileType _type, std::string _dir, unsigned _num):type(_type), num(_num), empty_offset(0), index_file(NULL), value_file(NULL), buffer_size(0), buffer(NULL)
{
if (this->type == Entity)
this->loc = _dir + "/entity_";
@ -73,12 +73,12 @@ class StringIndexFile
if (this->buffer != NULL)
delete[] this->buffer;
}
void setNum(int _num);
void setNum(unsigned _num);
void save(KVstore &kv_store);
void load();
inline void allocBuffer(int length)
inline void allocBuffer(unsigned length)
{
if (this->buffer_size <= length)
{
@ -88,12 +88,12 @@ class StringIndexFile
}
}
bool randomAccess(int id, std::string *str);
void addRequest(int id, std::string *str);
bool randomAccess(unsigned id, std::string *str);
void addRequest(unsigned id, std::string *str);
void trySequenceAccess();
void change(int id, KVstore &kv_store);
void disable(int id);
void change(unsigned id, KVstore &kv_store);
void disable(unsigned id);
};
class StringIndex
@ -105,7 +105,7 @@ class StringIndex
Buffer* literal_buffer;
unsigned literal_buffer_size;
public:
StringIndex(std::string _dir, int _entity_num = 0, int _literal_num = 0, int _predicate_num = 0) :
StringIndex(std::string _dir, unsigned _entity_num = 0, unsigned _literal_num = 0, unsigned _predicate_num = 0) :
entity(StringIndexFile::Entity, _dir, _entity_num), literal(StringIndexFile::Literal, _dir, _literal_num), predicate(StringIndexFile::Predicate, _dir, _predicate_num){}
void setBuffer(Buffer* _ebuf, Buffer* _lbuf)
@ -115,15 +115,15 @@ class StringIndex
this->literal_buffer = _lbuf;
this->literal_buffer_size = _lbuf->size;
}
bool searchBuffer(int _id, std::string* _str);
bool searchBuffer(unsigned _id, std::string* _str);
void setNum(StringIndexFile::StringIndexFileType _type, int _num);
void setNum(StringIndexFile::StringIndexFileType _type, unsigned _num);
void save(KVstore &kv_store);
void load();
bool randomAccess(int id, std::string *str, bool is_entity_or_literal = true);
void addRequest(int id, std::string *str, bool is_entity_or_literal = true);
bool randomAccess(unsigned id, std::string *str, bool is_entity_or_literal = true);
void addRequest(unsigned id, std::string *str, bool is_entity_or_literal = true);
void trySequenceAccess();
void change(std::vector<unsigned> &ids, KVstore &kv_store, bool is_entity_or_literal = true);

View File

@ -56,6 +56,7 @@ in the sparql query can point to the same node in data graph)
#include <set>
#include <stack>
#include <queue>
#include <deque>
#include <vector>
#include <list>
#include <iterator>
@ -64,6 +65,13 @@ in the sparql query can point to the same node in data graph)
#include <utility>
//NOTICE:below are libraries need to link
#include <thread> //only for c++11 or greater versions
#include <atomic>
#include <mutex>
#include <condition_variable>
#include <future>
#include <memory>
#include <stdexcept>
#include <pthread.h>
#include <math.h>
#include <readline/readline.h>
@ -210,7 +218,8 @@ public:
//In order to differentiate the sub-part and literal-part of object
//let subid begin with 0, while literalid begins with LITERAL_FIRST_ID
//used in Database and Join
static const int LITERAL_FIRST_ID = 2 * 1000*1000*1000;
static const unsigned LITERAL_FIRST_ID = 2 * 1000*1000*1000;
//static const int LITERAL_FIRST_ID = 2 * 1000*1000*1000;
//initial transfer buffer size in Tree/ and Stream/
static const unsigned TRANSFER_SIZE = 1 << 20; //1M

View File

@ -1,5 +1,5 @@
#CC=g++
CC=ccache g++
CC=ccache g++ -std=c++11
lib_dir=../lib/
socket_obj_dir=../../../.objs/

View File

@ -35,7 +35,7 @@
#compile parameters
CC = ccache g++ -std=c++11
CC = ccache g++
#CC = g++
#the optimazition level of gcc/g++
@ -43,11 +43,11 @@ CC = ccache g++ -std=c++11
#NOTICE: -O2 is recommended, while -O3 is dangerous
#when developing, not use -O because it will disturb the normal
#routine. use it for test and release.
CFLAGS = -c -Wall -O2
EXEFLAG = -O2
CFLAGS = -c -Wall -O2 -pthread -std=c++11
EXEFLAG = -O2 -pthread -std=c++11
#-coverage
#CFLAGS = -c -Wall -O2 -pthread
#EXEFLAG = -O2 -pthread
#CFLAGS = -c -Wall -O2 -pthread -g
#EXEFLAG = -O2 -pthread -g
#add -lreadline -ltermcap if using readline or objs contain readline
library = -ltermcap -lreadline -L./lib -L/usr/local/lib -lantlr -lgcov -lboost_filesystem -lboost_system -lboost_regex -lpthread -I/usr/local/include/boost