feat: lock query cache;
add files to record unsolved bugs; getFinalResult is locked in Database::query();
This commit is contained in:
parent
5572c3233f
commit
68ba02cd37
|
@ -1359,12 +1359,12 @@ Database::query(const string _query, ResultSet& _result_set, FILE* _fp)
|
||||||
{
|
{
|
||||||
return -101;
|
return -101;
|
||||||
}
|
}
|
||||||
cout<<"read lock acquired"<<endl;
|
cout<<"read priviledge of update lock acquired"<<endl;
|
||||||
|
|
||||||
//copy the string index for each query thread
|
//copy the string index for each query thread
|
||||||
StringIndex tmpsi = *this->stringindex;
|
//StringIndex tmpsi = *this->stringindex;
|
||||||
tmpsi.emptyBuffer();
|
//tmpsi.emptyBuffer();
|
||||||
general_evaluation.setStringIndexPointer(&tmpsi);
|
//general_evaluation.setStringIndexPointer(&tmpsi);
|
||||||
|
|
||||||
// this->debug_lock.lock();
|
// this->debug_lock.lock();
|
||||||
bool query_ret = general_evaluation.doQuery();
|
bool query_ret = general_evaluation.doQuery();
|
||||||
|
@ -1375,6 +1375,7 @@ Database::query(const string _query, ResultSet& _result_set, FILE* _fp)
|
||||||
// this->debug_lock.unlock();
|
// this->debug_lock.unlock();
|
||||||
|
|
||||||
long tv_bfget = Util::get_cur_time();
|
long tv_bfget = Util::get_cur_time();
|
||||||
|
//NOTICE: this lock lock ensures that StringIndex is visited sequentially
|
||||||
this->getFinalResult_lock.lock();
|
this->getFinalResult_lock.lock();
|
||||||
if (trie == NULL)
|
if (trie == NULL)
|
||||||
{
|
{
|
||||||
|
@ -1386,7 +1387,6 @@ Database::query(const string _query, ResultSet& _result_set, FILE* _fp)
|
||||||
}
|
}
|
||||||
trie->LoadDictionary();
|
trie->LoadDictionary();
|
||||||
}
|
}
|
||||||
|
|
||||||
general_evaluation.getFinalResult(_result_set);
|
general_evaluation.getFinalResult(_result_set);
|
||||||
this->getFinalResult_lock.unlock();
|
this->getFinalResult_lock.unlock();
|
||||||
long tv_afget = Util::get_cur_time();
|
long tv_afget = Util::get_cur_time();
|
||||||
|
@ -1396,7 +1396,7 @@ Database::query(const string _query, ResultSet& _result_set, FILE* _fp)
|
||||||
need_output_answer = true;
|
need_output_answer = true;
|
||||||
//general_evaluation.setNeedOutputAnswer();
|
//general_evaluation.setNeedOutputAnswer();
|
||||||
|
|
||||||
tmpsi.clear();
|
//tmpsi.clear();
|
||||||
pthread_rwlock_unlock(&(this->update_lock));
|
pthread_rwlock_unlock(&(this->update_lock));
|
||||||
}
|
}
|
||||||
//Update
|
//Update
|
||||||
|
@ -1412,6 +1412,7 @@ Database::query(const string _query, ResultSet& _result_set, FILE* _fp)
|
||||||
cout<<"unable to write lock"<<endl;
|
cout<<"unable to write lock"<<endl;
|
||||||
return -101;
|
return -101;
|
||||||
}
|
}
|
||||||
|
cout<<"write priviledge of update lock acquired"<<endl;
|
||||||
|
|
||||||
success_num = 0;
|
success_num = 0;
|
||||||
TripleWithObjType *update_triple = NULL;
|
TripleWithObjType *update_triple = NULL;
|
||||||
|
@ -3068,7 +3069,7 @@ Database::remove(std::string _rdf_file, bool _is_restore)
|
||||||
//triple_num -= parse_triple_num;
|
//triple_num -= parse_triple_num;
|
||||||
}
|
}
|
||||||
|
|
||||||
//TODO:better to free this just after id_tuples are ok
|
//BETTER: free this just after id_tuples are ok
|
||||||
//(only when using group insertion/deletion)
|
//(only when using group insertion/deletion)
|
||||||
//or reduce the array size
|
//or reduce the array size
|
||||||
delete[] triple_array;
|
delete[] triple_array;
|
||||||
|
@ -3161,7 +3162,9 @@ Database::insert(const TripleWithObjType* _triples, TYPE_TRIPLE_NUM _triple_num,
|
||||||
{
|
{
|
||||||
is_new_sub = true;
|
is_new_sub = true;
|
||||||
subid = this->allocEntityID();
|
subid = this->allocEntityID();
|
||||||
|
#ifdef DEBUG
|
||||||
cout << "this is a new subject: " << sub << " " << subid << endl;
|
cout << "this is a new subject: " << sub << " " << subid << endl;
|
||||||
|
#endif
|
||||||
this->sub_num++;
|
this->sub_num++;
|
||||||
this->kvstore->setIDByEntity(sub, subid);
|
this->kvstore->setIDByEntity(sub, subid);
|
||||||
this->kvstore->setEntityByID(subid, sub);
|
this->kvstore->setEntityByID(subid, sub);
|
||||||
|
@ -3194,7 +3197,9 @@ Database::insert(const TripleWithObjType* _triples, TYPE_TRIPLE_NUM _triple_num,
|
||||||
{
|
{
|
||||||
is_new_obj = true;
|
is_new_obj = true;
|
||||||
objid = this->allocEntityID();
|
objid = this->allocEntityID();
|
||||||
|
#ifdef DEBUG
|
||||||
cout << "this is a new object: " << obj << " " << objid << endl;
|
cout << "this is a new object: " << obj << " " << objid << endl;
|
||||||
|
#endif
|
||||||
//this->obj_num++;
|
//this->obj_num++;
|
||||||
this->kvstore->setIDByEntity(obj, objid);
|
this->kvstore->setIDByEntity(obj, objid);
|
||||||
this->kvstore->setEntityByID(objid, obj);
|
this->kvstore->setEntityByID(objid, obj);
|
||||||
|
@ -4026,7 +4031,6 @@ Database::remove(const TripleWithObjType* _triples, TYPE_TRIPLE_NUM _triple_num,
|
||||||
return valid_num;
|
return valid_num;
|
||||||
}
|
}
|
||||||
|
|
||||||
//TODO: check and improve the backup program
|
|
||||||
bool
|
bool
|
||||||
Database::backup()
|
Database::backup()
|
||||||
{
|
{
|
||||||
|
|
|
@ -95,6 +95,8 @@ bool QueryCache::getMinimalRepresentation(const Patterns &triple_pattern, Patter
|
||||||
|
|
||||||
bool QueryCache::tryCaching(const Patterns &triple_pattern, const TempResult &temp_result, int eva_time)
|
bool QueryCache::tryCaching(const Patterns &triple_pattern, const TempResult &temp_result, int eva_time)
|
||||||
{
|
{
|
||||||
|
lock_guard<mutex> (this->query_cache_lock); //when quit this scope the lock will be released
|
||||||
|
|
||||||
Patterns minimal_repre;
|
Patterns minimal_repre;
|
||||||
map<string, string> minimal_mapping;
|
map<string, string> minimal_mapping;
|
||||||
|
|
||||||
|
@ -176,8 +178,12 @@ bool QueryCache::tryCaching(const Patterns &triple_pattern, const TempResult &te
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//NOTICE: in this function we also modify some contents, so we must use mutex instead of rwlock
|
||||||
bool QueryCache::checkCached(const Patterns &triple_pattern, const Varset &varset, TempResult &temp_result)
|
bool QueryCache::checkCached(const Patterns &triple_pattern, const Varset &varset, TempResult &temp_result)
|
||||||
{
|
{
|
||||||
|
//this->query_cache_lock.lock();
|
||||||
|
lock_guard<mutex> (this->query_cache_lock); //when quit this scope the lock will be released
|
||||||
|
|
||||||
Patterns minimal_repre;
|
Patterns minimal_repre;
|
||||||
map<string, string> minimal_mapping;
|
map<string, string> minimal_mapping;
|
||||||
|
|
||||||
|
|
|
@ -23,6 +23,8 @@ class QueryCache
|
||||||
const long long ITEM_MEMORY_LIMIT = 1000000LL;
|
const long long ITEM_MEMORY_LIMIT = 1000000LL;
|
||||||
const long long TOTAL_MEMORY_LIMIT = 100000000LL;
|
const long long TOTAL_MEMORY_LIMIT = 100000000LL;
|
||||||
|
|
||||||
|
mutex query_cache_lock;
|
||||||
|
|
||||||
int time_now;
|
int time_now;
|
||||||
long long total_memory_used;
|
long long total_memory_used;
|
||||||
|
|
||||||
|
|
|
@ -91,33 +91,33 @@ bool StringIndexFile::randomAccess(unsigned id, string *str, bool real)
|
||||||
|
|
||||||
long offset = (*this->index_table)[id].offset;
|
long offset = (*this->index_table)[id].offset;
|
||||||
unsigned length = (*this->index_table)[id].length;
|
unsigned length = (*this->index_table)[id].length;
|
||||||
if(id == 9)
|
//if(id == 9)
|
||||||
{
|
//{
|
||||||
cout<<"check: "<<offset<<" "<<length<<endl;
|
//cout<<"check: "<<offset<<" "<<length<<endl;
|
||||||
}
|
//}
|
||||||
|
|
||||||
allocBuffer(length);
|
allocBuffer(length);
|
||||||
|
|
||||||
//DEBUG!!!!
|
//DEBUG: here a bug exists if we use pread instead of fread, the details are in BUG_StringIndex_pread of docs/BUGS.md
|
||||||
fseek(this->value_file, offset, SEEK_SET);
|
fseek(this->value_file, offset, SEEK_SET);
|
||||||
fread(this->buffer, sizeof(char), length, this->value_file);
|
fread(this->buffer, sizeof(char), length, this->value_file);
|
||||||
//pread(fileno(value_file), this->buffer, sizeof(char)*length, offset);
|
//pread(fileno(value_file), this->buffer, sizeof(char)*length, offset);
|
||||||
this->buffer[length] = '\0';
|
this->buffer[length] = '\0';
|
||||||
|
|
||||||
*str = string(this->buffer);
|
*str = string(this->buffer);
|
||||||
if(id == 9)
|
//if(id == 9)
|
||||||
{
|
//{
|
||||||
cout<<"check: "<<*str<<endl;
|
//cout<<"check: "<<*str<<endl;
|
||||||
}
|
//}
|
||||||
|
|
||||||
if (real)
|
if (real)
|
||||||
{
|
{
|
||||||
*str = trie->Uncompress(*str, str->length());//Uncompresss
|
*str = trie->Uncompress(*str, str->length());//Uncompresss
|
||||||
}
|
}
|
||||||
if(id == 9)
|
//if(id == 9)
|
||||||
{
|
//{
|
||||||
cout<<"check: "<<*str<<endl;
|
//cout<<"check: "<<*str<<endl;
|
||||||
}
|
//}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -145,7 +145,7 @@ void StringIndexFile::trySequenceAccess(bool real)
|
||||||
if (this->type == Predicate)
|
if (this->type == Predicate)
|
||||||
cout << "Predicate StringIndex ";
|
cout << "Predicate StringIndex ";
|
||||||
|
|
||||||
long current_offset = 0;
|
//long current_offset = 0;
|
||||||
if ((max_end - min_begin) / 800000L < (long)this->request.size())
|
if ((max_end - min_begin) / 800000L < (long)this->request.size())
|
||||||
{
|
{
|
||||||
cout << "sequence access." << endl;
|
cout << "sequence access." << endl;
|
||||||
|
@ -160,8 +160,8 @@ void StringIndexFile::trySequenceAccess(bool real)
|
||||||
char *block = new char[MAX_BLOCK_SIZE];
|
char *block = new char[MAX_BLOCK_SIZE];
|
||||||
|
|
||||||
long current_block_begin = min_begin;
|
long current_block_begin = min_begin;
|
||||||
//fseek(this->value_file, current_block_begin, SEEK_SET);
|
fseek(this->value_file, current_block_begin, SEEK_SET);
|
||||||
current_offset = current_block_begin;
|
//current_offset = current_block_begin;
|
||||||
|
|
||||||
while (current_block_begin < max_end)
|
while (current_block_begin < max_end)
|
||||||
{
|
{
|
||||||
|
@ -170,14 +170,14 @@ void StringIndexFile::trySequenceAccess(bool real)
|
||||||
if (current_block_end <= this->request[pos].offset)
|
if (current_block_end <= this->request[pos].offset)
|
||||||
{
|
{
|
||||||
current_block_begin = this->request[pos].offset;
|
current_block_begin = this->request[pos].offset;
|
||||||
//fseek(this->value_file, current_block_begin, SEEK_SET);
|
fseek(this->value_file, current_block_begin, SEEK_SET);
|
||||||
current_offset = current_block_begin;
|
//current_offset = current_block_begin;
|
||||||
current_block_end = min(current_block_begin + MAX_BLOCK_SIZE, max_end);
|
current_block_end = min(current_block_begin + MAX_BLOCK_SIZE, max_end);
|
||||||
}
|
}
|
||||||
|
|
||||||
//fread(block, sizeof(char), current_block_end - current_block_begin, this->value_file);
|
fread(block, sizeof(char), current_block_end - current_block_begin, this->value_file);
|
||||||
pread(fileno(this->value_file), block, sizeof(char)*(current_block_end-current_block_begin), current_offset);
|
//pread(fileno(this->value_file), block, sizeof(char)*(current_block_end-current_block_begin), current_offset);
|
||||||
current_offset += sizeof(char)*(current_block_end-current_block_begin);
|
//current_offset += sizeof(char)*(current_block_end-current_block_begin);
|
||||||
|
|
||||||
while (pos < (int)this->request.size())
|
while (pos < (int)this->request.size())
|
||||||
{
|
{
|
||||||
|
@ -292,13 +292,13 @@ void StringIndexFile::change(unsigned id, KVstore &kv_store)
|
||||||
|
|
||||||
fseek(this->value_file, (*this->index_table)[id].offset, SEEK_SET);
|
fseek(this->value_file, (*this->index_table)[id].offset, SEEK_SET);
|
||||||
fwrite(str.c_str(), sizeof(char), (*this->index_table)[id].length, this->value_file);
|
fwrite(str.c_str(), sizeof(char), (*this->index_table)[id].length, this->value_file);
|
||||||
if(id == 9)
|
//if(id == 9)
|
||||||
{
|
//{
|
||||||
cout<<"check in change():9 "<<str<<endl;
|
//cout<<"check in change():9 "<<str<<endl;
|
||||||
string str2;
|
//string str2;
|
||||||
randomAccess(id, &str2);
|
//randomAccess(id, &str2);
|
||||||
cout<<str2<<endl;
|
//cout<<str2<<endl;
|
||||||
}
|
//}
|
||||||
}
|
}
|
||||||
|
|
||||||
void StringIndexFile::disable(unsigned id)
|
void StringIndexFile::disable(unsigned id)
|
||||||
|
@ -394,10 +394,10 @@ void StringIndex::addRequest(unsigned id, std::string *str, bool is_entity_or_li
|
||||||
{
|
{
|
||||||
if (is_entity_or_literal)
|
if (is_entity_or_literal)
|
||||||
{
|
{
|
||||||
if(id == 9)
|
//if(id == 9)
|
||||||
{
|
//{
|
||||||
cout<<"to search 9 in string buffer"<<endl;
|
//cout<<"to search 9 in string buffer"<<endl;
|
||||||
}
|
//}
|
||||||
//if(searchBuffer(id, str))
|
//if(searchBuffer(id, str))
|
||||||
//{
|
//{
|
||||||
//// *str = trie->Uncompress(*str)
|
//// *str = trie->Uncompress(*str)
|
||||||
|
|
|
@ -149,7 +149,7 @@ class StringIndex
|
||||||
|
|
||||||
void flush()
|
void flush()
|
||||||
{
|
{
|
||||||
//TODO: flush updates to disk to avoid missing
|
//nothing to do here
|
||||||
}
|
}
|
||||||
|
|
||||||
void emptyBuffer()
|
void emptyBuffer()
|
||||||
|
|
|
@ -0,0 +1,23 @@
|
||||||
|
**This file maintains details of the bugs not solved currently.**
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
#### BUG_StringIndex_pread
|
||||||
|
|
||||||
|
StringIndex::randomAcces()
|
||||||
|
|
||||||
|
StringIndex::trySequenceAccess()
|
||||||
|
|
||||||
|
when we insert a triple via ghttp, and query this triple immediately, we will find that answer is wrong.
|
||||||
|
when we run this query for several times, each time we will get a different answer.
|
||||||
|
Sometimes, we will get messy code.
|
||||||
|
With the same reason, if we use bin/gquery db to enter the gquery console, insert and query within this console, we will get similar errors.
|
||||||
|
Amazingly, if we quit the console and restart, run this query again, we will get the correct answer!
|
||||||
|
|
||||||
|
The problem appears after we replace fread in StringIndex with pread, to support conncurrent queries.
|
||||||
|
The inherent reason have not been found now.
|
||||||
|
As a result, we change it back to fread, and use a lock for the StringIndex to block concurrent reads.
|
||||||
|
This is not supposed to cause a great loss in performance, because all operations to a single disk will be executed sequentially by the disk controller.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
Loading…
Reference in New Issue