refactor: use KVstore as filter instead of VSTree;

mmeory is saved, as well as stablity;
TODO: deal with pre var,a nd consider multiple same pres;

by hulin
This commit is contained in:
bookug 2017-07-02 21:39:35 +08:00
parent 556edb1b49
commit 09a30931d4
5 changed files with 489 additions and 249 deletions

View File

@ -74,7 +74,8 @@ Database::Database(string _name)
this->kvstore = new KVstore(kv_store_path);
string vstree_store_path = store_path + "/vs_store";
this->vstree = new VSTree(vstree_store_path);
//this->vstree = new VSTree(vstree_store_path);
this->vstree = NULL;
string stringindex_store_path = store_path + "/stringindex_store";
this->stringindex = new StringIndex(stringindex_store_path);
@ -577,16 +578,16 @@ Database::load()
unsigned vstree_cache = LRUCache::DEFAULT_CAPACITY;
bool flag;
#ifndef THREAD_ON
flag = (this->vstree)->loadTree(vstree_cache);
if (!flag)
{
cout << "load tree error. @Database::load()" << endl;
return false;
}
//flag = (this->vstree)->loadTree(vstree_cache);
//if (!flag)
//{
//cout << "load tree error. @Database::load()" << endl;
//return false;
//}
(this->kvstore)->open();
#else
thread vstree_thread(&Database::load_vstree, this, vstree_cache);
//thread vstree_thread(&Database::load_vstree, this, vstree_cache);
int kv_mode = KVstore::READ_WRITE_MODE;
thread entity2id_thread(&Database::load_entity2id, this, kv_mode);
@ -644,7 +645,7 @@ Database::load()
sub2values_thread.join();
obj2values_thread.join();
//wait for vstree thread
vstree_thread.join();
//vstree_thread.join();
#endif
//warm up always as finishing build(), to utilize the system buffer
@ -798,9 +799,9 @@ Database::unload()
this->literal_buffer = NULL;
//TODO: fflush the database file
this->vstree->saveTree();
delete this->vstree;
this->vstree = NULL;
//this->vstree->saveTree();
//delete this->vstree;
//this->vstree = NULL;
delete this->kvstore;
this->kvstore = NULL;
@ -1017,8 +1018,8 @@ Database::build(const string& _rdf_file)
string kv_store_path = store_path + "/kv_store";
Util::create_dir(kv_store_path);
string vstree_store_path = store_path + "/vs_store";
Util::create_dir(vstree_store_path);
//string vstree_store_path = store_path + "/vs_store";
//Util::create_dir(vstree_store_path);
string stringindex_store_path = store_path + "/stringindex_store";
Util::create_dir(stringindex_store_path);
@ -1055,21 +1056,21 @@ Database::build(const string& _rdf_file)
//this->kvstore->release();
//cout<<"release kvstore"<<endl;
long before_vstree = Util::get_cur_time();
//long before_vstree = Util::get_cur_time();
//(this->kvstore)->open();
string _entry_file = this->getSignatureBFile();
//string _entry_file = this->getSignatureBFile();
cout << "begin build VS-Tree on " << ret << "..." << endl;
//cout << "begin build VS-Tree on " << ret << "..." << endl;
//NOTICE: we can use larger buffer for vstree in building process, because it does not compete with others
//we only need to build vstree in this phase(no need for id tuples anymore)
//TODO: acquire this arg from memory manager
unsigned vstree_cache_size = 4 * LRUCache::DEFAULT_CAPACITY;
//unsigned vstree_cache_size = 4 * LRUCache::DEFAULT_CAPACITY;
//BETTER: we should set the parameter according to current memory usage
(this->vstree)->buildTree(_entry_file, vstree_cache_size);
//(this->vstree)->buildTree(_entry_file, vstree_cache_size);
long tv_build_end = Util::get_cur_time();
cout << "after build vstree, used " << (tv_build_end - before_vstree) << "ms." << endl;
//cout << "after build vstree, used " << (tv_build_end - before_vstree) << "ms." << endl;
cout << "after build, used " << (tv_build_end - tv_build_begin) << "ms." << endl;
cout << "finish build VS-Tree." << endl;
@ -1519,81 +1520,81 @@ Database::build_s2xx(ID_TUPLE* _p_id_tuples)
this->kvstore->build_subID2values(_p_id_tuples, this->triples_num);
//save all entity_signature into binary file
string sig_binary_file = this->getSignatureBFile();
FILE* sig_fp = fopen(sig_binary_file.c_str(), "wb");
if (sig_fp == NULL)
{
cout << "Failed to open : " << sig_binary_file << endl;
return;
}
//string sig_binary_file = this->getSignatureBFile();
//FILE* sig_fp = fopen(sig_binary_file.c_str(), "wb");
//if (sig_fp == NULL)
//{
//cout << "Failed to open : " << sig_binary_file << endl;
//return;
//}
//NOTICE:in build process, all IDs are continuous growing
EntityBitSet tmp_bitset;
tmp_bitset.reset();
for(TYPE_ENTITY_LITERAL_ID i = 0; i < this->entity_num; ++i)
{
SigEntry* sig = new SigEntry(EntitySig(tmp_bitset), -1);
fwrite(sig, sizeof(SigEntry), 1, sig_fp);
delete sig;
}
//EntityBitSet tmp_bitset;
//tmp_bitset.reset();
//for(TYPE_ENTITY_LITERAL_ID i = 0; i < this->entity_num; ++i)
//{
//SigEntry* sig = new SigEntry(EntitySig(tmp_bitset), -1);
//fwrite(sig, sizeof(SigEntry), 1, sig_fp);
//delete sig;
//}
TYPE_ENTITY_LITERAL_ID prev_entity_id = INVALID_ENTITY_LITERAL_ID;
//TYPE_ENTITY_LITERAL_ID prev_entity_id = INVALID_ENTITY_LITERAL_ID;
//int prev_entity_id = -1;
//NOTICE: i*3 + j maybe break the unsigned limit
//for (unsigned long i = 0; i < this->triples_num; ++i)
for (TYPE_TRIPLE_NUM i = 0; i < this->triples_num; ++i)
{
TYPE_ENTITY_LITERAL_ID subid = _p_id_tuples[i].subid;
TYPE_PREDICATE_ID preid = _p_id_tuples[i].preid;
TYPE_ENTITY_LITERAL_ID objid = _p_id_tuples[i].objid;
//TYPE_ENTITY_LITERAL_ID subid = _p_id_tuples[i*3+0];
//TYPE_PREDICATE_ID preid = _p_id_tuples[i*3+1];
//TYPE_ENTITY_LITERAL_ID objid = _p_id_tuples[i*3+2];
if(subid != prev_entity_id)
{
if(prev_entity_id != INVALID_ENTITY_LITERAL_ID)
//if(prev_entity_id != -1)
{
#ifdef DEBUG
//if(prev_entity_id == 13)
//{
//cout<<"yy: "<<Signature::BitSet2str(tmp_bitset)<<endl;
//}
#endif
//NOTICE: we must do twice, we need to locate on the same entry to deal, so we must place in order
SigEntry* sig = new SigEntry(EntitySig(tmp_bitset), prev_entity_id);
//write the sig entry
fseek(sig_fp, sizeof(SigEntry) * prev_entity_id, SEEK_SET);
fwrite(sig, sizeof(SigEntry), 1, sig_fp);
//_all_bitset |= *_entity_bitset[i];
delete sig;
}
prev_entity_id = subid;
tmp_bitset.reset();
Signature::encodeEdge2Entity(tmp_bitset, preid, objid, Util::EDGE_OUT);
//Signature::encodePredicate2Entity(preid, _tmp_bitset, Util::EDGE_OUT);
//Signature::encodeStr2Entity(objid, _tmp_bitset);
}
else
{
Signature::encodeEdge2Entity(tmp_bitset, preid, objid, Util::EDGE_OUT);
}
}
//for (TYPE_TRIPLE_NUM i = 0; i < this->triples_num; ++i)
//{
//TYPE_ENTITY_LITERAL_ID subid = _p_id_tuples[i].subid;
//TYPE_PREDICATE_ID preid = _p_id_tuples[i].preid;
//TYPE_ENTITY_LITERAL_ID objid = _p_id_tuples[i].objid;
////TYPE_ENTITY_LITERAL_ID subid = _p_id_tuples[i*3+0];
////TYPE_PREDICATE_ID preid = _p_id_tuples[i*3+1];
////TYPE_ENTITY_LITERAL_ID objid = _p_id_tuples[i*3+2];
//if(subid != prev_entity_id)
//{
//if(prev_entity_id != INVALID_ENTITY_LITERAL_ID)
////if(prev_entity_id != -1)
//{
//#ifdef DEBUG
////if(prev_entity_id == 13)
////{
////cout<<"yy: "<<Signature::BitSet2str(tmp_bitset)<<endl;
////}
//#endif
////NOTICE: we must do twice, we need to locate on the same entry to deal, so we must place in order
//SigEntry* sig = new SigEntry(EntitySig(tmp_bitset), prev_entity_id);
////write the sig entry
//fseek(sig_fp, sizeof(SigEntry) * prev_entity_id, SEEK_SET);
//fwrite(sig, sizeof(SigEntry), 1, sig_fp);
////_all_bitset |= *_entity_bitset[i];
//delete sig;
//}
//prev_entity_id = subid;
//tmp_bitset.reset();
//Signature::encodeEdge2Entity(tmp_bitset, preid, objid, Util::EDGE_OUT);
////Signature::encodePredicate2Entity(preid, _tmp_bitset, Util::EDGE_OUT);
////Signature::encodeStr2Entity(objid, _tmp_bitset);
//}
//else
//{
//Signature::encodeEdge2Entity(tmp_bitset, preid, objid, Util::EDGE_OUT);
//}
//}
//NOTICE: remember to write the last entity's signature
if(prev_entity_id != INVALID_ENTITY_LITERAL_ID)
//if(prev_entity_id != -1)
{
SigEntry* sig = new SigEntry(EntitySig(tmp_bitset), prev_entity_id);
//write the sig entry
fseek(sig_fp, sizeof(SigEntry) * prev_entity_id, SEEK_SET);
fwrite(sig, sizeof(SigEntry), 1, sig_fp);
//_all_bitset |= *_entity_bitset[i];
delete sig;
}
////NOTICE: remember to write the last entity's signature
//if(prev_entity_id != INVALID_ENTITY_LITERAL_ID)
////if(prev_entity_id != -1)
//{
//SigEntry* sig = new SigEntry(EntitySig(tmp_bitset), prev_entity_id);
////write the sig entry
//fseek(sig_fp, sizeof(SigEntry) * prev_entity_id, SEEK_SET);
//fwrite(sig, sizeof(SigEntry), 1, sig_fp);
////_all_bitset |= *_entity_bitset[i];
//delete sig;
//}
fclose(sig_fp);
//fclose(sig_fp);
}
void
@ -1604,114 +1605,114 @@ Database::build_o2xx(ID_TUPLE* _p_id_tuples)
this->kvstore->build_objID2values(_p_id_tuples, this->triples_num);
//save all entity_signature into binary file
string sig_binary_file = this->getSignatureBFile();
//NOTICE: this is different from build_s2xx, the file already exists
FILE* sig_fp = fopen(sig_binary_file.c_str(), "rb+");
if (sig_fp == NULL)
{
cout << "Failed to open : " << sig_binary_file << endl;
return;
}
//string sig_binary_file = this->getSignatureBFile();
////NOTICE: this is different from build_s2xx, the file already exists
//FILE* sig_fp = fopen(sig_binary_file.c_str(), "rb+");
//if (sig_fp == NULL)
//{
//cout << "Failed to open : " << sig_binary_file << endl;
//return;
//}
//NOTICE:in build process, all IDs are continuous growing
//TODO:use unsigned for type and -1 should be changed
TYPE_ENTITY_LITERAL_ID prev_entity_id = INVALID_ENTITY_LITERAL_ID;
//int prev_entity_id = -1;
EntityBitSet tmp_bitset;
////NOTICE:in build process, all IDs are continuous growing
////TODO:use unsigned for type and -1 should be changed
//TYPE_ENTITY_LITERAL_ID prev_entity_id = INVALID_ENTITY_LITERAL_ID;
////int prev_entity_id = -1;
//EntityBitSet tmp_bitset;
//NOTICE: i*3 + j maybe break the unsigned limit
//for (unsigned long i = 0; i < this->triples_num; ++i)
for (TYPE_TRIPLE_NUM i = 0; i < this->triples_num; ++i)
{
TYPE_ENTITY_LITERAL_ID subid = _p_id_tuples[i].subid;
TYPE_PREDICATE_ID preid = _p_id_tuples[i].preid;
TYPE_ENTITY_LITERAL_ID objid = _p_id_tuples[i].objid;
//TYPE_ENTITY_LITERAL_ID subid = _p_id_tuples[i*3+0];
//TYPE_PREDICATE_ID preid = _p_id_tuples[i*3+1];
//TYPE_ENTITY_LITERAL_ID objid = _p_id_tuples[i*3+2];
////NOTICE: i*3 + j maybe break the unsigned limit
////for (unsigned long i = 0; i < this->triples_num; ++i)
//for (TYPE_TRIPLE_NUM i = 0; i < this->triples_num; ++i)
//{
//TYPE_ENTITY_LITERAL_ID subid = _p_id_tuples[i].subid;
//TYPE_PREDICATE_ID preid = _p_id_tuples[i].preid;
//TYPE_ENTITY_LITERAL_ID objid = _p_id_tuples[i].objid;
////TYPE_ENTITY_LITERAL_ID subid = _p_id_tuples[i*3+0];
////TYPE_PREDICATE_ID preid = _p_id_tuples[i*3+1];
////TYPE_ENTITY_LITERAL_ID objid = _p_id_tuples[i*3+2];
if(Util::is_literal_ele(objid))
{
continue;
}
//if(Util::is_literal_ele(objid))
//{
//continue;
//}
if(objid != prev_entity_id)
{
//if(prev_entity_id != -1)
if(prev_entity_id != INVALID_ENTITY_LITERAL_ID)
{
//NOTICE: we must do twice, we need to locate on the same entry to deal, so we must place in order
fseek(sig_fp, sizeof(SigEntry) * prev_entity_id, SEEK_SET);
SigEntry* old_sig = new SigEntry();
fread(old_sig, sizeof(SigEntry), 1, sig_fp);
#ifdef DEBUG
//cout<<"to write a signature: "<<prev_entity_id<<endl;
//cout<<prev_entity_id<<endl;
//if(prev_entity_id == 13)
//{
//cout<<"yy: "<<Signature::BitSet2str(tmp_bitset)<<endl;
//}
#endif
tmp_bitset |= old_sig->getEntitySig().entityBitSet;
delete old_sig;
//if(objid != prev_entity_id)
//{
////if(prev_entity_id != -1)
//if(prev_entity_id != INVALID_ENTITY_LITERAL_ID)
//{
////NOTICE: we must do twice, we need to locate on the same entry to deal, so we must place in order
//fseek(sig_fp, sizeof(SigEntry) * prev_entity_id, SEEK_SET);
//SigEntry* old_sig = new SigEntry();
//fread(old_sig, sizeof(SigEntry), 1, sig_fp);
//#ifdef DEBUG
////cout<<"to write a signature: "<<prev_entity_id<<endl;
////cout<<prev_entity_id<<endl;
////if(prev_entity_id == 13)
////{
////cout<<"yy: "<<Signature::BitSet2str(tmp_bitset)<<endl;
////}
//#endif
//tmp_bitset |= old_sig->getEntitySig().entityBitSet;
//delete old_sig;
#ifdef DEBUG
//if(prev_entity_id == 13)
//{
//cout<<"yy: "<<Signature::BitSet2str(tmp_bitset)<<endl;
//}
#endif
//#ifdef DEBUG
////if(prev_entity_id == 13)
////{
////cout<<"yy: "<<Signature::BitSet2str(tmp_bitset)<<endl;
////}
//#endif
//write the sig entry
SigEntry* sig = new SigEntry(EntitySig(tmp_bitset), prev_entity_id);
fseek(sig_fp, sizeof(SigEntry) * prev_entity_id, SEEK_SET);
fwrite(sig, sizeof(SigEntry), 1, sig_fp);
//_all_bitset |= *_entity_bitset[i];
delete sig;
}
////write the sig entry
//SigEntry* sig = new SigEntry(EntitySig(tmp_bitset), prev_entity_id);
//fseek(sig_fp, sizeof(SigEntry) * prev_entity_id, SEEK_SET);
//fwrite(sig, sizeof(SigEntry), 1, sig_fp);
////_all_bitset |= *_entity_bitset[i];
//delete sig;
//}
#ifdef DEBUG
//cout<<"now is a new signature: "<<objid<<endl;
#endif
//#ifdef DEBUG
////cout<<"now is a new signature: "<<objid<<endl;
//#endif
prev_entity_id = objid;
tmp_bitset.reset();
//cout<<"bitset reset"<<endl;
Signature::encodeEdge2Entity(tmp_bitset, preid, subid, Util::EDGE_IN);
//cout<<"edge encoded"<<endl;
//Signature::encodePredicate2Entity(preid, _tmp_bitset, Util::EDGE_IN);
//Signature::encodeStr2Entity(subid, _tmp_bitset);
}
else
{
//cout<<"same signature: "<<objid<<" "<<preid<<" "<<subid<<endl;
Signature::encodeEdge2Entity(tmp_bitset, preid, subid, Util::EDGE_IN);
//cout<<"edge encoded"<<endl;
}
}
//cout<<"loop ended!"<<endl;
//prev_entity_id = objid;
//tmp_bitset.reset();
////cout<<"bitset reset"<<endl;
//Signature::encodeEdge2Entity(tmp_bitset, preid, subid, Util::EDGE_IN);
////cout<<"edge encoded"<<endl;
////Signature::encodePredicate2Entity(preid, _tmp_bitset, Util::EDGE_IN);
////Signature::encodeStr2Entity(subid, _tmp_bitset);
//}
//else
//{
////cout<<"same signature: "<<objid<<" "<<preid<<" "<<subid<<endl;
//Signature::encodeEdge2Entity(tmp_bitset, preid, subid, Util::EDGE_IN);
////cout<<"edge encoded"<<endl;
//}
//}
////cout<<"loop ended!"<<endl;
//NOTICE: remember to write the last entity's signature
if(prev_entity_id != INVALID_ENTITY_LITERAL_ID)
//if(prev_entity_id != -1)
{
//cout<<"to write the last signature"<<endl;
////NOTICE: remember to write the last entity's signature
//if(prev_entity_id != INVALID_ENTITY_LITERAL_ID)
////if(prev_entity_id != -1)
//{
////cout<<"to write the last signature"<<endl;
fseek(sig_fp, sizeof(SigEntry) * prev_entity_id, SEEK_SET);
SigEntry* old_sig = new SigEntry();
fread(old_sig, sizeof(SigEntry), 1, sig_fp);
tmp_bitset |= old_sig->getEntitySig().entityBitSet;
delete old_sig;
//write the sig entry
SigEntry* sig = new SigEntry(EntitySig(tmp_bitset), prev_entity_id);
fseek(sig_fp, sizeof(SigEntry) * prev_entity_id, SEEK_SET);
fwrite(sig, sizeof(SigEntry), 1, sig_fp);
//_all_bitset |= *_entity_bitset[i];
delete sig;
}
//fseek(sig_fp, sizeof(SigEntry) * prev_entity_id, SEEK_SET);
//SigEntry* old_sig = new SigEntry();
//fread(old_sig, sizeof(SigEntry), 1, sig_fp);
//tmp_bitset |= old_sig->getEntitySig().entityBitSet;
//delete old_sig;
////write the sig entry
//SigEntry* sig = new SigEntry(EntitySig(tmp_bitset), prev_entity_id);
//fseek(sig_fp, sizeof(SigEntry) * prev_entity_id, SEEK_SET);
//fwrite(sig, sizeof(SigEntry), 1, sig_fp);
////_all_bitset |= *_entity_bitset[i];
//delete sig;
//}
fclose(sig_fp);
//fclose(sig_fp);
}
void

View File

@ -113,11 +113,11 @@ Join::score_node(int var)
}
//double wt = Join::PARAM_DEGREE * (double)degree + Join::PARAM_SIZE / (double)size + Join::PARAM_PRE / (double)num;
//we should deal with literal variable as lately as possible
if(!this->is_literal_var(var))
{
//if(!this->is_literal_var(var))
//{
//no need to consider size for literal variable, because it may arise a lot
wt += Join::PARAM_SIZE / ((double)size+1);
}
//}
//the smallest wt returned is 0
return wt;
@ -196,29 +196,275 @@ Join::join_sparql(SPARQLquery& _sparql_query)
return true;
}
//TODO: consider a node with multiple same predicates(not pre var), use getXXX(...false) to do this
//BETTER?: ?s-p-?o, use p2so instead of p2s and p2o to get candidates for ?s and ?o will be better??
bool
Join::pre_handler()
{
cout << "start constant filter here " << endl << endl;
for (int _var_i = 0; _var_i < this->var_num; _var_i++)
{
//filter before join here
int var_degree = this->basic_query->getVarDegree(_var_i);
IDList &_list = this->basic_query->getCandidateList(_var_i);
cout << "\tVar" << _var_i << " " << this->basic_query->getVarName(_var_i) << endl;
// this->basic_query->setReady(_var_i);
for (int j = 0; j < var_degree; j++)
{
int neighbor_id = this->basic_query->getEdgeNeighborID(_var_i, j);
if (neighbor_id != -1) //TODO:what does it mean???variables in join not considered here
{
continue;
}
//TODO set ready here?
this->basic_query->setReady(_var_i);
char edge_type = this->basic_query->getEdgeType(_var_i, j);
int triple_id = this->basic_query->getEdgeID(_var_i, j);
Triple triple = this->basic_query->getTriple(triple_id);
string neighbor_name;
if (edge_type == Util::EDGE_OUT)
{
neighbor_name = triple.object;
}
else
{
neighbor_name = triple.subject;
}
//TODO : consider or not??
bool only_preid_filter = (this->basic_query->isOneDegreeNotJoinVar(neighbor_name));
if(only_preid_filter)
{
continue;
}
else
{
this->dealed_triple[triple_id] = true;
}
TYPE_PREDICATE_ID pre_id = this->basic_query->getEdgePreID(_var_i, j);
//TODO : cancle the literal
TYPE_ENTITY_LITERAL_ID lit_id = (this->kvstore)->getIDByEntity(neighbor_name);
if (lit_id == INVALID_ENTITY_LITERAL_ID)
{
lit_id = (this->kvstore)->getIDByLiteral(neighbor_name);
}
unsigned id_list_len = 0;
unsigned* id_list = NULL;
if (pre_id >= 0)
{
if (edge_type == Util::EDGE_OUT)
{
(this->kvstore)->getsubIDlistByobjIDpreID(lit_id, pre_id, id_list, id_list_len, true);
}
else
{
(this->kvstore)->getobjIDlistBysubIDpreID(lit_id, pre_id, id_list, id_list_len, true);
}
}
else if (pre_id == -2)
{
if (edge_type == Util::EDGE_OUT)
{
(this->kvstore)->getsubIDlistByobjID(lit_id, id_list, id_list_len, true);
}
else
{
(this->kvstore)->getobjIDlistBysubID(lit_id, id_list, id_list_len, true);
}
}
else
{
id_list_len = 0;
}
if (id_list_len == 0)
{
_list.clear();
delete[] id_list;
return false;
}
//TODO : correct ways to intersect
//updateList(_list, id_list, id_list_len);
if (_list.size() == 0)
_list.unionList(id_list,id_list_len);
else
_list.intersectList(id_list, id_list_len);
delete[] id_list;
if (_list.size() == 0)
{
return false;
}
}
cout << "\t\t[" << _var_i << "] after constant filter, candidate size = " << _list.size() << endl << endl << endl;
}
cout << "pre var filter start here" << endl;
for(int _var = 0; _var < this->var_num; _var++)
{
if(this->basic_query->isSatelliteInJoin(_var))
continue;
cout << "\tVar" << _var << " " << this->basic_query->getVarName(_var) << endl;
IDList& cans = this->basic_query->getCandidateList(_var);
unsigned size = this->basic_query->getCandidateSize(_var);
//result if already empty for non-literal variable
//TODO: whether use this
/*
if (size == 0)
{
if(!is_literal_var(_var))
return false;
else
return true;
}
*/
int var_degree = this->basic_query->getVarDegree(_var);
//NOTICE:maybe several same predicates
set<TYPE_PREDICATE_ID> in_edge_pre_id;
set<TYPE_PREDICATE_ID> out_edge_pre_id;
for (int i = 0; i < var_degree; i++)
{
char edge_type = this->basic_query->getEdgeType(_var, i);
int triple_id = this->basic_query->getEdgeID(_var, i);
Triple triple = this->basic_query->getTriple(triple_id);
string neighbor;
if (edge_type == Util::EDGE_OUT)
{
neighbor = triple.object;
}
else
{
neighbor = triple.subject;
}
//not consider edge with constant neighbors here
if(neighbor[0] != '?')
{
//cout << "not to filter: " << neighbor_name << endl;
continue;
}
//else
//cout << "need to filter: " << neighbor_name << endl;
TYPE_PREDICATE_ID pre_id = this->basic_query->getEdgePreID(_var, i);
//WARN+BETTER:invalid(should be discarded in Query) or ?p(should not be considered here)
if (pre_id < 0)
{
continue;
}
//TODO+BETTER: is any pre really used? do we need to losen the restrictions?
//size:m<n; time:mlgn < n-m
//The former time is computed because the m should be small if we select this p, tending to use binary-search
//when doing intersectList operation(mlgn < m+n).
//The latter time is computed due to the unnecessary copy cost if not using this p
//TYPE_TRIPLE_NUM border = size / (Util::logarithm(2, size) + 1);
//not use inefficient pre to filter
if(this->dealed_triple[triple_id])
{
continue;
}
if(this->basic_query->isOneDegreeVar(neighbor))
{
this->dealed_triple[triple_id] = true;
}
if (edge_type == Util::EDGE_OUT)
{
out_edge_pre_id.insert(pre_id);
}
else
{
in_edge_pre_id.insert(pre_id);
}
}
if (in_edge_pre_id.empty() && out_edge_pre_id.empty())
{
continue;
}
//TODO : set ready here ?
this->basic_query->setReady(_var);
//NOTICE:use p2s here, use s2p in only_pre_filter_after_join because pres there are not efficient
set<TYPE_PREDICATE_ID>::iterator it;
unsigned* list = NULL;
unsigned len = 0;
for(it = in_edge_pre_id.begin(); it != in_edge_pre_id.end(); ++it)
{
this->kvstore->getobjIDlistBypreID(*it, list, len, true);
if(cans.size() == 0)
cans.unionList(list,len);
else
cans.intersectList(list, len);
delete[] list;
}
if(in_edge_pre_id.size() != 0 && cans.size() == 0)
{
// cout << "after in_edge_filter, the cans size = 0" << endl;
return false;
}
for(it = out_edge_pre_id.begin(); it != out_edge_pre_id.end(); ++it)
{
this->kvstore->getsubIDlistBypreID(*it, list, len, true);
if(cans.size() == 0)
cans.unionList(list,len);
else
cans.intersectList(list, len);
delete[] list;
}
//this is a core vertex, so if not literal var, exit when empty
if(cans.empty())
{
return false;
}
cout << "\t\t[" << _var << "] after pre var filter, candidate size = " << cans.size() << endl << endl << endl;
}
return true;
}
bool
Join::join_basic(BasicQuery* _basic_query)
{
this->init(_basic_query);
long begin = Util::get_cur_time();
bool ret1 = this->filter_before_join();
long after_constant_filter = Util::get_cur_time();
//fprintf(stderr, "after filter_before_join: used %ld ms\n", after_filter - begin);
cout << "after filter_before_join: used " << (after_constant_filter - begin) << " ms" << endl;
if (!ret1)
{
this->clear();
return false;
}
//bool ret1 = this->filter_before_join();
//long after_constant_filter = Util::get_cur_time();
////fprintf(stderr, "after filter_before_join: used %ld ms\n", after_filter - begin);
//cout << "after filter_before_join: used " << (after_constant_filter - begin) << " ms" << endl;
//if (!ret1)
//{
//this->clear();
//return false;
//}
this->add_literal_candidate();
long after_add_literal = Util::get_cur_time();
cout << "after add_literal_candidate: used " << (after_add_literal - after_constant_filter) << " ms" << endl;
//this->add_literal_candidate();
//long after_add_literal = Util::get_cur_time();
//cout << "after add_literal_candidate: used " << (after_add_literal - after_constant_filter) << " ms" << endl;
bool ret2 = this->allFilterByPres();
//bool ret2 = true;
long after_pre_filter = Util::get_cur_time();
cout << "after allFilterByPres: used " << (after_pre_filter - after_add_literal) << " ms" << endl;
//bool ret2 = this->allFilterByPres();
////bool ret2 = true;
//long after_pre_filter = Util::get_cur_time();
//cout << "after allFilterByPres: used " << (after_pre_filter - after_add_literal) << " ms" << endl;
bool ret2 = pre_handler();
long after_prehandler = Util::get_cur_time();
cout << "after prehandler: used " << (after_prehandler - begin) << " ms" << endl;
if (!ret2)
{
this->clear();
@ -227,7 +473,7 @@ Join::join_basic(BasicQuery* _basic_query)
bool ret3 = this->join();
long after_joinbasic = Util::get_cur_time();
cout << "after join_basic: used " << (after_joinbasic - after_pre_filter) << " ms" << endl;
cout << "after join_basic: used " << (after_joinbasic - after_prehandler) << " ms" << endl;
if (!ret3)
{
this->clear();
@ -737,6 +983,7 @@ Join::toStartJoin()
}
}
//TODO: delete the code that generate the literal_candidate_list;
//NOTICE:not add literal, so no constant neighbor, this must be free literal variable
int var_id = maxi;
int var_degree = this->basic_query->getVarDegree(var_id);
@ -803,12 +1050,12 @@ Join::toStartJoin()
IDList& origin_candidate_list = this->basic_query->getCandidateList(var_id);
//int origin_candidate_list_len = origin_candidate_list.size();
origin_candidate_list.unionList(literal_candidate_list, true);
//origin_candidate_list.unionList(literal_candidate_list, true);
//int after_add_literal_candidate_list_len = origin_candidate_list.size();
this->basic_query->setReady(var_id);
cout<<"the prepared var id: "<<var_id<<endl;
cout<<"add literals num: "<<literal_candidate_list.size()<<endl;
//cout<<"add literals num: "<<literal_candidate_list.size()<<endl;
cout<<"current can size: "<<origin_candidate_list.size()<<endl;
}
@ -831,7 +1078,8 @@ Join::join()
cout<<"error in join() - id < 0"<<endl;
return false;
}
if(!this->is_literal_var(id) && smallest == 0)
//if(!this->is_literal_var(id) && smallest == 0)
if( smallest == 0)
{
cout<<"join() - already empty"<<endl;
return false; //empty result
@ -848,7 +1096,8 @@ Join::join()
cout<<"error in join() - id < 0"<<endl;
return false;
}
if(!this->is_literal_var(id_max) && biggest == 0)
//if(!this->is_literal_var(id_max) && biggest == 0)
if(biggest == 0)
{
cout<<"join() - already empty"<<endl;
return false; //empty result
@ -953,31 +1202,13 @@ Join::add_new_to_results(TableIterator it, unsigned id)
this->current_table.push_back(tmp);
}
//after remove VSTREE, modify here
void
Join::update_answer_list(IDList*& valid_ans_list, IDList& _can_list, unsigned* id_list, unsigned id_list_len, bool _is_literal)
{
if (valid_ans_list == NULL)
{
//WARN:this is too costly due to coping elements!
//valid_ans_list.unionList(_can_list);
if (_is_literal)
{
unsigned entity_len = 0;
while (true)
{
if (entity_len == id_list_len || Util::is_literal_ele(id_list[entity_len]))
break;
entity_len++;
}
//valid_ans_list.intersectList(id_list, entity_len);
valid_ans_list = IDList::intersect(_can_list, id_list, entity_len);
valid_ans_list->unionList(id_list + entity_len, id_list_len - entity_len, true);
//this->basic_query->setAddedLiteralCandidate(_id);
}
else
{
valid_ans_list = IDList::intersect(_can_list, id_list, id_list_len);
}
valid_ans_list = IDList::intersect(_can_list, id_list, id_list_len);
}
else
{
@ -1002,7 +1233,8 @@ Join::update_answer_list(IDList*& valid_ans_list, IDList& _can_list, unsigned* i
bool
Join::join_two(vector< vector<int> >& _edges, IDList& _can_list, unsigned _can_list_size, int _id, bool _is_literal)
{
if(_can_list_size == 0 && !_is_literal)
//if(_can_list_size == 0 && !_is_literal)
if(_can_list_size == 0)
{
return false; //empty result
}
@ -1340,6 +1572,10 @@ Join::multi_join()
//otherwise will be big compared with id_list
//the can_list of var representing literals is not valid,
//must use kvstore->get...() to join
//NOTICE: not cancle the followings, to be used for later
//TODO: if queries contain predicate variables, it may be hard to prepare candidates for a node
//(so it is not ready, can also be represented by is_literal_var())
bool is_literal = this->is_literal_var(id2);
if(is_literal)
{

View File

@ -103,6 +103,8 @@ private:
bool filter_before_join();
bool constant_edge_filter(int _var_i);
void preid_filter(int _var_i);
//new
bool pre_handler();
bool only_pre_filter_after_join();
void add_literal_candidate();
bool pre_var_handler();

View File

@ -140,10 +140,11 @@ Strategy::handle(SPARQLquery& _query, ResultFilter* _result_filter)
}
#else
cout << "this BasicQuery use original query strategy" << endl;
long tv_handle = Util::get_cur_time();
(this->vstree)->retrieve(_query);
//VSTREE:
//long tv_handle = Util::get_cur_time();
//(this->vstree)->retrieve(_query);
//cout << "after Retrieve, used " << (tv_retrieve - tv_handle) << "ms." << endl;
long tv_retrieve = Util::get_cur_time();
cout << "after Retrieve, used " << (tv_retrieve - tv_handle) << "ms." << endl;
this->join = new Join(kvstore, pre2num, this->limitID_predicate, this->limitID_literal);
this->join->join_sparql(_query);
@ -176,26 +177,26 @@ Strategy::handler0(BasicQuery* _bq, vector<unsigned*>& _result_list, ResultFilte
long tv_handle = Util::get_cur_time();
int varNum = _bq->getVarNum(); //the num of vars needing to be joined
//TODO:parallel by pthread
for (int i = 0; i < varNum; ++i)
{
if (_bq->if_need_retrieve(i) == false)
continue;
bool flag = _bq->isLiteralVariable(i);
const EntityBitSet& entityBitSet = _bq->getVarBitSet(i);
IDList* idListPtr = &(_bq->getCandidateList(i));
this->vstree->retrieveEntity(entityBitSet, idListPtr);
if (!flag)
{
//TODO:parallel by pthread, requiring that index is parallelable
//for (int i = 0; i < varNum; ++i)
//{
//if (_bq->if_need_retrieve(i) == false)
//continue;
//bool flag = _bq->isLiteralVariable(i);
//const EntityBitSet& entityBitSet = _bq->getVarBitSet(i);
//IDList* idListPtr = &(_bq->getCandidateList(i));
//this->vstree->retrieveEntity(entityBitSet, idListPtr);
//if (!flag)
//{
//cout<<"set ready: "<<i<<endl;
_bq->setReady(i);
}
//_bq->setReady(i);
//}
//the basic query should end if one non-literal var has no candidates
if (idListPtr->size() == 0 && !flag)
{
break;
}
}
//if (idListPtr->size() == 0 && !flag)
//{
//break;
//}
//}
//BETTER:end directly if one is empty!

View File

@ -1542,7 +1542,7 @@ VSTree::saveTreeInfo()
if (filePtr == NULL)
{
cerr << "error, can not create tree info file. @VSTree::saveTreeInfo" << endl;
//cerr << "error, can not create tree info file. @VSTree::saveTreeInfo" << endl;
return false;
}