feat: add recovery function
if load a db and quit on accidents, you can start the gconsole in native mode on the db server, and use restore command to recover to teh latest db (backup function will be auto started when you connect to gserver and load a db) in addition, gserver stablity is improved to deal with query fails by wanglibo, no modifications to others
This commit is contained in:
parent
2c5ef57fe4
commit
2da666757d
|
@ -91,6 +91,9 @@ tags
|
|||
*.out
|
||||
*.bak~
|
||||
|
||||
# queries
|
||||
*.sql
|
||||
|
||||
# modules
|
||||
node_modules
|
||||
|
||||
|
|
|
@ -18,6 +18,8 @@ Database::Database()
|
|||
this->signature_binary_file = "signature.binary";
|
||||
this->six_tuples_file = "six_tuples";
|
||||
this->db_info_file = "db_info_file.dat";
|
||||
this->update_log = "update.log";
|
||||
this->update_log_since_backup = "update_since_backup.log";
|
||||
|
||||
string kv_store_path = store_path + "/kv_store";
|
||||
this->kvstore = new KVstore(kv_store_path);
|
||||
|
@ -53,11 +55,18 @@ Database::Database()
|
|||
Database::Database(string _name)
|
||||
{
|
||||
this->name = _name;
|
||||
size_t found = this->name.find_last_not_of('/');
|
||||
if (found != string::npos)
|
||||
{
|
||||
this->name.erase(found + 1);
|
||||
}
|
||||
this->store_path = Util::global_config["db_home"] + "/" + this->name + Util::global_config["db_suffix"];
|
||||
|
||||
this->signature_binary_file = "signature.binary";
|
||||
this->six_tuples_file = "six_tuples";
|
||||
this->db_info_file = "db_info_file.dat";
|
||||
this->update_log = "update.log";
|
||||
this->update_log_since_backup = "update_since_backup.log";
|
||||
|
||||
string kv_store_path = store_path + "/kv_store";
|
||||
this->kvstore = new KVstore(kv_store_path);
|
||||
|
@ -579,19 +588,30 @@ Database::load()
|
|||
return true;
|
||||
}
|
||||
|
||||
//NOTICE: we ensure that if the unload() exists normally, then all updates have already been written to disk
|
||||
//So when accidents happens, we only have to restore the databases that are in load status(inlucidng that unload
|
||||
//not finished) later.
|
||||
bool
|
||||
Database::unload()
|
||||
{
|
||||
//TODO: do we need to update the pre2num if update queries exist??
|
||||
//or we just neglect this, that is ok because pre2num is just used to count
|
||||
delete[] this->pre2num;
|
||||
this->pre2num = NULL;
|
||||
delete this->entity_buffer;
|
||||
delete this->literal_buffer;
|
||||
|
||||
delete this->entity_buffer;
|
||||
this->entity_buffer = NULL;
|
||||
delete this->literal_buffer;
|
||||
this->literal_buffer = NULL;
|
||||
|
||||
//TODO: fflush the database file
|
||||
this->vstree->saveTree();
|
||||
delete this->vstree;
|
||||
this->vstree = NULL;
|
||||
|
||||
delete this->kvstore;
|
||||
this->kvstore = NULL;
|
||||
|
||||
delete this->stringindex;
|
||||
this->stringindex = NULL;
|
||||
|
||||
|
@ -600,10 +620,28 @@ Database::unload()
|
|||
this->initIDinfo();
|
||||
|
||||
this->if_loaded = false;
|
||||
this->clear_update_log();
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void Database::clear()
|
||||
{
|
||||
delete[] this->pre2num;
|
||||
this->pre2num = NULL;
|
||||
delete this->entity_buffer;
|
||||
this->entity_buffer = NULL;
|
||||
delete this->literal_buffer;
|
||||
this->literal_buffer = NULL;
|
||||
|
||||
delete this->vstree;
|
||||
this->vstree = NULL;
|
||||
delete this->kvstore;
|
||||
this->kvstore = NULL;
|
||||
delete this->stringindex;
|
||||
this->stringindex = NULL;
|
||||
}
|
||||
|
||||
string
|
||||
Database::getName()
|
||||
{
|
||||
|
@ -754,6 +792,11 @@ Database::build(const string& _rdf_file)
|
|||
string stringindex_store_path = store_path + "/stringindex_store";
|
||||
Util::create_dir(stringindex_store_path);
|
||||
|
||||
string update_log_path = this->store_path + '/' + this->update_log;
|
||||
Util::create_file(update_log_path);
|
||||
string update_log_since_backup = this->store_path + '/' + this->update_log_since_backup;
|
||||
Util::create_file(update_log_since_backup);
|
||||
|
||||
cout << "begin encode RDF from : " << ret << " ..." << endl;
|
||||
|
||||
//BETTER+TODO:now require that dataset size < memory
|
||||
|
@ -800,6 +843,7 @@ Database::build(const string& _rdf_file)
|
|||
//sync();
|
||||
//cout << "sync vstree" << endl;
|
||||
|
||||
//TODO: remove signature.binary here
|
||||
//string cmd = "rm -rf " + _entry_file;
|
||||
//system(cmd.c_str());
|
||||
//cout << "signature file removed" << endl;
|
||||
|
@ -1030,6 +1074,31 @@ Database::exist_triple(int _sub_id, int _pre_id, int _obj_id)
|
|||
return is_exist;
|
||||
}
|
||||
|
||||
bool Database::exist_triple(const TripleWithObjType& _triple) {
|
||||
int sub_id = this->kvstore->getIDByEntity(_triple.getSubject());
|
||||
if (sub_id == -1) {
|
||||
return false;
|
||||
}
|
||||
|
||||
int pre_id = this->kvstore->getIDByPredicate(_triple.getPredicate());
|
||||
if (pre_id == -1) {
|
||||
return false;
|
||||
}
|
||||
|
||||
int obj_id = -1;
|
||||
if (_triple.isObjEntity()) {
|
||||
obj_id = this->kvstore->getIDByEntity(_triple.getObject());
|
||||
}
|
||||
else if (_triple.isObjLiteral()) {
|
||||
obj_id = this->kvstore->getIDByLiteral(_triple.getObject());
|
||||
}
|
||||
if (obj_id == -1) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return exist_triple(sub_id, pre_id, obj_id);
|
||||
}
|
||||
|
||||
//NOTICE: all constants are transfered to ids in memory
|
||||
//this maybe not ok when size is too large!
|
||||
bool
|
||||
|
@ -1933,10 +2002,10 @@ Database::removeTriple(const TripleWithObjType& _triple, vector<int>* _vertices,
|
|||
}
|
||||
|
||||
bool
|
||||
Database::insert(std::string _rdf_file)
|
||||
Database::insert(std::string _rdf_file, bool _is_restore)
|
||||
{
|
||||
//cout<<"to load in insert"<<endl;
|
||||
bool flag = this->load();
|
||||
bool flag = _is_restore || this->load();
|
||||
//bool flag = this->load();
|
||||
if (!flag)
|
||||
{
|
||||
return false;
|
||||
|
@ -1984,7 +2053,7 @@ Database::insert(std::string _rdf_file)
|
|||
}
|
||||
|
||||
//Process the Triple one by one
|
||||
success_num += this->insert(triple_array, parse_triple_num);
|
||||
success_num += this->insert(triple_array, parse_triple_num, _is_restore);
|
||||
//some maybe invalid or duplicate
|
||||
//triple_num += parse_triple_num;
|
||||
}
|
||||
|
@ -2030,9 +2099,10 @@ Database::insert(std::string _rdf_file)
|
|||
}
|
||||
|
||||
bool
|
||||
Database::remove(std::string _rdf_file)
|
||||
Database::remove(std::string _rdf_file, bool _is_restore)
|
||||
{
|
||||
bool flag = this->load();
|
||||
bool flag = _is_restore || this->load();
|
||||
//bool flag = this->load();
|
||||
if (!flag)
|
||||
{
|
||||
return false;
|
||||
|
@ -2074,7 +2144,7 @@ Database::remove(std::string _rdf_file)
|
|||
break;
|
||||
}
|
||||
|
||||
success_num += this->remove(triple_array, parse_triple_num);
|
||||
success_num += this->remove(triple_array, parse_triple_num, _is_restore);
|
||||
//some maybe invalid or duplicate
|
||||
//triple_num -= parse_triple_num;
|
||||
}
|
||||
|
@ -2109,11 +2179,37 @@ Database::remove(std::string _rdf_file)
|
|||
}
|
||||
|
||||
int
|
||||
Database::insert(const TripleWithObjType* _triples, int _triple_num)
|
||||
Database::insert(const TripleWithObjType* _triples, int _triple_num, bool _is_restore)
|
||||
{
|
||||
vector<int> vertices, predicates;
|
||||
int valid_num = 0;
|
||||
|
||||
if (!_is_restore) {
|
||||
string path = this->getStorePath() + '/' + this->update_log;
|
||||
string path_all = this->getStorePath() + '/' + this->update_log_since_backup;
|
||||
ofstream out;
|
||||
ofstream out_all;
|
||||
out.open(path.c_str(), ios::out | ios::app);
|
||||
out_all.open(path_all.c_str(), ios::out | ios::app);
|
||||
if (!out || !out_all) {
|
||||
cerr << "Failed to open update log. Insertion aborted." << endl;
|
||||
return 0;
|
||||
}
|
||||
for (int i = 0; i < _triple_num; i++) {
|
||||
if (exist_triple(_triples[i])) {
|
||||
continue;
|
||||
}
|
||||
stringstream ss;
|
||||
ss << "I\t" << Util::node2string(_triples[i].getSubject().c_str()) << '\t';
|
||||
ss << Util::node2string(_triples[i].getPredicate().c_str()) << '\t';
|
||||
ss << Util::node2string(_triples[i].getObject().c_str()) << '.' << endl;
|
||||
out << ss.str();
|
||||
out_all << ss.str();
|
||||
}
|
||||
out.close();
|
||||
out_all.close();
|
||||
}
|
||||
|
||||
#ifdef USE_GROUP_INSERT
|
||||
//NOTICE:this is called by insert(file) or query()(but can not be too large),
|
||||
//assume that db is loaded already
|
||||
|
@ -2562,11 +2658,37 @@ Database::insert(const TripleWithObjType* _triples, int _triple_num)
|
|||
}
|
||||
|
||||
int
|
||||
Database::remove(const TripleWithObjType* _triples, int _triple_num)
|
||||
Database::remove(const TripleWithObjType* _triples, int _triple_num, bool _is_restore)
|
||||
{
|
||||
vector<int> vertices, predicates;
|
||||
int valid_num = 0;
|
||||
|
||||
if (!_is_restore) {
|
||||
string path = this->getStorePath() + '/' + this->update_log;
|
||||
string path_all = this->getStorePath() + '/' + this->update_log_since_backup;
|
||||
ofstream out;
|
||||
ofstream out_all;
|
||||
out.open(path.c_str(), ios::out | ios::app);
|
||||
out_all.open(path_all.c_str(), ios::out | ios::app);
|
||||
if (!out || !out_all) {
|
||||
cerr << "Failed to open update log. Removal aborted." << endl;
|
||||
return 0;
|
||||
}
|
||||
for (int i = 0; i < _triple_num; i++) {
|
||||
if (!exist_triple(_triples[i])) {
|
||||
continue;
|
||||
}
|
||||
stringstream ss;
|
||||
ss << "R\t" << Util::node2string(_triples[i].getSubject().c_str()) << '\t';
|
||||
ss << Util::node2string(_triples[i].getPredicate().c_str()) << '\t';
|
||||
ss << Util::node2string(_triples[i].getObject().c_str()) << '.' << endl;
|
||||
out << ss.str();
|
||||
out_all << ss.str();
|
||||
}
|
||||
out.close();
|
||||
out_all.close();
|
||||
}
|
||||
|
||||
#ifdef USE_GROUP_DELETE
|
||||
//NOTICE:this is called by remove(file) or query()(but can not be too large),
|
||||
//assume that db is loaded already
|
||||
|
@ -2911,6 +3033,214 @@ Database::remove(const TripleWithObjType* _triples, int _triple_num)
|
|||
return valid_num;
|
||||
}
|
||||
|
||||
bool
|
||||
Database::backup()
|
||||
{
|
||||
if (!Util::dir_exist(Util::backup_path))
|
||||
{
|
||||
Util::create_dir(Util::backup_path);
|
||||
}
|
||||
string backup_path = Util::backup_path + this->store_path;
|
||||
|
||||
cout << "Beginning backup." << endl;
|
||||
|
||||
string sys_cmd;
|
||||
if (Util::dir_exist(backup_path))
|
||||
{
|
||||
sys_cmd = "rm -rf " + backup_path;
|
||||
system(sys_cmd.c_str());
|
||||
}
|
||||
sys_cmd = "cp -r " + this->store_path + ' ' + backup_path;
|
||||
system(sys_cmd.c_str());
|
||||
sys_cmd = "rm " + backup_path + '/' + this->update_log;
|
||||
system(sys_cmd.c_str());
|
||||
|
||||
this->vstree->saveTree();
|
||||
this->kvstore->flush();
|
||||
|
||||
this->clear_update_log();
|
||||
string update_log_path = this->store_path + '/' + this->update_log_since_backup;
|
||||
sys_cmd = "rm " + update_log_path;
|
||||
system(sys_cmd.c_str());
|
||||
Util::create_file(update_log_path);
|
||||
|
||||
cout << "Backup completed!" << endl;
|
||||
return true;
|
||||
}
|
||||
|
||||
bool
|
||||
Database::restore()
|
||||
{
|
||||
cout << "Begining restore." << endl;
|
||||
string sys_cmd;
|
||||
|
||||
multiset<string> insertions;
|
||||
multiset<string> removals;
|
||||
|
||||
int num_update = 0;
|
||||
if (!this->load())
|
||||
{
|
||||
this->clear();
|
||||
|
||||
string backup_path = Util::backup_path + this->store_path;
|
||||
if (!Util::dir_exist(Util::backup_path))
|
||||
{
|
||||
cerr << "Failed to restore!" << endl;
|
||||
return false;
|
||||
}
|
||||
|
||||
num_update += Database::read_update_log(this->store_path + '/' + this->update_log_since_backup, insertions, removals);
|
||||
|
||||
cout << "Failed to restore from original db file, trying to restore from backup file." << endl;
|
||||
cout << "Your old db file will be stored at " << this->store_path << ".bad" << endl;
|
||||
|
||||
sys_cmd = "rm -rf " + this->store_path + ".bad";
|
||||
system(sys_cmd.c_str());
|
||||
sys_cmd = "cp -r " + this->store_path + ' ' + this->store_path + ".bad";
|
||||
system(sys_cmd.c_str());
|
||||
sys_cmd = "rm -rf " + this->store_path;
|
||||
system(sys_cmd.c_str());
|
||||
sys_cmd = "cp -r " + backup_path + ' ' + this->store_path;
|
||||
system(sys_cmd.c_str());
|
||||
Util::create_file(this->store_path + '/' + this->update_log);
|
||||
|
||||
if (!this->load())
|
||||
{
|
||||
this->clear();
|
||||
cerr << "Failed to restore from backup file." << endl;
|
||||
return false;
|
||||
}
|
||||
|
||||
num_update += Database::read_update_log(this->store_path + '/' + this->update_log_since_backup, insertions, removals);
|
||||
}
|
||||
else
|
||||
{
|
||||
num_update += Database::read_update_log(this->store_path + '/' + this->update_log, insertions, removals);
|
||||
}
|
||||
|
||||
cout << "Restoring " << num_update << " updates." << endl;
|
||||
|
||||
if (!this->restore_update(insertions, removals))
|
||||
{
|
||||
cerr << "Failed to restore updates" << endl;
|
||||
return false;
|
||||
}
|
||||
|
||||
cout << "Restore completed." << endl;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
int
|
||||
Database::read_update_log(const string _path, multiset<string>& _i, multiset<string>& _r)
|
||||
{
|
||||
ifstream in;
|
||||
#ifdef DEBUG
|
||||
cout<<_path<<endl;
|
||||
#endif
|
||||
in.open(_path.c_str(), ios::in);
|
||||
if (!in) {
|
||||
cerr << "Failed to read update log." << endl;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int ret = 0;
|
||||
int buffer_size = 1024 + 2;
|
||||
char buffer[buffer_size];
|
||||
in.getline(buffer, buffer_size);
|
||||
while (!in.eof() && buffer[0]) {
|
||||
string triple;
|
||||
switch (buffer[0]) {
|
||||
case 'I':
|
||||
triple = string(buffer + 2);
|
||||
ret++;
|
||||
_i.insert(triple);
|
||||
break;
|
||||
case 'R':
|
||||
triple = string(buffer + 2);
|
||||
ret++;
|
||||
_r.insert(triple);
|
||||
break;
|
||||
default:
|
||||
cerr << "Bad line in update log!" << endl;
|
||||
}
|
||||
in.getline(buffer, buffer_size);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool
|
||||
Database::restore_update(multiset<string>& _i, multiset<string>& _r)
|
||||
{
|
||||
multiset<string>::iterator pos;
|
||||
multiset<string>::iterator it_to_erase = _r.end();
|
||||
for (multiset<string>::iterator it = _r.begin(); it != _r.end(); it++)
|
||||
{
|
||||
//NOTICE: check the intersect of insert_set and remove_set
|
||||
if (it_to_erase != _r.end()) {
|
||||
_r.erase(it_to_erase);
|
||||
}
|
||||
pos = _i.find(*it);
|
||||
if (pos != _i.end()) {
|
||||
_i.erase(pos);
|
||||
it_to_erase = it;
|
||||
}
|
||||
else {
|
||||
it_to_erase = _r.end();
|
||||
}
|
||||
}
|
||||
if (it_to_erase != _r.end()) {
|
||||
_r.erase(it_to_erase);
|
||||
}
|
||||
|
||||
string tmp_path = this->store_path + "/.update_tmp";
|
||||
|
||||
ofstream out_i;
|
||||
out_i.open(tmp_path.c_str(), ios::out);
|
||||
if (!out_i) {
|
||||
cerr << "Failed to open temp file, restore failed!" << endl;
|
||||
return false;
|
||||
}
|
||||
for (multiset<string>::iterator it = _i.begin(); it != _i.end(); it++) {
|
||||
out_i << *it << endl;
|
||||
}
|
||||
out_i.close();
|
||||
if (!this->insert(tmp_path, true))
|
||||
//if (!this->remove(tmp_path, true))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
ofstream out_r;
|
||||
out_r.open(tmp_path.c_str(), ios::out);
|
||||
if (!out_r) {
|
||||
cerr << "Failed to open temp file!" << endl;
|
||||
return false;
|
||||
}
|
||||
for (multiset<string>::iterator it = _r.begin(); it != _r.end(); it++) {
|
||||
out_r << *it << endl;
|
||||
}
|
||||
out_r.close();
|
||||
if (!this->remove(tmp_path, true))
|
||||
//if (!this->insert(tmp_path, true))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
string cmd = "rm " + tmp_path;
|
||||
system(cmd.c_str());
|
||||
return true;
|
||||
}
|
||||
|
||||
void
|
||||
Database::clear_update_log()
|
||||
{
|
||||
string cmd = "rm " + this->store_path + '/' + this->update_log;
|
||||
system(cmd.c_str());
|
||||
Util::create_file(this->store_path + '/' + this->update_log);
|
||||
}
|
||||
|
||||
bool
|
||||
Database::objIDIsEntityID(int _id)
|
||||
{
|
||||
|
@ -2952,6 +3282,7 @@ Database::getFinalResult(SPARQLquery& _sparql_q, ResultSet& _result_set)
|
|||
vector<int> keys;
|
||||
vector<bool> desc;
|
||||
_result_set.openStream(keys, desc);
|
||||
//_result_set.openStream(keys, desc, 0, -1);
|
||||
#ifdef DEBUG_PRECISE
|
||||
printf("getFinalResult:after open stream\n");
|
||||
#endif
|
||||
|
|
|
@ -47,6 +47,7 @@ public:
|
|||
|
||||
bool load();
|
||||
bool unload();
|
||||
void clear();
|
||||
int query(const string _query, ResultSet& _result_set, FILE* _fp = stdout);
|
||||
|
||||
//1. if subject of _triple doesn't exist,
|
||||
|
@ -56,8 +57,11 @@ public:
|
|||
|
||||
bool build(const string& _rdf_file);
|
||||
//interfaces to insert/delete from given rdf file
|
||||
bool insert(std::string _rdf_file);
|
||||
bool remove(std::string _rdf_file);
|
||||
bool insert(std::string _rdf_file, bool _is_restore = false);
|
||||
bool remove(std::string _rdf_file, bool _is_restore = false);
|
||||
|
||||
bool backup();
|
||||
bool restore();
|
||||
|
||||
/* name of this DB*/
|
||||
string getName();
|
||||
|
@ -97,6 +101,9 @@ private:
|
|||
//B means binary
|
||||
string signature_binary_file;
|
||||
|
||||
string update_log;
|
||||
string update_log_since_backup;
|
||||
|
||||
//pre2num mapping
|
||||
TNUM* pre2num;
|
||||
//valid: check from minNumPID to maxNumPID
|
||||
|
@ -163,6 +170,7 @@ private:
|
|||
//check whether the relative 3-tuples exist
|
||||
//usually, through sp2olist
|
||||
bool exist_triple(int _sub_id, int _pre_id, int _obj_id);
|
||||
bool exist_triple(const TripleWithObjType& _triple);
|
||||
|
||||
//* _rdf_file denotes the path of the RDF file, where stores the rdf data
|
||||
//* there are many step in this function, each one responds to an sub-function
|
||||
|
@ -183,9 +191,9 @@ private:
|
|||
bool insertTriple(const TripleWithObjType& _triple, vector<int>* _vertices = NULL, vector<int>* _predicates = NULL);
|
||||
bool removeTriple(const TripleWithObjType& _triple, vector<int>* _vertices = NULL, vector<int>* _predicates = NULL);
|
||||
//NOTICE:one by one is too costly, sort and insert/delete at a time will be better
|
||||
int insert(const TripleWithObjType* _triples, int _triple_num);
|
||||
int insert(const TripleWithObjType* _triples, int _triple_num, bool _is_restore = false);
|
||||
//bool insert(const vector<TripleWithObjType>& _triples, vector<int>& _vertices, vector<int>& _predicates);
|
||||
int remove(const TripleWithObjType* _triples, int _triple_num);
|
||||
int remove(const TripleWithObjType* _triples, int _triple_num, bool _is_restore = false);
|
||||
//bool remove(const vector<TripleWithObjType>& _triples, vector<int>& _vertices, vector<int>& _predicates);
|
||||
|
||||
bool sub2id_pre2id_obj2id_RDFintoSignature(const string _rdf_file, int**& _p_id_tuples, int & _id_tuples_max);
|
||||
|
@ -202,6 +210,10 @@ private:
|
|||
|
||||
//get the final string result_set from SPARQLquery
|
||||
bool getFinalResult(SPARQLquery& _sparql_q, ResultSet& _result_set);
|
||||
|
||||
static int read_update_log(const string _path, multiset<string>& _i, multiset<string>& _r);
|
||||
bool restore_update(multiset<string>& _i, multiset<string>& _r);
|
||||
void clear_update_log();
|
||||
};
|
||||
|
||||
#endif //_DATABASE_DATABASE_H
|
||||
|
|
|
@ -44,6 +44,9 @@ int query_handler(const vector<string>&);
|
|||
int show_handler(const vector<string>&);
|
||||
int add_handler(const vector<string>&);
|
||||
int sub_handler(const vector<string>&);
|
||||
//backup commands
|
||||
int backup_handler(const vector<string>&);
|
||||
int restore_handler(const vector<string>&);
|
||||
|
||||
//A structure which contains information on the commands this program can understand.
|
||||
typedef struct {
|
||||
|
@ -71,6 +74,8 @@ COMMAND native_commands[] = {
|
|||
{ "restart", restart_handler, "\tRestart local server." },
|
||||
{ "port", port_handler, "\tChange port of local server." },
|
||||
{ "printport", printport_handler, "Print local server's port configuration."},
|
||||
{ "backup", backup_handler, "\tBackup current database." },
|
||||
{ "restore", restore_handler, "\tRestore a database backup." },
|
||||
|
||||
{ NULL, NULL, NULL }
|
||||
};
|
||||
|
@ -914,7 +919,7 @@ int drop_handler(const vector<string>& args) {
|
|||
cerr << "You should use exactly the same db name as building, which should not end with \".db\"" << endl;
|
||||
return -1;
|
||||
}
|
||||
database += ".db";
|
||||
//database += ".db";
|
||||
|
||||
//remote mode
|
||||
if (gc != NULL) {
|
||||
|
@ -1112,7 +1117,7 @@ int add_handler(const vector<string>& args) {
|
|||
cerr << "You should use exactly the same db name as building, which should not end with \".db\"" << endl;
|
||||
return -1;
|
||||
}
|
||||
database += ".db";
|
||||
//database += ".db";
|
||||
Database _db(database);
|
||||
if (!_db.insert(args[1])) {
|
||||
cerr << "Failed to insert!" << endl;
|
||||
|
@ -1135,8 +1140,9 @@ int sub_handler(const vector<string>& args) {
|
|||
cerr << "You should use exactly the same db name as building, which should not end with \".db\"" << endl;
|
||||
return -1;
|
||||
}
|
||||
database += ".db";
|
||||
Database _db(database); if (!_db.remove(args[1])) {
|
||||
//database += ".db";
|
||||
Database _db(database);
|
||||
if (!_db.remove(args[1])) {
|
||||
cerr << "Failed to remove!" << endl;
|
||||
return -1;
|
||||
}
|
||||
|
@ -1191,6 +1197,41 @@ int printport_handler(const vector<string>& args) {
|
|||
return system("bin/gserver -P");
|
||||
}
|
||||
|
||||
int backup_handler(const vector<string>& args) {
|
||||
if (!args.empty()) {
|
||||
cerr << "Too many arguments!" << endl;
|
||||
return -1;
|
||||
}
|
||||
if (current_database == NULL) {
|
||||
cerr << "No database loaded!" << endl;
|
||||
return -1;
|
||||
}
|
||||
current_database->backup();
|
||||
return 0;
|
||||
}
|
||||
|
||||
int restore_handler(const vector<string>& args) {
|
||||
if (args.size() != 1) {
|
||||
cerr << "Exactly 1 argument required!" << endl;
|
||||
return -1;
|
||||
}
|
||||
if (current_database != NULL) {
|
||||
cerr << "Please unload your database first!" << endl;
|
||||
return -1;
|
||||
}
|
||||
string database = args[0];
|
||||
if (database.length() > 3 && database.substr(database.length() - 3, 3) == ".db") {
|
||||
cerr << "You should use exactly the same db name as building, which should not end with \".db\"" << endl;
|
||||
return -1;
|
||||
}
|
||||
//database += ".db";
|
||||
|
||||
Database db(database);
|
||||
if (!db.restore()) {
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,62 @@
|
|||
#include "../Util/Util.h"
|
||||
#include "../Server/Server.h"
|
||||
|
||||
using namespace std;
|
||||
|
||||
bool send_backup_msg(unsigned short _port, long _time);
|
||||
|
||||
int main(int argc, char* argv[]) {
|
||||
|
||||
#ifdef DEBUG
|
||||
Util util;
|
||||
#endif
|
||||
|
||||
if (argc != 3) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (!Util::isValidPort(string(argv[1]))) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
unsigned short port;
|
||||
{
|
||||
stringstream ss(argv[1]);
|
||||
ss >> port;
|
||||
}
|
||||
|
||||
long backup_time;
|
||||
{
|
||||
stringstream ss(argv[2]);
|
||||
ss >> backup_time;
|
||||
}
|
||||
|
||||
while (true) {
|
||||
time_t cur_time = time(NULL);
|
||||
while (cur_time >= backup_time) {
|
||||
backup_time += Util::gserver_backup_interval;
|
||||
}
|
||||
sleep(backup_time - cur_time);
|
||||
if (!send_backup_msg(port, backup_time)) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool send_backup_msg(unsigned short _port, long _time) {
|
||||
stringstream ss;
|
||||
ss << "backup " << _time;
|
||||
Socket socket;
|
||||
if (!socket.create() || !socket.connect("127.0.0.1", _port) || !socket.send(ss.str())) {
|
||||
return false;
|
||||
}
|
||||
string recv_msg;
|
||||
socket.recv(recv_msg);
|
||||
socket.close();
|
||||
if (recv_msg != "done") {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
8
NOTES.md
8
NOTES.md
|
@ -18,6 +18,12 @@ demo应用全部外链,具体服务放在实验室公开的主机上,通过i
|
|||
|
||||
---
|
||||
|
||||
# 数据库可恢复性、事务与日志
|
||||
|
||||
以后可能要支持事务,那样就需要日志能够支持UNDO, REDO等操作,可以以后根据需要再修改。
|
||||
数据库多版本,可以指定恢复到一个带标签的版本(比如时间作为标签,用户指定标签)
|
||||
|
||||
|
||||
# 并行策略- 线程控制模块
|
||||
|
||||
不宜使用并行框架,可使用C的pthread,boost的thread库,或者启用C++11,gcc编译器需要高于4.8.1才能完整支持C++11
|
||||
|
@ -591,6 +597,8 @@ ACID? neo4j GraphDB
|
|||
是否可以考虑用vf2算法来作子图同构?比较效率,相互结合?
|
||||
考虑按谓词频度划分,比如建立两棵sp2o树,两者的缓存大小应该不同
|
||||
|
||||
gStore实现的是子图同态算法,要实现子图同构,只要保证中间表扩展时每一行中不会出现重复的元素即可
|
||||
|
||||
Consider the use of Bloom Filter and FM-sketches
|
||||
|
||||
---
|
||||
|
|
|
@ -0,0 +1,626 @@
|
|||
/*=============================================================================
|
||||
# Filename: Server.cpp
|
||||
# Author: Bookug Lobert
|
||||
# Mail: 1181955272@qq.com
|
||||
# Last Modified: 2015-10-25 13:47
|
||||
# Description:
|
||||
=============================================================================*/
|
||||
|
||||
#include "Server.h"
|
||||
|
||||
using namespace std;
|
||||
|
||||
Server::Server(unsigned short _port)
|
||||
{
|
||||
this->connectionPort = _port;
|
||||
this->connectionMaxNum = Socket::MAX_CONNECTIONS;
|
||||
this->databaseMaxNum = 1; // will be updated when supporting multiple databases.
|
||||
this->database = NULL;
|
||||
this->next_backup = 0;
|
||||
this->scheduler_pid = 0;
|
||||
}
|
||||
|
||||
Server::~Server()
|
||||
{
|
||||
delete this->database;
|
||||
}
|
||||
|
||||
bool
|
||||
Server::createConnection()
|
||||
{
|
||||
bool flag;
|
||||
|
||||
flag = this->socket.create();
|
||||
if (!flag)
|
||||
{
|
||||
cerr << Util::getTimeString() << "cannot create socket. @Server::createConnection" << endl;
|
||||
return false;
|
||||
}
|
||||
|
||||
flag = this->socket.bind(this->connectionPort);
|
||||
if (!flag)
|
||||
{
|
||||
cerr << Util::getTimeString() << "cannot bind to port " << this->connectionPort << ". @Server::createConnection" << endl;
|
||||
return false;
|
||||
}
|
||||
|
||||
flag = this->socket.listen();
|
||||
if (!flag)
|
||||
{
|
||||
cerr << Util::getTimeString() << "cannot listen to port" << this->connectionPort << ". @Server::createConnection" << endl;
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool
|
||||
Server::deleteConnection()
|
||||
{
|
||||
bool flag = this->socket.close();
|
||||
|
||||
return flag;
|
||||
}
|
||||
|
||||
bool
|
||||
Server::response(Socket _socket, std::string& _msg)
|
||||
{
|
||||
bool flag = _socket.send(_msg);
|
||||
|
||||
return flag;
|
||||
}
|
||||
|
||||
void
|
||||
Server::listen()
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
Socket new_server_socket;
|
||||
|
||||
cout << Util::getTimeString() << "Wait for input..." << endl;
|
||||
|
||||
this->socket.accept(new_server_socket);
|
||||
|
||||
cout << Util::getTimeString() << "accept new socket." << endl;
|
||||
|
||||
string recv_cmd;
|
||||
bool recv_return = new_server_socket.recv(recv_cmd);
|
||||
if (!recv_return)
|
||||
{
|
||||
cerr << Util::getTimeString() << "receive command from client error. @Server::listen" << endl;
|
||||
continue;
|
||||
}
|
||||
|
||||
cout << Util::getTimeString() << "received msg: " << recv_cmd << endl;
|
||||
|
||||
Operation operation;
|
||||
bool parser_return = this->parser(recv_cmd, operation);
|
||||
cout << Util::getTimeString() << "parser_return=" << parser_return << endl; //debug
|
||||
if (!parser_return)
|
||||
{
|
||||
cout << Util::getTimeString() << "parser command error. @Server::listen" << endl;
|
||||
string ret_msg = "invalid command.";
|
||||
this->response(new_server_socket, ret_msg);
|
||||
new_server_socket.close();
|
||||
continue;
|
||||
}
|
||||
|
||||
string ret_msg;
|
||||
bool _stop = false;
|
||||
CommandType cmd_type = operation.getCommand();
|
||||
switch (cmd_type)
|
||||
{
|
||||
case CMD_TEST:
|
||||
{
|
||||
ret_msg = "OK";
|
||||
break;
|
||||
}
|
||||
case CMD_LOAD:
|
||||
{
|
||||
string db_name = operation.getParameter(0);
|
||||
this->loadDatabase(db_name, "", ret_msg);
|
||||
break;
|
||||
}
|
||||
case CMD_UNLOAD:
|
||||
{
|
||||
string db_name = operation.getParameter(0);
|
||||
this->unloadDatabase(db_name, "", ret_msg);
|
||||
break;
|
||||
}
|
||||
case CMD_IMPORT:
|
||||
{
|
||||
string db_name = operation.getParameter(0);
|
||||
string rdf_path = operation.getParameter(1);
|
||||
this->importRDF(db_name, "", rdf_path, ret_msg);
|
||||
break;
|
||||
}
|
||||
case CMD_DROP:
|
||||
{
|
||||
string db_name = operation.getParameter(0);
|
||||
this->dropDatabase(db_name, "", ret_msg);
|
||||
break;
|
||||
}
|
||||
case CMD_QUERY:
|
||||
{
|
||||
string query = operation.getParameter(0);
|
||||
pthread_t timer = Server::start_timer();
|
||||
if (timer == 0) {
|
||||
cerr << Util::getTimeString() << "Failed to start timer." << endl;
|
||||
}
|
||||
this->query(query, ret_msg);
|
||||
if (timer != 0 && !Server::stop_timer(timer)) {
|
||||
cerr << Util::getTimeString() << "Failed to stop timer." << endl;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case CMD_SHOW:
|
||||
{
|
||||
string para = operation.getParameter(0);
|
||||
if (para == "databases" || para == "all")
|
||||
{
|
||||
this->showDatabases(para, "", ret_msg);
|
||||
}
|
||||
else
|
||||
{
|
||||
ret_msg = "invalid command.";
|
||||
}
|
||||
break;
|
||||
}
|
||||
//case CMD_INSERT:
|
||||
//{
|
||||
// string db_name = operation.getParameter(0);
|
||||
// string rdf_path = operation.getParameter(1);
|
||||
// this->insertTriple(db_name, "", rdf_path, ret_msg);
|
||||
// break;
|
||||
//}
|
||||
case CMD_STOP:
|
||||
{
|
||||
this->stopServer(ret_msg);
|
||||
_stop = true;
|
||||
break;
|
||||
}
|
||||
case CMD_BACKUP: {
|
||||
string para = operation.getParameter(0);
|
||||
stringstream ss(para);
|
||||
long time_backup;
|
||||
ss >> time_backup;
|
||||
if (this->next_backup == 0) {
|
||||
break;
|
||||
}
|
||||
while (this->next_backup < time_backup) {
|
||||
this->next_backup += Util::gserver_backup_interval;
|
||||
}
|
||||
if (this->next_backup == time_backup) {
|
||||
this->backup(ret_msg);
|
||||
}
|
||||
else {
|
||||
ret_msg = "done";
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
cerr << Util::getTimeString() << "this command is not supported by now. @Server::listen" << endl;
|
||||
}
|
||||
|
||||
this->response(new_server_socket, ret_msg);
|
||||
new_server_socket.close();
|
||||
if (_stop) {
|
||||
this->deleteConnection();
|
||||
cout << Util::getTimeString() << "server stopped." << endl;
|
||||
break;
|
||||
}
|
||||
if (this->next_backup > 0) {
|
||||
time_t cur_time = time(NULL);
|
||||
if (cur_time >= this->next_backup) {
|
||||
string str;
|
||||
this->backup(str);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool
|
||||
Server::parser(std::string _raw_cmd, Operation& _ret_oprt)
|
||||
{
|
||||
int cmd_start_pos = 0;
|
||||
int raw_len = (int)_raw_cmd.size();
|
||||
|
||||
for (int i = 0; i < raw_len; i++) {
|
||||
if (_raw_cmd[i] == '\n') {
|
||||
_raw_cmd[i] = ' ';
|
||||
}
|
||||
}
|
||||
|
||||
while (cmd_start_pos < raw_len && _raw_cmd[cmd_start_pos] == ' ') {
|
||||
cmd_start_pos++;
|
||||
}
|
||||
if (cmd_start_pos == raw_len)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
int idx1 = raw_len;
|
||||
for (int i = cmd_start_pos; i < raw_len; i++) {
|
||||
if (_raw_cmd[i] == ' ')
|
||||
{
|
||||
idx1 = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
string cmd = _raw_cmd.substr(cmd_start_pos, idx1 - cmd_start_pos);
|
||||
int para_start_pos = idx1;
|
||||
while (para_start_pos < raw_len && _raw_cmd[para_start_pos] == ' ')
|
||||
para_start_pos++;
|
||||
|
||||
int para_cnt;
|
||||
if (cmd == "test") {
|
||||
_ret_oprt.setCommand(CMD_TEST);
|
||||
para_cnt = 0;
|
||||
}
|
||||
else if (cmd == "load") {
|
||||
_ret_oprt.setCommand(CMD_LOAD);
|
||||
para_cnt = 1;
|
||||
}
|
||||
else if (cmd == "unload") {
|
||||
_ret_oprt.setCommand(CMD_UNLOAD);
|
||||
para_cnt = 1;
|
||||
}
|
||||
else if (cmd == "import")
|
||||
{
|
||||
_ret_oprt.setCommand(CMD_IMPORT);
|
||||
para_cnt = 2;
|
||||
}
|
||||
else if (cmd == "query")
|
||||
{
|
||||
_ret_oprt.setCommand(CMD_QUERY);
|
||||
para_cnt = 1;
|
||||
}
|
||||
else if (cmd == "show")
|
||||
{
|
||||
_ret_oprt.setCommand(CMD_SHOW);
|
||||
para_cnt = 1;
|
||||
}
|
||||
else if (cmd == "insert")
|
||||
{
|
||||
_ret_oprt.setCommand(CMD_INSERT);
|
||||
para_cnt = 2;
|
||||
}
|
||||
else if (cmd == "drop") {
|
||||
_ret_oprt.setCommand(CMD_DROP);
|
||||
para_cnt = 1;
|
||||
}
|
||||
else if (cmd == "stop") {
|
||||
_ret_oprt.setCommand(CMD_STOP);
|
||||
para_cnt = 0;
|
||||
}
|
||||
else if (cmd == "backup") {
|
||||
_ret_oprt.setCommand(CMD_BACKUP);
|
||||
para_cnt = 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
vector<string> paras;
|
||||
int cur_idx = para_start_pos;
|
||||
for (int i = 1; i <= para_cnt; i++)
|
||||
{
|
||||
if (cur_idx >= raw_len)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
int next_idx = raw_len;
|
||||
if (i < para_cnt)
|
||||
{
|
||||
for (int j = cur_idx; j<raw_len; j++)
|
||||
if (_raw_cmd[j] == ' ')
|
||||
{
|
||||
next_idx = j;
|
||||
break;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
for (int j = raw_len - 1; j>cur_idx; j--)
|
||||
if (_raw_cmd[j] != ' ')
|
||||
{
|
||||
next_idx = j + 1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
paras.push_back(_raw_cmd.substr(cur_idx, next_idx - cur_idx));
|
||||
|
||||
cur_idx = next_idx;
|
||||
while (cur_idx < raw_len && _raw_cmd[cur_idx] == ' ')
|
||||
cur_idx++;
|
||||
}
|
||||
|
||||
if (cur_idx != raw_len)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
_ret_oprt.setParameter(paras);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool
|
||||
Server::createDatabase(std::string _db_name, std::string _ac_name, std::string& _ret_msg)
|
||||
{
|
||||
// to be implemented...
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
bool
|
||||
Server::dropDatabase(std::string _db_name, std::string _ac_name, std::string& _ret_msg)
|
||||
{
|
||||
if (this->database != NULL) {
|
||||
_ret_msg = "please do not use this command when you are using a database.";
|
||||
return false;
|
||||
}
|
||||
|
||||
size_t length = _db_name.length();
|
||||
if (length < 3 || _db_name.substr(length - 3, 3) != ".db") {
|
||||
_ret_msg = "you can only drop databases whose names end with \".db\"";
|
||||
return false;
|
||||
}
|
||||
|
||||
std::string cmd = std::string("rm -rf ") + _db_name;
|
||||
int ret = system(cmd.c_str());
|
||||
if (ret == 0) {
|
||||
_ret_msg = "drop database done.";
|
||||
return true;
|
||||
}
|
||||
else {
|
||||
_ret_msg = "drop database failed.";
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
bool
|
||||
Server::loadDatabase(std::string _db_name, std::string _ac_name, std::string& _ret_msg)
|
||||
{
|
||||
if (this->database == NULL)
|
||||
{
|
||||
this->database = new Database(_db_name);
|
||||
}
|
||||
else
|
||||
{
|
||||
_ret_msg = "please unload the current db first: " + this->database->getName();
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!this->database->load()) {
|
||||
_ret_msg = "load database failed.";
|
||||
delete this->database;
|
||||
this->database = NULL;
|
||||
return false;
|
||||
}
|
||||
|
||||
pid_t fpid = vfork();
|
||||
|
||||
// child, scheduler
|
||||
if (fpid == 0) {
|
||||
time_t cur_time = time(NULL);
|
||||
long time_backup = Util::read_backup_time();
|
||||
long first_backup = cur_time - (cur_time - time_backup) % Util::gserver_backup_interval
|
||||
+ Util::gserver_backup_interval;
|
||||
this->next_backup = first_backup;
|
||||
string s_port = Util::int2string(this->connectionPort);
|
||||
string s_next_backup = Util::int2string(first_backup);
|
||||
execl("bin/gserver_backup_scheduler", "gserver_backup_scheduler", s_port.c_str(), s_next_backup.c_str(), NULL);
|
||||
exit(0);
|
||||
return true;
|
||||
}
|
||||
// parent
|
||||
if (fpid > 0) {
|
||||
this->scheduler_pid = fpid;
|
||||
}
|
||||
// fork failure
|
||||
else if (fpid < 0) {
|
||||
cerr << Util::getTimeString() << "Database will not be backed-up automatically." << endl;
|
||||
}
|
||||
_ret_msg = "load database done.";
|
||||
return true;
|
||||
}
|
||||
|
||||
bool
|
||||
Server::unloadDatabase(std::string _db_name, std::string _ac_name, std::string& _ret_msg)
|
||||
{
|
||||
if (this->database == NULL || this->database->getName() != _db_name)
|
||||
{
|
||||
_ret_msg = "database:" + _db_name + " is not loaded.";
|
||||
return false;
|
||||
}
|
||||
|
||||
delete this->database;
|
||||
this->database = NULL;
|
||||
_ret_msg = "unload database done.";
|
||||
|
||||
this->next_backup = 0;
|
||||
kill(this->scheduler_pid, SIGTERM);
|
||||
waitpid(this->scheduler_pid, NULL, 0);
|
||||
this->scheduler_pid = 0;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool
|
||||
Server::importRDF(std::string _db_name, std::string _ac_name, std::string _rdf_path, std::string& _ret_msg)
|
||||
{
|
||||
//if (this->database != NULL && this->database->getName() != _db_name)
|
||||
if (this->database != NULL)
|
||||
{
|
||||
//delete this->database;
|
||||
//NOTICE:if there is a db loaded, we should not build directly, tell user to unload it first
|
||||
_ret_msg = "please unload the current db first: " + this->database->getName();
|
||||
return false;
|
||||
}
|
||||
|
||||
this->database = new Database(_db_name);
|
||||
bool flag = this->database->build(_rdf_path);
|
||||
|
||||
delete this->database;
|
||||
this->database = NULL;
|
||||
|
||||
if (flag)
|
||||
{
|
||||
_ret_msg = "import RDF file to database done.";
|
||||
}
|
||||
else
|
||||
{
|
||||
_ret_msg = "import RDF file to database failed.";
|
||||
}
|
||||
|
||||
return flag;
|
||||
}
|
||||
|
||||
//bool
|
||||
//Server::insertTriple(std::string _db_name, std::string _ac_name, std::string _rdf_path, std::string& _ret_msg)
|
||||
//{
|
||||
// if (this->database != NULL)
|
||||
// {
|
||||
// this->database->unload();
|
||||
// delete this->database;
|
||||
// }
|
||||
// this->database = new Database(_db_name);
|
||||
// bool flag = this->database->insert(_rdf_path);
|
||||
// if (flag)
|
||||
// {
|
||||
// _ret_msg = "insert triple file to database done.";
|
||||
// }
|
||||
// else
|
||||
// {
|
||||
// _ret_msg = "import triple file to database failed.";
|
||||
// }
|
||||
// return flag;
|
||||
//}
|
||||
|
||||
bool
|
||||
Server::query(const string _query, string& _ret_msg)
|
||||
{
|
||||
cout << Util::getTimeString() << "Server query(): " << _query << endl;
|
||||
|
||||
if (this->database == NULL)
|
||||
{
|
||||
_ret_msg = "database has not been loaded.";
|
||||
return false;
|
||||
}
|
||||
|
||||
FILE* output = NULL;
|
||||
#ifdef OUTPUT_QUERY_RESULT
|
||||
string path = "logs/gserver_query.log";
|
||||
output = fopen(path.c_str(), "w");
|
||||
#endif
|
||||
|
||||
ResultSet res_set;
|
||||
int query_ret = this->database->query(_query, res_set, output);
|
||||
if (output != NULL)
|
||||
{
|
||||
fclose(output);
|
||||
}
|
||||
|
||||
bool flag = true;
|
||||
//cout<<"Server query ret: "<<query_ret<<endl;
|
||||
if (query_ret <= -100) //select query
|
||||
{
|
||||
//_ret_msg = "results are too large!";
|
||||
//BETTER: divide and transfer if too large to be placed in memory, using Stream
|
||||
if (query_ret == -100)
|
||||
{
|
||||
_ret_msg = res_set.to_str();
|
||||
}
|
||||
else //query error
|
||||
{
|
||||
flag = false;
|
||||
_ret_msg = "query failed.";
|
||||
//BETTER: {type:select} {success:false}
|
||||
}
|
||||
}
|
||||
else //update query
|
||||
{
|
||||
if (query_ret >= 0)
|
||||
{
|
||||
_ret_msg = "update num: " + Util::int2string(query_ret);
|
||||
}
|
||||
else //update error
|
||||
{
|
||||
flag = false;
|
||||
_ret_msg = "update failed.";
|
||||
}
|
||||
}
|
||||
|
||||
return flag;
|
||||
}
|
||||
|
||||
bool
|
||||
Server::showDatabases(string _para, string _ac_name, string& _ret_msg)
|
||||
{
|
||||
if (_para == "all")
|
||||
{
|
||||
_ret_msg = Util::getItemsFromDir(Util::db_home);
|
||||
return true;
|
||||
}
|
||||
if (this->database != NULL)
|
||||
{
|
||||
_ret_msg = "\n" + this->database->getName() + "\n";
|
||||
}
|
||||
else
|
||||
{
|
||||
_ret_msg = "\n[empty]\n";
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool Server::stopServer(string& _ret_msg) {
|
||||
if (this->database != NULL) {
|
||||
delete this->database;
|
||||
this->database = NULL;
|
||||
}
|
||||
_ret_msg = "server stopped.";
|
||||
return true;
|
||||
}
|
||||
|
||||
bool Server::backup(string& _ret_msg) {
|
||||
this->next_backup += Util::gserver_backup_interval;
|
||||
if (this->database == NULL) {
|
||||
_ret_msg = "No database in use.";
|
||||
return false;
|
||||
}
|
||||
if (!this->database->backup()) {
|
||||
_ret_msg = "Backup failed.";
|
||||
return false;
|
||||
}
|
||||
_ret_msg = "done";
|
||||
return true;
|
||||
}
|
||||
|
||||
pthread_t Server::start_timer() {
|
||||
pthread_t timer_thread;
|
||||
if (pthread_create(&timer_thread, NULL, Server::timer, NULL) == 0) {
|
||||
return timer_thread;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool Server::stop_timer(pthread_t _timer) {
|
||||
return pthread_kill(_timer, SIGTERM) == 0;
|
||||
}
|
||||
|
||||
void* Server::timer(void* _args) {
|
||||
signal(SIGTERM, Server::timer_sigterm_handler);
|
||||
sleep(Util::gserver_query_timeout);
|
||||
cerr << Util::getTimeString() << "Query out of time." << endl;
|
||||
abort();
|
||||
}
|
||||
|
||||
void Server::timer_sigterm_handler(int _signal_num) {
|
||||
pthread_exit(0);
|
||||
}
|
|
@ -14,8 +14,10 @@
|
|||
//NOTICE:CMD_DROP is used to remove the database, and CMD_CREATE is not useful because
|
||||
//we always need to import a dataset to create a gstore db
|
||||
enum CommandType {
|
||||
CMD_CONNECT, CMD_EXIT, CMD_TEST, CMD_LOAD, CMD_UNLOAD, CMD_CREATE, CMD_DROP,
|
||||
CMD_IMPORT, CMD_QUERY, CMD_SHOW, CMD_INSERT, CMD_STOP, CMD_OTHER
|
||||
CMD_CONNECT, CMD_TEST, CMD_LOAD, CMD_UNLOAD, CMD_CREATE, CMD_DROP, CMD_IMPORT,
|
||||
CMD_QUERY, CMD_SHOW, CMD_INSERT, CMD_STOP, CMD_BACKUP, CMD_OTHER
|
||||
//CMD_CONNECT, CMD_EXIT, CMD_TEST, CMD_LOAD, CMD_UNLOAD, CMD_CREATE, CMD_DROP,
|
||||
//CMD_IMPORT, CMD_QUERY, CMD_SHOW, CMD_INSERT, CMD_STOP, CMD_OTHER
|
||||
}; // extend the operation command type here.
|
||||
|
||||
class Operation
|
||||
|
@ -47,4 +49,4 @@ private:
|
|||
std::vector<std::string> parameters;
|
||||
};
|
||||
|
||||
#endif /* OPERATION_H_ */
|
||||
#endif /* OPERATION_H_ */
|
||||
|
|
|
@ -10,15 +10,15 @@
|
|||
|
||||
using namespace std;
|
||||
|
||||
Server::Server()
|
||||
{
|
||||
this->connectionPort = Socket::DEFAULT_CONNECT_PORT;
|
||||
this->connectionMaxNum = Socket::MAX_CONNECTIONS;
|
||||
this->databaseMaxNum = 1; // will be updated when supporting multiple databases.
|
||||
this->database = NULL;
|
||||
this->db_home = Util::global_config["db_home"];
|
||||
this->db_suffix = Util::global_config["db_suffix"];
|
||||
}
|
||||
//Server::Server()
|
||||
//{
|
||||
//this->connectionPort = Socket::DEFAULT_CONNECT_PORT;
|
||||
//this->connectionMaxNum = Socket::MAX_CONNECTIONS;
|
||||
//this->databaseMaxNum = 1; // will be updated when supporting multiple databases.
|
||||
//this->database = NULL;
|
||||
//this->db_home = Util::global_config["db_home"];
|
||||
//this->db_suffix = Util::global_config["db_suffix"];
|
||||
//}
|
||||
|
||||
Server::Server(unsigned short _port)
|
||||
{
|
||||
|
@ -26,6 +26,10 @@ Server::Server(unsigned short _port)
|
|||
this->connectionMaxNum = Socket::MAX_CONNECTIONS;
|
||||
this->databaseMaxNum = 1; // will be updated when supporting multiple databases.
|
||||
this->database = NULL;
|
||||
this->db_home = Util::global_config["db_home"];
|
||||
this->db_suffix = Util::global_config["db_suffix"];
|
||||
this->next_backup = 0;
|
||||
this->scheduler_pid = 0;
|
||||
}
|
||||
|
||||
Server::~Server()
|
||||
|
@ -151,7 +155,14 @@ Server::listen()
|
|||
case CMD_QUERY:
|
||||
{
|
||||
string query = operation.getParameter(0);
|
||||
pthread_t timer = Server::start_timer();
|
||||
if (timer == 0) {
|
||||
cerr << Util::getTimeString() << "Failed to start timer." << endl;
|
||||
}
|
||||
this->query(query, ret_msg);
|
||||
if (timer != 0 && !Server::stop_timer(timer)) {
|
||||
cerr << Util::getTimeString() << "Failed to stop timer." << endl;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case CMD_SHOW:
|
||||
|
@ -180,6 +191,25 @@ Server::listen()
|
|||
_stop = true;
|
||||
break;
|
||||
}
|
||||
case CMD_BACKUP: {
|
||||
string para = operation.getParameter(0);
|
||||
stringstream ss(para);
|
||||
long time_backup;
|
||||
ss >> time_backup;
|
||||
if (this->next_backup == 0) {
|
||||
break;
|
||||
}
|
||||
while (this->next_backup < time_backup) {
|
||||
this->next_backup += Util::gserver_backup_interval;
|
||||
}
|
||||
if (this->next_backup == time_backup) {
|
||||
this->backup(ret_msg);
|
||||
}
|
||||
else {
|
||||
ret_msg = "done";
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
cerr << Util::getTimeString() << "this command is not supported by now. @Server::listen" << endl;
|
||||
}
|
||||
|
@ -191,6 +221,13 @@ Server::listen()
|
|||
cout << Util::getTimeString() << "server stopped." << endl;
|
||||
break;
|
||||
}
|
||||
if (this->next_backup > 0) {
|
||||
time_t cur_time = time(NULL);
|
||||
if (cur_time >= this->next_backup) {
|
||||
string str;
|
||||
this->backup(str);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -269,6 +306,10 @@ Server::parser(std::string _raw_cmd, Operation& _ret_oprt)
|
|||
_ret_oprt.setCommand(CMD_STOP);
|
||||
para_cnt = 0;
|
||||
}
|
||||
else if (cmd == "backup") {
|
||||
_ret_oprt.setCommand(CMD_BACKUP);
|
||||
para_cnt = 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
return false;
|
||||
|
@ -380,9 +421,36 @@ Server::loadDatabase(std::string _db_name, std::string _ac_name, std::string& _r
|
|||
_ret_msg = "load database failed.";
|
||||
delete this->database;
|
||||
this->database = NULL;
|
||||
return false;
|
||||
}
|
||||
|
||||
return flag;
|
||||
pid_t fpid = vfork();
|
||||
|
||||
// child, scheduler
|
||||
if (fpid == 0) {
|
||||
time_t cur_time = time(NULL);
|
||||
long time_backup = Util::read_backup_time();
|
||||
long first_backup = cur_time - (cur_time - time_backup) % Util::gserver_backup_interval
|
||||
+ Util::gserver_backup_interval;
|
||||
this->next_backup = first_backup;
|
||||
string s_port = Util::int2string(this->connectionPort);
|
||||
string s_next_backup = Util::int2string(first_backup);
|
||||
execl("bin/gserver_backup_scheduler", "gserver_backup_scheduler", s_port.c_str(), s_next_backup.c_str(), NULL);
|
||||
exit(0);
|
||||
return true;
|
||||
}
|
||||
// parent
|
||||
if (fpid > 0) {
|
||||
this->scheduler_pid = fpid;
|
||||
}
|
||||
// fork failure
|
||||
else if (fpid < 0) {
|
||||
cerr << Util::getTimeString() << "Database will not be backed-up automatically." << endl;
|
||||
}
|
||||
|
||||
//_ret_msg = "load database done.";
|
||||
return true;
|
||||
//return flag;
|
||||
}
|
||||
|
||||
bool
|
||||
|
@ -398,6 +466,13 @@ Server::unloadDatabase(std::string _db_name, std::string _ac_name, std::string&
|
|||
this->database = NULL;
|
||||
_ret_msg = "unload database done.";
|
||||
|
||||
this->next_backup = 0;
|
||||
//string cmd = "kill " + Util::int2string(this->scheduler_pid);
|
||||
//system(cmd.c_str());
|
||||
kill(this->scheduler_pid, SIGTERM);
|
||||
waitpid(this->scheduler_pid, NULL, 0);
|
||||
this->scheduler_pid = 0;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -458,8 +533,9 @@ Server::importRDF(std::string _db_name, std::string _ac_name, std::string _rdf_p
|
|||
bool
|
||||
Server::query(const string _query, string& _ret_msg)
|
||||
{
|
||||
cout<<"Server query()"<<endl;
|
||||
cout<<_query<<endl;
|
||||
//cout<<"Server query()"<<endl;
|
||||
//cout<<_query<<endl;
|
||||
cout << Util::getTimeString() << "Server query(): " << _query << endl;
|
||||
|
||||
if (this->database == NULL)
|
||||
{
|
||||
|
@ -468,8 +544,8 @@ Server::query(const string _query, string& _ret_msg)
|
|||
}
|
||||
|
||||
FILE* output = NULL;
|
||||
string path = "logs/gserver_query.log";
|
||||
#ifdef OUTPUT_QUERY_RESULT
|
||||
string path = "logs/gserver_query.log";
|
||||
output = fopen(path.c_str(), "w");
|
||||
#endif
|
||||
|
||||
|
@ -537,7 +613,7 @@ Server::showDatabases(string _para, string _ac_name, string& _ret_msg)
|
|||
return true;
|
||||
}
|
||||
|
||||
bool Server::stopServer(std::string& _ret_msg) {
|
||||
bool Server::stopServer(string& _ret_msg) {
|
||||
if (this->database != NULL) {
|
||||
delete this->database;
|
||||
this->database = NULL;
|
||||
|
@ -546,3 +622,40 @@ bool Server::stopServer(std::string& _ret_msg) {
|
|||
return true;
|
||||
}
|
||||
|
||||
bool Server::backup(string& _ret_msg) {
|
||||
this->next_backup += Util::gserver_backup_interval;
|
||||
if (this->database == NULL) {
|
||||
_ret_msg = "No database in use.";
|
||||
return false;
|
||||
}
|
||||
if (!this->database->backup()) {
|
||||
_ret_msg = "Backup failed.";
|
||||
return false;
|
||||
}
|
||||
_ret_msg = "done";
|
||||
return true;
|
||||
}
|
||||
|
||||
pthread_t Server::start_timer() {
|
||||
pthread_t timer_thread;
|
||||
if (pthread_create(&timer_thread, NULL, Server::timer, NULL) == 0) {
|
||||
return timer_thread;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool Server::stop_timer(pthread_t _timer) {
|
||||
return pthread_kill(_timer, SIGTERM) == 0;
|
||||
}
|
||||
|
||||
void* Server::timer(void* _args) {
|
||||
signal(SIGTERM, Server::timer_sigterm_handler);
|
||||
sleep(Util::gserver_query_timeout);
|
||||
cerr << Util::getTimeString() << "Query out of time." << endl;
|
||||
abort();
|
||||
}
|
||||
|
||||
void Server::timer_sigterm_handler(int _signal_num) {
|
||||
pthread_exit(0);
|
||||
}
|
||||
|
||||
|
|
|
@ -32,8 +32,8 @@
|
|||
class Server
|
||||
{
|
||||
public:
|
||||
Server();
|
||||
Server(unsigned short _port);
|
||||
//Server();
|
||||
Server(unsigned short _port = Socket::DEFAULT_CONNECT_PORT);
|
||||
~Server();
|
||||
|
||||
bool createConnection();
|
||||
|
@ -51,6 +51,7 @@ public:
|
|||
//bool insertTriple(std::string _db_name, std::string _ac_name, std::string _rdf_path, std::string& _ret_msg);
|
||||
bool query(const std::string _query, std::string& _ret_msg);
|
||||
bool stopServer(std::string& _ret_msg);
|
||||
bool backup(std::string& _ret_msg);
|
||||
|
||||
private:
|
||||
unsigned short connectionPort;
|
||||
|
@ -61,6 +62,13 @@ private:
|
|||
std::string db_home;
|
||||
std::string db_suffix;
|
||||
|
||||
static pthread_t start_timer();
|
||||
static bool stop_timer(pthread_t _timer);
|
||||
static void* timer(void* _args);
|
||||
static void timer_sigterm_handler(int _signal_num);
|
||||
|
||||
long next_backup;
|
||||
pid_t scheduler_pid;
|
||||
};
|
||||
|
||||
#endif // _SERVER_SERVER_H
|
||||
|
|
|
@ -65,6 +65,12 @@ FILE* Util::debug_kvstore = NULL; //used by KVstore
|
|||
FILE* Util::debug_database = NULL; //used by Database
|
||||
FILE* Util::debug_vstree = NULL; //used by VSTree
|
||||
|
||||
string Util::gserver_port_file = "bin/.gserver_port";
|
||||
string Util::gserver_port_swap = "bin/.gserver_port.swap";
|
||||
string Util::gserver_log = "logs/gserver.log";
|
||||
|
||||
string Util::backup_path = "backups/";
|
||||
|
||||
//set hash table
|
||||
HashFunction Util::hash[] = { Util::simpleHash, Util::APHash, Util::BKDRHash, Util::DJBHash, Util::ELFHash, \
|
||||
Util::DEKHash, Util::BPHash, Util::FNVHash, Util::HFLPHash, Util::HFHash, Util::JSHash, \
|
||||
|
@ -655,6 +661,14 @@ Util::create_dir(const string _dir)
|
|||
return false;
|
||||
}
|
||||
|
||||
bool
|
||||
Util::create_file(const string _file) {
|
||||
if (creat(_file.c_str(), 0755) > 0) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
long
|
||||
Util::get_cur_time()
|
||||
{
|
||||
|
@ -1508,3 +1522,38 @@ Util::_pso_cmp(const void* _a, const void* _b)
|
|||
return 0;
|
||||
}
|
||||
|
||||
long Util::read_backup_time() {
|
||||
ifstream in;
|
||||
in.open(Util::profile.c_str(), ios::in);
|
||||
if (!in) {
|
||||
return Util::gserver_backup_time;
|
||||
}
|
||||
int buf_size = 512;
|
||||
char lbuf[buf_size];
|
||||
while (!in.eof()) {
|
||||
in.getline(lbuf, buf_size);
|
||||
regex_t reg;
|
||||
char pattern[] = "^\\s*BackupTime\\s*=\\s*((0|1)[0-9]|2[0-3])[0-5][0-9]\\s*(\\s#.*)?$";
|
||||
regcomp(®, pattern, REG_EXTENDED | REG_NOSUB);
|
||||
regmatch_t pm[1];
|
||||
int status = regexec(®, lbuf, 1, pm, 0);
|
||||
regfree(®);
|
||||
if (status == REG_NOMATCH) {
|
||||
continue;
|
||||
}
|
||||
else if (status != 0) {
|
||||
in.close();
|
||||
return Util::gserver_backup_time;
|
||||
}
|
||||
for (int i = 11; i < buf_size && lbuf[i]; i++) {
|
||||
if (lbuf[i] >= '0' && lbuf[i] <= '9') {
|
||||
in.close();
|
||||
return 36000 * (lbuf[i] - '0') + 3600 * (lbuf[i + 1] - '0')
|
||||
+ 600 * (lbuf[i + 2] - '0') + 60 * (lbuf[i + 3] - '0');
|
||||
}
|
||||
}
|
||||
}
|
||||
in.close();
|
||||
return Util::gserver_backup_time;
|
||||
}
|
||||
|
||||
|
|
14
Util/Util.h
14
Util/Util.h
|
@ -31,12 +31,14 @@ in the sparql query can point to the same node in data graph)
|
|||
#include <locale.h>
|
||||
#include <assert.h>
|
||||
#include <libgen.h>
|
||||
#include <signal.h>
|
||||
|
||||
#include <sys/time.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/file.h>
|
||||
#include <sys/stat.h>
|
||||
#include <sys/mman.h>
|
||||
#include <sys/wait.h>
|
||||
|
||||
#include <sys/socket.h>
|
||||
#include <netinet/in.h>
|
||||
|
@ -194,6 +196,15 @@ public:
|
|||
static const int II_TREE = 2;
|
||||
static const int IS_TREE = 3;
|
||||
|
||||
static std::string gserver_port_file;
|
||||
static std::string gserver_port_swap;
|
||||
static std::string gserver_log;
|
||||
static const int gserver_query_timeout = 10000; // Timeout of gServer's query (in seconds)
|
||||
|
||||
static std::string backup_path;
|
||||
static const long gserver_backup_interval = 86400;
|
||||
static const long gserver_backup_time = 72000; // Default backup time (UTC)
|
||||
|
||||
static int memUsedPercentage();
|
||||
static int memoryLeft();
|
||||
static int compare(const char* _str1, unsigned _len1, const char* _str2, unsigned _len2); //QUERY(how to use default args)
|
||||
|
@ -211,12 +222,15 @@ public:
|
|||
static std::string result_id_str(std::vector<int*>& _v, int _var_num);
|
||||
static bool dir_exist(const std::string _dir);
|
||||
static bool create_dir(const std:: string _dir);
|
||||
static bool create_file(const std::string _file);
|
||||
|
||||
static long get_cur_time();
|
||||
static bool save_to_file(const char*, const std::string _content);
|
||||
static bool isValidPort(std::string);
|
||||
static bool isValidIP(std::string);
|
||||
static std::string getTimeString();
|
||||
static std::string node2string(const char* _raw_str);
|
||||
static long read_backup_time();
|
||||
|
||||
static bool is_literal_ele(int);
|
||||
static int removeDuplicate(int*, int);
|
||||
|
|
|
@ -39,3 +39,6 @@ buffer_maxium = 100
|
|||
# If it is closed(that is, the option is uncommented and set to false), then gStore will run fatser but maybe not safe and recoverable
|
||||
# operation_logs = true
|
||||
|
||||
# Time of scheduled backup of gserver (HHMM, UTC)
|
||||
BackupTime = 2000 # 4 am (GMT+8)
|
||||
|
||||
|
|
9
makefile
9
makefile
|
@ -103,7 +103,7 @@ inc = -I./tools/libantlr3c-3.4/ -I./tools/libantlr3c-3.4/include
|
|||
# http://blog.csdn.net/jeffrey0000/article/details/12421317
|
||||
|
||||
#gtest
|
||||
TARGET = $(exedir)gbuild $(exedir)gserver $(exedir)gclient $(exedir)gquery $(exedir)gconsole $(api_java) $(exedir)gadd $(exedir)gsub
|
||||
TARGET = $(exedir)gbuild $(exedir)gserver $(exedir)gserver_backup_scheduler $(exedir)gclient $(exedir)gquery $(exedir)gconsole $(api_java) $(exedir)gadd $(exedir)gsub
|
||||
|
||||
all: $(TARGET)
|
||||
|
||||
|
@ -125,6 +125,9 @@ $(exedir)gquery: $(lib_antlr) $(objdir)gquery.o $(objfile)
|
|||
$(exedir)gserver: $(lib_antlr) $(objdir)gserver.o $(objfile)
|
||||
$(CC) $(EXEFLAG) -o $(exedir)gserver $(objdir)gserver.o $(objfile) $(library)
|
||||
|
||||
$(exedir)gserver_backup_scheduler: $(lib_antlr) $(objdir)gserver_backup_scheduler.o $(objfile)
|
||||
$(CC) $(EXEFLAG) -o $(exedir)gserver_backup_scheduler $(objdir)gserver_backup_scheduler.o $(objfile) $(library)
|
||||
|
||||
$(exedir)gclient: $(lib_antlr) $(objdir)gclient.o $(objfile)
|
||||
$(CC) $(EXEFLAG) -o $(exedir)gclient $(objdir)gclient.o $(objfile) $(library)
|
||||
|
||||
|
@ -146,6 +149,9 @@ $(objdir)gquery.o: Main/gquery.cpp Database/Database.h Util/Util.h $(lib_antlr)
|
|||
$(objdir)gserver.o: Main/gserver.cpp Server/Server.h Util/Util.h $(lib_antlr)
|
||||
$(CC) $(CFLAGS) Main/gserver.cpp $(inc) -o $(objdir)gserver.o
|
||||
|
||||
$(objdir)gserver_backup_scheduler.o: Main/gserver_backup_scheduler.cpp Server/Server.h Util/Util.h $(lib_antlr)
|
||||
$(CC) $(CFLAGS) Main/gserver_backup_scheduler.cpp $(inc) -o $(objdir)gserver_backup_scheduler.o
|
||||
|
||||
$(objdir)gclient.o: Main/gclient.cpp Server/Client.h Util/Util.h $(lib_antlr)
|
||||
$(CC) $(CFLAGS) Main/gclient.cpp $(inc) -o $(objdir)gclient.o #-DREADLINE_ON
|
||||
|
||||
|
@ -454,5 +460,6 @@ fulltest:
|
|||
|
||||
#test the efficience of kvstore, insert/delete/search, use dbpedia170M by default
|
||||
test-kvstore:
|
||||
# test/kvstore_test.cpp
|
||||
echo "TODO"
|
||||
|
||||
|
|
Loading…
Reference in New Issue