feat: merge wlb branch;

support backup in gserver but not in HttpConnector;

by zengli
This commit is contained in:
bookug 2017-05-18 23:27:58 +08:00
commit c11088ee3e
14 changed files with 1647 additions and 344 deletions

View File

@ -19,6 +19,8 @@ Database::Database()
this->six_tuples_file = "six_tuples";
this->db_info_file = "db_info_file.dat";
this->id_tuples_file = "id_tuples";
this->update_log = "update.log";
this->update_log_since_backup = "update_since_backup.log";
string kv_store_path = store_path + "/kv_store";
this->kvstore = new KVstore(kv_store_path);
@ -54,12 +56,19 @@ Database::Database()
Database::Database(string _name)
{
this->name = _name;
size_t found = this->name.find_last_not_of('/');
if (found != string::npos)
{
this->name.erase(found + 1);
}
this->store_path = Util::global_config["db_home"] + "/" + this->name + Util::global_config["db_suffix"];
this->signature_binary_file = "signature.binary";
this->six_tuples_file = "six_tuples";
this->db_info_file = "db_info_file.dat";
this->id_tuples_file = "id_tuples";
this->update_log = "update.log";
this->update_log_since_backup = "update_since_backup.log";
string kv_store_path = store_path + "/kv_store";
this->kvstore = new KVstore(kv_store_path);
@ -600,19 +609,30 @@ Database::load()
return true;
}
//NOTICE: we ensure that if the unload() exists normally, then all updates have already been written to disk
//So when accidents happens, we only have to restore the databases that are in load status(inlucidng that unload
//not finished) later.
bool
Database::unload()
{
//TODO: do we need to update the pre2num if update queries exist??
//or we just neglect this, that is ok because pre2num is just used to count
delete[] this->pre2num;
this->pre2num = NULL;
delete this->entity_buffer;
delete this->literal_buffer;
delete this->entity_buffer;
this->entity_buffer = NULL;
delete this->literal_buffer;
this->literal_buffer = NULL;
//TODO: fflush the database file
this->vstree->saveTree();
delete this->vstree;
this->vstree = NULL;
delete this->kvstore;
this->kvstore = NULL;
delete this->stringindex;
this->stringindex = NULL;
@ -621,10 +641,28 @@ Database::unload()
this->initIDinfo();
this->if_loaded = false;
this->clear_update_log();
return true;
}
void Database::clear()
{
delete[] this->pre2num;
this->pre2num = NULL;
delete this->entity_buffer;
this->entity_buffer = NULL;
delete this->literal_buffer;
this->literal_buffer = NULL;
delete this->vstree;
this->vstree = NULL;
delete this->kvstore;
this->kvstore = NULL;
delete this->stringindex;
this->stringindex = NULL;
}
string
Database::getName()
{
@ -777,6 +815,11 @@ Database::build(const string& _rdf_file)
string stringindex_store_path = store_path + "/stringindex_store";
Util::create_dir(stringindex_store_path);
string update_log_path = this->store_path + '/' + this->update_log;
Util::create_file(update_log_path);
string update_log_since_backup = this->store_path + '/' + this->update_log_since_backup;
Util::create_file(update_log_since_backup);
cout << "begin encode RDF from : " << ret << " ..." << endl;
//BETTER+TODO:now require that dataset size < memory
@ -1066,6 +1109,31 @@ Database::exist_triple(TYPE_ENTITY_LITERAL_ID _sub_id, TYPE_PREDICATE_ID _pre_id
return is_exist;
}
bool Database::exist_triple(const TripleWithObjType& _triple) {
int sub_id = this->kvstore->getIDByEntity(_triple.getSubject());
if (sub_id == -1) {
return false;
}
int pre_id = this->kvstore->getIDByPredicate(_triple.getPredicate());
if (pre_id == -1) {
return false;
}
int obj_id = -1;
if (_triple.isObjEntity()) {
obj_id = this->kvstore->getIDByEntity(_triple.getObject());
}
else if (_triple.isObjLiteral()) {
obj_id = this->kvstore->getIDByLiteral(_triple.getObject());
}
if (obj_id == -1) {
return false;
}
return exist_triple(sub_id, pre_id, obj_id);
}
//NOTICE: all constants are transfered to ids in memory
//this maybe not ok when size is too large!
bool
@ -2086,10 +2154,10 @@ Database::removeTriple(const TripleWithObjType& _triple, vector<unsigned>* _vert
}
bool
Database::insert(std::string _rdf_file)
Database::insert(std::string _rdf_file, bool _is_restore)
{
//cout<<"to load in insert"<<endl;
bool flag = this->load();
bool flag = _is_restore || this->load();
//bool flag = this->load();
if (!flag)
{
return false;
@ -2137,7 +2205,7 @@ Database::insert(std::string _rdf_file)
}
//Process the Triple one by one
success_num += this->insert(triple_array, parse_triple_num);
success_num += this->insert(triple_array, parse_triple_num, _is_restore);
//some maybe invalid or duplicate
//triple_num += parse_triple_num;
}
@ -2183,9 +2251,10 @@ Database::insert(std::string _rdf_file)
}
bool
Database::remove(std::string _rdf_file)
Database::remove(std::string _rdf_file, bool _is_restore)
{
bool flag = this->load();
bool flag = _is_restore || this->load();
//bool flag = this->load();
if (!flag)
{
return false;
@ -2227,7 +2296,7 @@ Database::remove(std::string _rdf_file)
break;
}
success_num += this->remove(triple_array, parse_triple_num);
success_num += this->remove(triple_array, parse_triple_num, _is_restore);
//some maybe invalid or duplicate
//triple_num -= parse_triple_num;
}
@ -2262,11 +2331,37 @@ Database::remove(std::string _rdf_file)
}
unsigned
Database::insert(const TripleWithObjType* _triples, TYPE_TRIPLE_NUM _triple_num)
Database::insert(const TripleWithObjType* _triples, TYPE_TRIPLE_NUM _triple_num, bool _is_restore)
{
vector<TYPE_ENTITY_LITERAL_ID> vertices, predicates;
TYPE_TRIPLE_NUM valid_num = 0;
if (!_is_restore) {
string path = this->getStorePath() + '/' + this->update_log;
string path_all = this->getStorePath() + '/' + this->update_log_since_backup;
ofstream out;
ofstream out_all;
out.open(path.c_str(), ios::out | ios::app);
out_all.open(path_all.c_str(), ios::out | ios::app);
if (!out || !out_all) {
cerr << "Failed to open update log. Insertion aborted." << endl;
return 0;
}
for (int i = 0; i < _triple_num; i++) {
if (exist_triple(_triples[i])) {
continue;
}
stringstream ss;
ss << "I\t" << Util::node2string(_triples[i].getSubject().c_str()) << '\t';
ss << Util::node2string(_triples[i].getPredicate().c_str()) << '\t';
ss << Util::node2string(_triples[i].getObject().c_str()) << '.' << endl;
out << ss.str();
out_all << ss.str();
}
out.close();
out_all.close();
}
#ifdef USE_GROUP_INSERT
//NOTICE:this is called by insert(file) or query()(but can not be too large),
//assume that db is loaded already
@ -2717,11 +2812,37 @@ Database::insert(const TripleWithObjType* _triples, TYPE_TRIPLE_NUM _triple_num)
}
unsigned
Database::remove(const TripleWithObjType* _triples, TYPE_TRIPLE_NUM _triple_num)
Database::remove(const TripleWithObjType* _triples, TYPE_TRIPLE_NUM _triple_num, bool _is_restore)
{
vector<TYPE_ENTITY_LITERAL_ID> vertices, predicates;
TYPE_TRIPLE_NUM valid_num = 0;
if (!_is_restore) {
string path = this->getStorePath() + '/' + this->update_log;
string path_all = this->getStorePath() + '/' + this->update_log_since_backup;
ofstream out;
ofstream out_all;
out.open(path.c_str(), ios::out | ios::app);
out_all.open(path_all.c_str(), ios::out | ios::app);
if (!out || !out_all) {
cerr << "Failed to open update log. Removal aborted." << endl;
return 0;
}
for (int i = 0; i < _triple_num; i++) {
if (!exist_triple(_triples[i])) {
continue;
}
stringstream ss;
ss << "R\t" << Util::node2string(_triples[i].getSubject().c_str()) << '\t';
ss << Util::node2string(_triples[i].getPredicate().c_str()) << '\t';
ss << Util::node2string(_triples[i].getObject().c_str()) << '.' << endl;
out << ss.str();
out_all << ss.str();
}
out.close();
out_all.close();
}
#ifdef USE_GROUP_DELETE
//NOTICE:this is called by remove(file) or query()(but can not be too large),
//assume that db is loaded already
@ -3073,6 +3194,214 @@ Database::remove(const TripleWithObjType* _triples, TYPE_TRIPLE_NUM _triple_num)
return valid_num;
}
bool
Database::backup()
{
if (!Util::dir_exist(Util::backup_path))
{
Util::create_dir(Util::backup_path);
}
string backup_path = Util::backup_path + this->store_path;
cout << "Beginning backup." << endl;
string sys_cmd;
if (Util::dir_exist(backup_path))
{
sys_cmd = "rm -rf " + backup_path;
system(sys_cmd.c_str());
}
sys_cmd = "cp -r " + this->store_path + ' ' + backup_path;
system(sys_cmd.c_str());
sys_cmd = "rm " + backup_path + '/' + this->update_log;
system(sys_cmd.c_str());
this->vstree->saveTree();
this->kvstore->flush();
this->clear_update_log();
string update_log_path = this->store_path + '/' + this->update_log_since_backup;
sys_cmd = "rm " + update_log_path;
system(sys_cmd.c_str());
Util::create_file(update_log_path);
cout << "Backup completed!" << endl;
return true;
}
bool
Database::restore()
{
cout << "Begining restore." << endl;
string sys_cmd;
multiset<string> insertions;
multiset<string> removals;
int num_update = 0;
if (!this->load())
{
this->clear();
string backup_path = Util::backup_path + this->store_path;
if (!Util::dir_exist(Util::backup_path))
{
cerr << "Failed to restore!" << endl;
return false;
}
num_update += Database::read_update_log(this->store_path + '/' + this->update_log_since_backup, insertions, removals);
cout << "Failed to restore from original db file, trying to restore from backup file." << endl;
cout << "Your old db file will be stored at " << this->store_path << ".bad" << endl;
sys_cmd = "rm -rf " + this->store_path + ".bad";
system(sys_cmd.c_str());
sys_cmd = "cp -r " + this->store_path + ' ' + this->store_path + ".bad";
system(sys_cmd.c_str());
sys_cmd = "rm -rf " + this->store_path;
system(sys_cmd.c_str());
sys_cmd = "cp -r " + backup_path + ' ' + this->store_path;
system(sys_cmd.c_str());
Util::create_file(this->store_path + '/' + this->update_log);
if (!this->load())
{
this->clear();
cerr << "Failed to restore from backup file." << endl;
return false;
}
num_update += Database::read_update_log(this->store_path + '/' + this->update_log_since_backup, insertions, removals);
}
else
{
num_update += Database::read_update_log(this->store_path + '/' + this->update_log, insertions, removals);
}
cout << "Restoring " << num_update << " updates." << endl;
if (!this->restore_update(insertions, removals))
{
cerr << "Failed to restore updates" << endl;
return false;
}
cout << "Restore completed." << endl;
return true;
}
int
Database::read_update_log(const string _path, multiset<string>& _i, multiset<string>& _r)
{
ifstream in;
#ifdef DEBUG
cout<<_path<<endl;
#endif
in.open(_path.c_str(), ios::in);
if (!in) {
cerr << "Failed to read update log." << endl;
return 0;
}
int ret = 0;
int buffer_size = 1024 + 2;
char buffer[buffer_size];
in.getline(buffer, buffer_size);
while (!in.eof() && buffer[0]) {
string triple;
switch (buffer[0]) {
case 'I':
triple = string(buffer + 2);
ret++;
_i.insert(triple);
break;
case 'R':
triple = string(buffer + 2);
ret++;
_r.insert(triple);
break;
default:
cerr << "Bad line in update log!" << endl;
}
in.getline(buffer, buffer_size);
}
return ret;
}
bool
Database::restore_update(multiset<string>& _i, multiset<string>& _r)
{
multiset<string>::iterator pos;
multiset<string>::iterator it_to_erase = _r.end();
for (multiset<string>::iterator it = _r.begin(); it != _r.end(); it++)
{
//NOTICE: check the intersect of insert_set and remove_set
if (it_to_erase != _r.end()) {
_r.erase(it_to_erase);
}
pos = _i.find(*it);
if (pos != _i.end()) {
_i.erase(pos);
it_to_erase = it;
}
else {
it_to_erase = _r.end();
}
}
if (it_to_erase != _r.end()) {
_r.erase(it_to_erase);
}
string tmp_path = this->store_path + "/.update_tmp";
ofstream out_i;
out_i.open(tmp_path.c_str(), ios::out);
if (!out_i) {
cerr << "Failed to open temp file, restore failed!" << endl;
return false;
}
for (multiset<string>::iterator it = _i.begin(); it != _i.end(); it++) {
out_i << *it << endl;
}
out_i.close();
if (!this->insert(tmp_path, true))
//if (!this->remove(tmp_path, true))
{
return false;
}
ofstream out_r;
out_r.open(tmp_path.c_str(), ios::out);
if (!out_r) {
cerr << "Failed to open temp file!" << endl;
return false;
}
for (multiset<string>::iterator it = _r.begin(); it != _r.end(); it++) {
out_r << *it << endl;
}
out_r.close();
if (!this->remove(tmp_path, true))
//if (!this->insert(tmp_path, true))
{
return false;
}
string cmd = "rm " + tmp_path;
system(cmd.c_str());
return true;
}
void
Database::clear_update_log()
{
string cmd = "rm " + this->store_path + '/' + this->update_log;
system(cmd.c_str());
Util::create_file(this->store_path + '/' + this->update_log);
}
bool
Database::objIDIsEntityID(TYPE_ENTITY_LITERAL_ID _id)
{
@ -3114,6 +3443,7 @@ Database::getFinalResult(SPARQLquery& _sparql_q, ResultSet& _result_set)
vector<unsigned> keys;
vector<bool> desc;
_result_set.openStream(keys, desc);
//_result_set.openStream(keys, desc, 0, -1);
#ifdef DEBUG_PRECISE
printf("getFinalResult:after open stream\n");
#endif

View File

@ -48,6 +48,7 @@ public:
bool load();
bool unload();
void clear();
int query(const string _query, ResultSet& _result_set, FILE* _fp = stdout);
//1. if subject of _triple doesn't exist,
@ -57,8 +58,11 @@ public:
bool build(const string& _rdf_file);
//interfaces to insert/delete from given rdf file
bool insert(std::string _rdf_file);
bool remove(std::string _rdf_file);
bool insert(std::string _rdf_file, bool _is_restore = false);
bool remove(std::string _rdf_file, bool _is_restore = false);
bool backup();
bool restore();
//name of this DB
string getName();
@ -105,6 +109,8 @@ private:
//id tuples file
string id_tuples_file;
string update_log;
string update_log_since_backup;
//pre2num mapping
TYPE_TRIPLE_NUM* pre2num;
@ -176,6 +182,7 @@ private:
//check whether the relative 3-tuples exist
//usually, through sp2olist
bool exist_triple(TYPE_ENTITY_LITERAL_ID _sub_id, TYPE_PREDICATE_ID _pre_id, TYPE_ENTITY_LITERAL_ID _obj_id);
bool exist_triple(const TripleWithObjType& _triple);
//* _rdf_file denotes the path of the RDF file, where stores the rdf data
//* there are many step in this function, each one responds to an sub-function
@ -197,10 +204,9 @@ private:
bool insertTriple(const TripleWithObjType& _triple, vector<unsigned>* _vertices = NULL, vector<unsigned>* _predicates = NULL);
bool removeTriple(const TripleWithObjType& _triple, vector<unsigned>* _vertices = NULL, vector<unsigned>* _predicates = NULL);
//NOTICE:one by one is too costly, sort and insert/delete at a time will be better
unsigned insert(const TripleWithObjType* _triples, TYPE_TRIPLE_NUM _triple_num);
unsigned insert(const TripleWithObjType* _triples, TYPE_TRIPLE_NUM _triple_num, bool _is_restore=false);
//bool insert(const vector<TripleWithObjType>& _triples, vector<int>& _vertices, vector<int>& _predicates);
unsigned remove(const TripleWithObjType* _triples, TYPE_TRIPLE_NUM _triple_num);
//bool remove(const vector<TripleWithObjType>& _triples, vector<int>& _vertices, vector<int>& _predicates);
unsigned remove(const TripleWithObjType* _triples, TYPE_TRIPLE_NUM _triple_num, bool _is_restore=false);
bool sub2id_pre2id_obj2id_RDFintoSignature(const string _rdf_file);
//bool literal2id_RDFintoSignature(const string _rdf_file, int** _p_id_tuples, TYPE_TRIPLE_NUM _id_tuples_max);
@ -216,6 +222,10 @@ private:
//get the final string result_set from SPARQLquery
bool getFinalResult(SPARQLquery& _sparql_q, ResultSet& _result_set);
static int read_update_log(const string _path, multiset<string>& _i, multiset<string>& _r);
bool restore_update(multiset<string>& _i, multiset<string>& _r);
void clear_update_log();
};
#endif //_DATABASE_DATABASE_H

View File

@ -44,6 +44,9 @@ int query_handler(const vector<string>&);
int show_handler(const vector<string>&);
int add_handler(const vector<string>&);
int sub_handler(const vector<string>&);
//backup commands
int backup_handler(const vector<string>&);
int restore_handler(const vector<string>&);
//A structure which contains information on the commands this program can understand.
typedef struct {
@ -71,6 +74,8 @@ COMMAND native_commands[] = {
{ "restart", restart_handler, "\tRestart local server." },
{ "port", port_handler, "\tChange port of local server." },
{ "printport", printport_handler, "Print local server's port configuration."},
{ "backup", backup_handler, "\tBackup current database." },
{ "restore", restore_handler, "\tRestore a database backup." },
{ NULL, NULL, NULL }
};
@ -914,7 +919,7 @@ int drop_handler(const vector<string>& args) {
cerr << "You should use exactly the same db name as building, which should not end with \".db\"" << endl;
return -1;
}
database += ".db";
//database += ".db";
//remote mode
if (gc != NULL) {
@ -1112,7 +1117,7 @@ int add_handler(const vector<string>& args) {
cerr << "You should use exactly the same db name as building, which should not end with \".db\"" << endl;
return -1;
}
database += ".db";
//database += ".db";
Database _db(database);
if (!_db.insert(args[1])) {
cerr << "Failed to insert!" << endl;
@ -1135,8 +1140,9 @@ int sub_handler(const vector<string>& args) {
cerr << "You should use exactly the same db name as building, which should not end with \".db\"" << endl;
return -1;
}
database += ".db";
Database _db(database); if (!_db.remove(args[1])) {
//database += ".db";
Database _db(database);
if (!_db.remove(args[1])) {
cerr << "Failed to remove!" << endl;
return -1;
}
@ -1191,6 +1197,41 @@ int printport_handler(const vector<string>& args) {
return system("bin/gserver -P");
}
int backup_handler(const vector<string>& args) {
if (!args.empty()) {
cerr << "Too many arguments!" << endl;
return -1;
}
if (current_database == NULL) {
cerr << "No database loaded!" << endl;
return -1;
}
current_database->backup();
return 0;
}
int restore_handler(const vector<string>& args) {
if (args.size() != 1) {
cerr << "Exactly 1 argument required!" << endl;
return -1;
}
if (current_database != NULL) {
cerr << "Please unload your database first!" << endl;
return -1;
}
string database = args[0];
if (database.length() > 3 && database.substr(database.length() - 3, 3) == ".db") {
cerr << "You should use exactly the same db name as building, which should not end with \".db\"" << endl;
return -1;
}
//database += ".db";
Database db(database);
if (!db.restore()) {
return -1;
}
return 0;
}

View File

@ -17,7 +17,7 @@ using namespace std;
bool isOnlyProcess(const char* argv0);
void checkSwap();
bool startServer();
bool startServer(bool _debug);
bool stopServer();
int main(int argc, char* argv[])
@ -39,7 +39,7 @@ int main(int argc, char* argv[])
return -1;
}
if (mode == "-h" || mode == "--help") {
else if (mode == "-h" || mode == "--help") {
cout << endl;
cout << "gStore Server (gServer)" << endl;
cout << endl;
@ -52,6 +52,7 @@ int main(int argc, char* argv[])
cout << "\t-r,--restart\t\tRestart gServer." << endl;
cout << "\t-p,--port [PORT=" << Socket::DEFAULT_CONNECT_PORT << "]\tChange connection port configuration, takes effect after restart if gServer running." << endl;
cout << "\t-P,--printport\t\tDisplay current connection port configuration." << endl;
cout << "\t-d,--debug\t\tStart gServer in debug mode (keep gServer in the foreground)." << endl;
cout << "\t-k,--kill\t\tKill existing gServer process(es), ONLY use when out of normal procedures." << endl;
cout << endl;
return 0;
@ -70,6 +71,7 @@ int main(int argc, char* argv[])
}
}
if (!isOnlyProcess(argv[0])) {
//ofstream out(GSERVER_PORT_SWAP, ios::out);
ofstream out(Util::gserver_port_swap.c_str());
if (!out) {
cerr << "Failed to change port!" << endl;
@ -80,6 +82,7 @@ int main(int argc, char* argv[])
cout << "Port will be changed to " << port << " after the current server stops or restarts." << endl;
return 0;
}
//ofstream out(GSERVER_PORT_FILE, ios::out);
ofstream out(Util::gserver_port_file.c_str());
if (!out) {
cerr << "Failed to change port!" << endl;
@ -91,12 +94,12 @@ int main(int argc, char* argv[])
return 0;
}
if (mode == "-s" || mode == "--start") {
else if (mode == "-s" || mode == "--start") {
if (!isOnlyProcess(argv[0])) {
cerr << "gServer already running!" << endl;
return -1;
}
if (startServer()) {
if (startServer(false)) {
sleep(1);
if (isOnlyProcess(argv[0])) {
cerr << "Server stopped unexpectedly. Check for port conflicts!" << endl;
@ -109,7 +112,7 @@ int main(int argc, char* argv[])
}
}
if (mode == "-t" || mode == "--stop") {
else if (mode == "-t" || mode == "--stop") {
if (isOnlyProcess(argv[0])) {
cerr << "gServer not running!" << endl;
return -1;
@ -122,7 +125,7 @@ int main(int argc, char* argv[])
}
}
if (mode == "-r" || mode == "--restart") {
else if (mode == "-r" || mode == "--restart") {
if (isOnlyProcess(argv[0])) {
cerr << "gServer not running!" << endl;
return -1;
@ -130,14 +133,23 @@ int main(int argc, char* argv[])
if (!stopServer()) {
return -1;
}
if (!startServer()) {
if (startServer(false)) {
sleep(1);
if (isOnlyProcess(argv[0])) {
cerr << "Server stopped unexpectedly. Check for port conflicts!" << endl;
return -1;
}
return 0;
}
else {
return -1;
}
return 0;
}
if (mode == "-P" || mode == "--printport") {
else if (mode == "-P" || mode == "--printport") {
unsigned short port = Socket::DEFAULT_CONNECT_PORT;
//ifstream in(GSERVER_PORT_FILE);
ifstream in(Util::gserver_port_file.c_str());
if (in) {
in >> port;
@ -145,6 +157,7 @@ int main(int argc, char* argv[])
}
cout << "Current connection port is " << port << '.' << endl;
unsigned short portSwap = 0;
//ifstream inSwap(GSERVER_PORT_SWAP);
ifstream inSwap(Util::gserver_port_swap.c_str());
if (inSwap) {
inSwap >> portSwap;
@ -156,7 +169,7 @@ int main(int argc, char* argv[])
return 0;
}
if (mode == "-k" || mode == "--kill") {
else if (mode == "-k" || mode == "--kill") {
if (isOnlyProcess(argv[0])) {
cerr << "No process to kill!" << endl;
return -1;
@ -165,8 +178,10 @@ int main(int argc, char* argv[])
return 0;
}
cerr << "Invalid arguments! Type \"bin/gserver -h\" for help." << endl;
return -1;
else {
cerr << "Invalid arguments! Type \"bin/gserver -h\" for help." << endl;
return -1;
}
}
bool isOnlyProcess(const char* argv0) {
@ -174,9 +189,11 @@ bool isOnlyProcess(const char* argv0) {
}
void checkSwap() {
//if (access(GSERVER_PORT_SWAP, 00) != 0) {
if (access(Util::gserver_port_swap.c_str(), 00) != 0) {
return;
}
//ifstream in(GSERVER_PORT_SWAP, ios::in);
ifstream in(Util::gserver_port_swap.c_str());
if (!in) {
cerr << "Failed in checkSwap(), port may not be changed." << endl;
@ -185,6 +202,7 @@ void checkSwap() {
unsigned short port;
in >> port;
in.close();
//ofstream out(GSERVER_PORT_FILE, ios::out);
ofstream out(Util::gserver_port_file.c_str());
if (!out) {
cerr << "Failed in checkSwap(), port may not be changed." << endl;
@ -192,19 +210,24 @@ void checkSwap() {
}
out << port;
out.close();
//chmod(GSERVER_PORT_FILE, 0644);
chmod(Util::gserver_port_file.c_str(), 0644);
//string cmd = string("rm ") + GSERVER_PORT_SWAP;
string cmd = string("rm ") + Util::gserver_port_swap;
system(cmd.c_str());
}
bool startServer() {
bool startServer(bool _debug) {
unsigned short port = Socket::DEFAULT_CONNECT_PORT;
//ifstream in(GSERVER_PORT_FILE, ios::in);
ifstream in(Util::gserver_port_file.c_str());
if (!in) {
//ofstream out(GSERVER_PORT_FILE, ios::out);
ofstream out(Util::gserver_port_file.c_str());
if (out) {
out << port;
out.close();
//chmod(GSERVER_PORT_FILE, 0644);
chmod(Util::gserver_port_file.c_str(), 0644);
}
}
@ -213,6 +236,17 @@ bool startServer() {
in.close();
}
if (_debug) {
Server server(port);
if (!server.createConnection()) {
cerr << Util::getTimeString() << "Failed to create connection at port " << port << '.' << endl;
return false;
}
cout << Util::getTimeString() << "Server started at port " << port << '.' << endl;
server.listen();
return true;
}
pid_t fpid = fork();
// child
@ -274,6 +308,7 @@ bool startServer() {
bool stopServer() {
unsigned short port = Socket::DEFAULT_CONNECT_PORT;
//ifstream in(GSERVER_PORT_FILE, ios::in);
ifstream in(Util::gserver_port_file.c_str());
if (in) {
in >> port;
@ -295,3 +330,4 @@ bool stopServer() {
checkSwap();
return true;
}

View File

@ -0,0 +1,62 @@
#include "../Util/Util.h"
#include "../Server/Server.h"
using namespace std;
bool send_backup_msg(unsigned short _port, long _time);
int main(int argc, char* argv[]) {
#ifdef DEBUG
Util util;
#endif
if (argc != 3) {
return -1;
}
if (!Util::isValidPort(string(argv[1]))) {
return -1;
}
unsigned short port;
{
stringstream ss(argv[1]);
ss >> port;
}
long backup_time;
{
stringstream ss(argv[2]);
ss >> backup_time;
}
while (true) {
time_t cur_time = time(NULL);
while (cur_time >= backup_time) {
backup_time += Util::gserver_backup_interval;
}
sleep(backup_time - cur_time);
if (!send_backup_msg(port, backup_time)) {
return -1;
}
}
return 0;
}
bool send_backup_msg(unsigned short _port, long _time) {
stringstream ss;
ss << "backup " << _time;
Socket socket;
if (!socket.create() || !socket.connect("127.0.0.1", _port) || !socket.send(ss.str())) {
return false;
}
string recv_msg;
socket.recv(recv_msg);
socket.close();
if (recv_msg != "done") {
return false;
}
return true;
}

View File

@ -30,6 +30,12 @@ server 118.89.115.42 gstore-pku.com
---
# 数据库可恢复性、事务与日志
以后可能要支持事务那样就需要日志能够支持UNDO, REDO等操作可以以后根据需要再修改。
数据库多版本,可以指定恢复到一个带标签的版本(比如时间作为标签,用户指定标签)
# 并行策略- 线程控制模块
不宜使用并行框架可使用C的pthreadboost的thread库或者启用C++11gcc编译器需要高于4.8.1才能完整支持C++11
@ -619,6 +625,8 @@ gstore是否能处理各种图算法需求呢比如对两点求最短路
是否可以考虑用vf2算法来作子图同构比较效率相互结合
考虑按谓词频度划分比如建立两棵sp2o树两者的缓存大小应该不同
gStore实现的是子图同态算法要实现子图同构只要保证中间表扩展时每一行中不会出现重复的元素即可
Consider the use of Bloom Filter and FM-sketches
---

626
Server.cpp Normal file
View File

@ -0,0 +1,626 @@
/*=============================================================================
# Filename: Server.cpp
# Author: Bookug Lobert
# Mail: 1181955272@qq.com
# Last Modified: 2015-10-25 13:47
# Description:
=============================================================================*/
#include "Server.h"
using namespace std;
Server::Server(unsigned short _port)
{
this->connectionPort = _port;
this->connectionMaxNum = Socket::MAX_CONNECTIONS;
this->databaseMaxNum = 1; // will be updated when supporting multiple databases.
this->database = NULL;
this->next_backup = 0;
this->scheduler_pid = 0;
}
Server::~Server()
{
delete this->database;
}
bool
Server::createConnection()
{
bool flag;
flag = this->socket.create();
if (!flag)
{
cerr << Util::getTimeString() << "cannot create socket. @Server::createConnection" << endl;
return false;
}
flag = this->socket.bind(this->connectionPort);
if (!flag)
{
cerr << Util::getTimeString() << "cannot bind to port " << this->connectionPort << ". @Server::createConnection" << endl;
return false;
}
flag = this->socket.listen();
if (!flag)
{
cerr << Util::getTimeString() << "cannot listen to port" << this->connectionPort << ". @Server::createConnection" << endl;
return false;
}
return true;
}
bool
Server::deleteConnection()
{
bool flag = this->socket.close();
return flag;
}
bool
Server::response(Socket _socket, std::string& _msg)
{
bool flag = _socket.send(_msg);
return flag;
}
void
Server::listen()
{
while (true)
{
Socket new_server_socket;
cout << Util::getTimeString() << "Wait for input..." << endl;
this->socket.accept(new_server_socket);
cout << Util::getTimeString() << "accept new socket." << endl;
string recv_cmd;
bool recv_return = new_server_socket.recv(recv_cmd);
if (!recv_return)
{
cerr << Util::getTimeString() << "receive command from client error. @Server::listen" << endl;
continue;
}
cout << Util::getTimeString() << "received msg: " << recv_cmd << endl;
Operation operation;
bool parser_return = this->parser(recv_cmd, operation);
cout << Util::getTimeString() << "parser_return=" << parser_return << endl; //debug
if (!parser_return)
{
cout << Util::getTimeString() << "parser command error. @Server::listen" << endl;
string ret_msg = "invalid command.";
this->response(new_server_socket, ret_msg);
new_server_socket.close();
continue;
}
string ret_msg;
bool _stop = false;
CommandType cmd_type = operation.getCommand();
switch (cmd_type)
{
case CMD_TEST:
{
ret_msg = "OK";
break;
}
case CMD_LOAD:
{
string db_name = operation.getParameter(0);
this->loadDatabase(db_name, "", ret_msg);
break;
}
case CMD_UNLOAD:
{
string db_name = operation.getParameter(0);
this->unloadDatabase(db_name, "", ret_msg);
break;
}
case CMD_IMPORT:
{
string db_name = operation.getParameter(0);
string rdf_path = operation.getParameter(1);
this->importRDF(db_name, "", rdf_path, ret_msg);
break;
}
case CMD_DROP:
{
string db_name = operation.getParameter(0);
this->dropDatabase(db_name, "", ret_msg);
break;
}
case CMD_QUERY:
{
string query = operation.getParameter(0);
pthread_t timer = Server::start_timer();
if (timer == 0) {
cerr << Util::getTimeString() << "Failed to start timer." << endl;
}
this->query(query, ret_msg);
if (timer != 0 && !Server::stop_timer(timer)) {
cerr << Util::getTimeString() << "Failed to stop timer." << endl;
}
break;
}
case CMD_SHOW:
{
string para = operation.getParameter(0);
if (para == "databases" || para == "all")
{
this->showDatabases(para, "", ret_msg);
}
else
{
ret_msg = "invalid command.";
}
break;
}
//case CMD_INSERT:
//{
// string db_name = operation.getParameter(0);
// string rdf_path = operation.getParameter(1);
// this->insertTriple(db_name, "", rdf_path, ret_msg);
// break;
//}
case CMD_STOP:
{
this->stopServer(ret_msg);
_stop = true;
break;
}
case CMD_BACKUP: {
string para = operation.getParameter(0);
stringstream ss(para);
long time_backup;
ss >> time_backup;
if (this->next_backup == 0) {
break;
}
while (this->next_backup < time_backup) {
this->next_backup += Util::gserver_backup_interval;
}
if (this->next_backup == time_backup) {
this->backup(ret_msg);
}
else {
ret_msg = "done";
}
break;
}
default:
cerr << Util::getTimeString() << "this command is not supported by now. @Server::listen" << endl;
}
this->response(new_server_socket, ret_msg);
new_server_socket.close();
if (_stop) {
this->deleteConnection();
cout << Util::getTimeString() << "server stopped." << endl;
break;
}
if (this->next_backup > 0) {
time_t cur_time = time(NULL);
if (cur_time >= this->next_backup) {
string str;
this->backup(str);
}
}
}
}
bool
Server::parser(std::string _raw_cmd, Operation& _ret_oprt)
{
int cmd_start_pos = 0;
int raw_len = (int)_raw_cmd.size();
for (int i = 0; i < raw_len; i++) {
if (_raw_cmd[i] == '\n') {
_raw_cmd[i] = ' ';
}
}
while (cmd_start_pos < raw_len && _raw_cmd[cmd_start_pos] == ' ') {
cmd_start_pos++;
}
if (cmd_start_pos == raw_len)
{
return false;
}
int idx1 = raw_len;
for (int i = cmd_start_pos; i < raw_len; i++) {
if (_raw_cmd[i] == ' ')
{
idx1 = i;
break;
}
}
string cmd = _raw_cmd.substr(cmd_start_pos, idx1 - cmd_start_pos);
int para_start_pos = idx1;
while (para_start_pos < raw_len && _raw_cmd[para_start_pos] == ' ')
para_start_pos++;
int para_cnt;
if (cmd == "test") {
_ret_oprt.setCommand(CMD_TEST);
para_cnt = 0;
}
else if (cmd == "load") {
_ret_oprt.setCommand(CMD_LOAD);
para_cnt = 1;
}
else if (cmd == "unload") {
_ret_oprt.setCommand(CMD_UNLOAD);
para_cnt = 1;
}
else if (cmd == "import")
{
_ret_oprt.setCommand(CMD_IMPORT);
para_cnt = 2;
}
else if (cmd == "query")
{
_ret_oprt.setCommand(CMD_QUERY);
para_cnt = 1;
}
else if (cmd == "show")
{
_ret_oprt.setCommand(CMD_SHOW);
para_cnt = 1;
}
else if (cmd == "insert")
{
_ret_oprt.setCommand(CMD_INSERT);
para_cnt = 2;
}
else if (cmd == "drop") {
_ret_oprt.setCommand(CMD_DROP);
para_cnt = 1;
}
else if (cmd == "stop") {
_ret_oprt.setCommand(CMD_STOP);
para_cnt = 0;
}
else if (cmd == "backup") {
_ret_oprt.setCommand(CMD_BACKUP);
para_cnt = 1;
}
else
{
return false;
}
vector<string> paras;
int cur_idx = para_start_pos;
for (int i = 1; i <= para_cnt; i++)
{
if (cur_idx >= raw_len)
{
return false;
}
int next_idx = raw_len;
if (i < para_cnt)
{
for (int j = cur_idx; j<raw_len; j++)
if (_raw_cmd[j] == ' ')
{
next_idx = j;
break;
}
}
else
{
for (int j = raw_len - 1; j>cur_idx; j--)
if (_raw_cmd[j] != ' ')
{
next_idx = j + 1;
break;
}
}
paras.push_back(_raw_cmd.substr(cur_idx, next_idx - cur_idx));
cur_idx = next_idx;
while (cur_idx < raw_len && _raw_cmd[cur_idx] == ' ')
cur_idx++;
}
if (cur_idx != raw_len)
{
return false;
}
_ret_oprt.setParameter(paras);
return true;
}
bool
Server::createDatabase(std::string _db_name, std::string _ac_name, std::string& _ret_msg)
{
// to be implemented...
return false;
}
bool
Server::dropDatabase(std::string _db_name, std::string _ac_name, std::string& _ret_msg)
{
if (this->database != NULL) {
_ret_msg = "please do not use this command when you are using a database.";
return false;
}
size_t length = _db_name.length();
if (length < 3 || _db_name.substr(length - 3, 3) != ".db") {
_ret_msg = "you can only drop databases whose names end with \".db\"";
return false;
}
std::string cmd = std::string("rm -rf ") + _db_name;
int ret = system(cmd.c_str());
if (ret == 0) {
_ret_msg = "drop database done.";
return true;
}
else {
_ret_msg = "drop database failed.";
return false;
}
}
bool
Server::loadDatabase(std::string _db_name, std::string _ac_name, std::string& _ret_msg)
{
if (this->database == NULL)
{
this->database = new Database(_db_name);
}
else
{
_ret_msg = "please unload the current db first: " + this->database->getName();
return false;
}
if (!this->database->load()) {
_ret_msg = "load database failed.";
delete this->database;
this->database = NULL;
return false;
}
pid_t fpid = vfork();
// child, scheduler
if (fpid == 0) {
time_t cur_time = time(NULL);
long time_backup = Util::read_backup_time();
long first_backup = cur_time - (cur_time - time_backup) % Util::gserver_backup_interval
+ Util::gserver_backup_interval;
this->next_backup = first_backup;
string s_port = Util::int2string(this->connectionPort);
string s_next_backup = Util::int2string(first_backup);
execl("bin/gserver_backup_scheduler", "gserver_backup_scheduler", s_port.c_str(), s_next_backup.c_str(), NULL);
exit(0);
return true;
}
// parent
if (fpid > 0) {
this->scheduler_pid = fpid;
}
// fork failure
else if (fpid < 0) {
cerr << Util::getTimeString() << "Database will not be backed-up automatically." << endl;
}
_ret_msg = "load database done.";
return true;
}
bool
Server::unloadDatabase(std::string _db_name, std::string _ac_name, std::string& _ret_msg)
{
if (this->database == NULL || this->database->getName() != _db_name)
{
_ret_msg = "database:" + _db_name + " is not loaded.";
return false;
}
delete this->database;
this->database = NULL;
_ret_msg = "unload database done.";
this->next_backup = 0;
kill(this->scheduler_pid, SIGTERM);
waitpid(this->scheduler_pid, NULL, 0);
this->scheduler_pid = 0;
return true;
}
bool
Server::importRDF(std::string _db_name, std::string _ac_name, std::string _rdf_path, std::string& _ret_msg)
{
//if (this->database != NULL && this->database->getName() != _db_name)
if (this->database != NULL)
{
//delete this->database;
//NOTICE:if there is a db loaded, we should not build directly, tell user to unload it first
_ret_msg = "please unload the current db first: " + this->database->getName();
return false;
}
this->database = new Database(_db_name);
bool flag = this->database->build(_rdf_path);
delete this->database;
this->database = NULL;
if (flag)
{
_ret_msg = "import RDF file to database done.";
}
else
{
_ret_msg = "import RDF file to database failed.";
}
return flag;
}
//bool
//Server::insertTriple(std::string _db_name, std::string _ac_name, std::string _rdf_path, std::string& _ret_msg)
//{
// if (this->database != NULL)
// {
// this->database->unload();
// delete this->database;
// }
// this->database = new Database(_db_name);
// bool flag = this->database->insert(_rdf_path);
// if (flag)
// {
// _ret_msg = "insert triple file to database done.";
// }
// else
// {
// _ret_msg = "import triple file to database failed.";
// }
// return flag;
//}
bool
Server::query(const string _query, string& _ret_msg)
{
cout << Util::getTimeString() << "Server query(): " << _query << endl;
if (this->database == NULL)
{
_ret_msg = "database has not been loaded.";
return false;
}
FILE* output = NULL;
#ifdef OUTPUT_QUERY_RESULT
string path = "logs/gserver_query.log";
output = fopen(path.c_str(), "w");
#endif
ResultSet res_set;
int query_ret = this->database->query(_query, res_set, output);
if (output != NULL)
{
fclose(output);
}
bool flag = true;
//cout<<"Server query ret: "<<query_ret<<endl;
if (query_ret <= -100) //select query
{
//_ret_msg = "results are too large!";
//BETTER: divide and transfer if too large to be placed in memory, using Stream
if (query_ret == -100)
{
_ret_msg = res_set.to_str();
}
else //query error
{
flag = false;
_ret_msg = "query failed.";
//BETTER: {type:select} {success:false}
}
}
else //update query
{
if (query_ret >= 0)
{
_ret_msg = "update num: " + Util::int2string(query_ret);
}
else //update error
{
flag = false;
_ret_msg = "update failed.";
}
}
return flag;
}
bool
Server::showDatabases(string _para, string _ac_name, string& _ret_msg)
{
if (_para == "all")
{
_ret_msg = Util::getItemsFromDir(Util::db_home);
return true;
}
if (this->database != NULL)
{
_ret_msg = "\n" + this->database->getName() + "\n";
}
else
{
_ret_msg = "\n[empty]\n";
}
return true;
}
bool Server::stopServer(string& _ret_msg) {
if (this->database != NULL) {
delete this->database;
this->database = NULL;
}
_ret_msg = "server stopped.";
return true;
}
bool Server::backup(string& _ret_msg) {
this->next_backup += Util::gserver_backup_interval;
if (this->database == NULL) {
_ret_msg = "No database in use.";
return false;
}
if (!this->database->backup()) {
_ret_msg = "Backup failed.";
return false;
}
_ret_msg = "done";
return true;
}
pthread_t Server::start_timer() {
pthread_t timer_thread;
if (pthread_create(&timer_thread, NULL, Server::timer, NULL) == 0) {
return timer_thread;
}
return 0;
}
bool Server::stop_timer(pthread_t _timer) {
return pthread_kill(_timer, SIGTERM) == 0;
}
void* Server::timer(void* _args) {
signal(SIGTERM, Server::timer_sigterm_handler);
sleep(Util::gserver_query_timeout);
cerr << Util::getTimeString() << "Query out of time." << endl;
abort();
}
void Server::timer_sigterm_handler(int _signal_num) {
pthread_exit(0);
}

View File

@ -14,8 +14,10 @@
//NOTICE:CMD_DROP is used to remove the database, and CMD_CREATE is not useful because
//we always need to import a dataset to create a gstore db
enum CommandType {
CMD_CONNECT, CMD_EXIT, CMD_TEST, CMD_LOAD, CMD_UNLOAD, CMD_CREATE, CMD_DROP,
CMD_IMPORT, CMD_QUERY, CMD_SHOW, CMD_INSERT, CMD_STOP, CMD_OTHER
CMD_CONNECT, CMD_TEST, CMD_LOAD, CMD_UNLOAD, CMD_CREATE, CMD_DROP, CMD_IMPORT,
CMD_QUERY, CMD_SHOW, CMD_INSERT, CMD_STOP, CMD_BACKUP, CMD_OTHER
//CMD_CONNECT, CMD_EXIT, CMD_TEST, CMD_LOAD, CMD_UNLOAD, CMD_CREATE, CMD_DROP,
//CMD_IMPORT, CMD_QUERY, CMD_SHOW, CMD_INSERT, CMD_STOP, CMD_OTHER
}; // extend the operation command type here.
class Operation

View File

@ -10,15 +10,15 @@
using namespace std;
Server::Server()
{
this->connectionPort = Socket::DEFAULT_CONNECT_PORT;
this->connectionMaxNum = Socket::MAX_CONNECTIONS;
this->databaseMaxNum = 1; // will be updated when supporting multiple databases.
this->database = NULL;
this->db_home = Util::global_config["db_home"];
this->db_suffix = Util::global_config["db_suffix"];
}
//Server::Server()
//{
//this->connectionPort = Socket::DEFAULT_CONNECT_PORT;
//this->connectionMaxNum = Socket::MAX_CONNECTIONS;
//this->databaseMaxNum = 1; // will be updated when supporting multiple databases.
//this->database = NULL;
//this->db_home = Util::global_config["db_home"];
//this->db_suffix = Util::global_config["db_suffix"];
//}
Server::Server(unsigned short _port)
{
@ -26,6 +26,10 @@ Server::Server(unsigned short _port)
this->connectionMaxNum = Socket::MAX_CONNECTIONS;
this->databaseMaxNum = 1; // will be updated when supporting multiple databases.
this->database = NULL;
this->db_home = Util::global_config["db_home"];
this->db_suffix = Util::global_config["db_suffix"];
this->next_backup = 0;
this->scheduler_pid = 0;
}
Server::~Server()
@ -151,7 +155,14 @@ Server::listen()
case CMD_QUERY:
{
string query = operation.getParameter(0);
pthread_t timer = Server::start_timer();
if (timer == 0) {
cerr << Util::getTimeString() << "Failed to start timer." << endl;
}
this->query(query, ret_msg);
if (timer != 0 && !Server::stop_timer(timer)) {
cerr << Util::getTimeString() << "Failed to stop timer." << endl;
}
break;
}
case CMD_SHOW:
@ -180,6 +191,25 @@ Server::listen()
_stop = true;
break;
}
case CMD_BACKUP: {
string para = operation.getParameter(0);
stringstream ss(para);
long time_backup;
ss >> time_backup;
if (this->next_backup == 0) {
break;
}
while (this->next_backup < time_backup) {
this->next_backup += Util::gserver_backup_interval;
}
if (this->next_backup == time_backup) {
this->backup(ret_msg);
}
else {
ret_msg = "done";
}
break;
}
default:
cerr << Util::getTimeString() << "this command is not supported by now. @Server::listen" << endl;
}
@ -191,6 +221,13 @@ Server::listen()
cout << Util::getTimeString() << "server stopped." << endl;
break;
}
if (this->next_backup > 0) {
time_t cur_time = time(NULL);
if (cur_time >= this->next_backup) {
string str;
this->backup(str);
}
}
}
}
@ -269,6 +306,10 @@ Server::parser(std::string _raw_cmd, Operation& _ret_oprt)
_ret_oprt.setCommand(CMD_STOP);
para_cnt = 0;
}
else if (cmd == "backup") {
_ret_oprt.setCommand(CMD_BACKUP);
para_cnt = 1;
}
else
{
return false;
@ -380,9 +421,36 @@ Server::loadDatabase(std::string _db_name, std::string _ac_name, std::string& _r
_ret_msg = "load database failed.";
delete this->database;
this->database = NULL;
return false;
}
return flag;
pid_t fpid = vfork();
// child, scheduler
if (fpid == 0) {
time_t cur_time = time(NULL);
long time_backup = Util::read_backup_time();
long first_backup = cur_time - (cur_time - time_backup) % Util::gserver_backup_interval
+ Util::gserver_backup_interval;
this->next_backup = first_backup;
string s_port = Util::int2string(this->connectionPort);
string s_next_backup = Util::int2string(first_backup);
execl("bin/gserver_backup_scheduler", "gserver_backup_scheduler", s_port.c_str(), s_next_backup.c_str(), NULL);
exit(0);
return true;
}
// parent
if (fpid > 0) {
this->scheduler_pid = fpid;
}
// fork failure
else if (fpid < 0) {
cerr << Util::getTimeString() << "Database will not be backed-up automatically." << endl;
}
//_ret_msg = "load database done.";
return true;
//return flag;
}
bool
@ -398,6 +466,13 @@ Server::unloadDatabase(std::string _db_name, std::string _ac_name, std::string&
this->database = NULL;
_ret_msg = "unload database done.";
this->next_backup = 0;
//string cmd = "kill " + Util::int2string(this->scheduler_pid);
//system(cmd.c_str());
kill(this->scheduler_pid, SIGTERM);
waitpid(this->scheduler_pid, NULL, 0);
this->scheduler_pid = 0;
return true;
}
@ -458,8 +533,9 @@ Server::importRDF(std::string _db_name, std::string _ac_name, std::string _rdf_p
bool
Server::query(const string _query, string& _ret_msg)
{
cout<<"Server query()"<<endl;
cout<<_query<<endl;
//cout<<"Server query()"<<endl;
//cout<<_query<<endl;
cout << Util::getTimeString() << "Server query(): " << _query << endl;
if (this->database == NULL)
{
@ -468,8 +544,8 @@ Server::query(const string _query, string& _ret_msg)
}
FILE* output = NULL;
string path = "logs/gserver_query.log";
#ifdef OUTPUT_QUERY_RESULT
string path = "logs/gserver_query.log";
output = fopen(path.c_str(), "w");
#endif
@ -537,7 +613,7 @@ Server::showDatabases(string _para, string _ac_name, string& _ret_msg)
return true;
}
bool Server::stopServer(std::string& _ret_msg) {
bool Server::stopServer(string& _ret_msg) {
if (this->database != NULL) {
delete this->database;
this->database = NULL;
@ -546,3 +622,40 @@ bool Server::stopServer(std::string& _ret_msg) {
return true;
}
bool Server::backup(string& _ret_msg) {
this->next_backup += Util::gserver_backup_interval;
if (this->database == NULL) {
_ret_msg = "No database in use.";
return false;
}
if (!this->database->backup()) {
_ret_msg = "Backup failed.";
return false;
}
_ret_msg = "done";
return true;
}
pthread_t Server::start_timer() {
pthread_t timer_thread;
if (pthread_create(&timer_thread, NULL, Server::timer, NULL) == 0) {
return timer_thread;
}
return 0;
}
bool Server::stop_timer(pthread_t _timer) {
return pthread_kill(_timer, SIGTERM) == 0;
}
void* Server::timer(void* _args) {
signal(SIGTERM, Server::timer_sigterm_handler);
sleep(Util::gserver_query_timeout);
cerr << Util::getTimeString() << "Query out of time." << endl;
abort();
}
void Server::timer_sigterm_handler(int _signal_num) {
pthread_exit(0);
}

View File

@ -32,8 +32,8 @@
class Server
{
public:
Server();
Server(unsigned short _port);
//Server();
Server(unsigned short _port = Socket::DEFAULT_CONNECT_PORT);
~Server();
bool createConnection();
@ -51,6 +51,7 @@ public:
//bool insertTriple(std::string _db_name, std::string _ac_name, std::string _rdf_path, std::string& _ret_msg);
bool query(const std::string _query, std::string& _ret_msg);
bool stopServer(std::string& _ret_msg);
bool backup(std::string& _ret_msg);
private:
unsigned short connectionPort;
@ -61,6 +62,13 @@ private:
std::string db_home;
std::string db_suffix;
static pthread_t start_timer();
static bool stop_timer(pthread_t _timer);
static void* timer(void* _args);
static void timer_sigterm_handler(int _signal_num);
long next_backup;
pid_t scheduler_pid;
};
#endif // _SERVER_SERVER_H

View File

@ -48,10 +48,6 @@ map<string, string> Util::global_config;
//==================================================================================================================
string Util::gserver_port_file = "bin/.gserver_port";
string Util::gserver_port_swap = "bin/.gserver_port.swap";
string Util::gserver_log = "logs/gserver.log";
//NOTICE:used in Database, Join and Strategy
//int Util::triple_num = 0;
//int Util::pre_num = 0;
@ -69,6 +65,12 @@ FILE* Util::debug_kvstore = NULL; //used by KVstore
FILE* Util::debug_database = NULL; //used by Database
FILE* Util::debug_vstree = NULL; //used by VSTree
string Util::gserver_port_file = "bin/.gserver_port";
string Util::gserver_port_swap = "bin/.gserver_port.swap";
string Util::gserver_log = "logs/gserver.log";
string Util::backup_path = "backups/";
//set hash table
HashFunction Util::hash[] = { Util::simpleHash, Util::APHash, Util::BKDRHash, Util::DJBHash, Util::ELFHash, \
Util::DEKHash, Util::BPHash, Util::FNVHash, Util::HFLPHash, Util::HFHash, Util::JSHash, \
@ -706,6 +708,14 @@ Util::create_dir(const string _dir)
return false;
}
bool
Util::create_file(const string _file) {
if (creat(_file.c_str(), 0755) > 0) {
return true;
}
return false;
}
long
Util::get_cur_time()
{
@ -1650,3 +1660,40 @@ ceiling(unsigned _val, unsigned _base)
return (_val+_base-1) / _base * _base;
}
long
Util::read_backup_time()
{
ifstream in;
in.open(Util::profile.c_str(), ios::in);
if (!in) {
return Util::gserver_backup_time;
}
int buf_size = 512;
char lbuf[buf_size];
while (!in.eof()) {
in.getline(lbuf, buf_size);
regex_t reg;
char pattern[] = "^\\s*BackupTime\\s*=\\s*((0|1)[0-9]|2[0-3])[0-5][0-9]\\s*(\\s#.*)?$";
regcomp(&reg, pattern, REG_EXTENDED | REG_NOSUB);
regmatch_t pm[1];
int status = regexec(&reg, lbuf, 1, pm, 0);
regfree(&reg);
if (status == REG_NOMATCH) {
continue;
}
else if (status != 0) {
in.close();
return Util::gserver_backup_time;
}
for (int i = 11; i < buf_size && lbuf[i]; i++) {
if (lbuf[i] >= '0' && lbuf[i] <= '9') {
in.close();
return 36000 * (lbuf[i] - '0') + 3600 * (lbuf[i + 1] - '0')
+ 600 * (lbuf[i + 2] - '0') + 60 * (lbuf[i + 3] - '0');
}
}
}
in.close();
return Util::gserver_backup_time;
}

View File

@ -31,6 +31,7 @@ in the sparql query can point to the same node in data graph)
#include <locale.h>
#include <assert.h>
#include <libgen.h>
#include <signal.h>
#include <sys/time.h>
#include <sys/types.h>
@ -230,6 +231,15 @@ public:
static const int II_TREE = 2;
static const int IS_TREE = 3;
static std::string gserver_port_file;
static std::string gserver_port_swap;
static std::string gserver_log;
static const int gserver_query_timeout = 10000; // Timeout of gServer's query (in seconds)
static std::string backup_path;
static const long gserver_backup_interval = 86400;
static const long gserver_backup_time = 72000; // Default backup time (UTC)
static int memUsedPercentage();
static int memoryLeft();
static int compare(const char* _str1, unsigned _len1, const char* _str2, unsigned _len2); //QUERY(how to use default args)
@ -248,12 +258,15 @@ public:
static std::string result_id_str(std::vector<unsigned*>& _v, int _var_num);
static bool dir_exist(const std::string _dir);
static bool create_dir(const std:: string _dir);
static bool create_file(const std::string _file);
static long get_cur_time();
static bool save_to_file(const char*, const std::string _content);
static bool isValidPort(std::string);
static bool isValidIP(std::string);
static std::string getTimeString();
static std::string node2string(const char* _raw_str);
static long read_backup_time();
static bool is_literal_ele(TYPE_ENTITY_LITERAL_ID id);
static bool is_entity_ele(TYPE_ENTITY_LITERAL_ID id);
@ -328,9 +341,6 @@ public:
static FILE* debug_database;
static FILE* debug_vstree;
static std::string gserver_port_file;
static std::string gserver_port_swap;
static std::string gserver_log;
private:

View File

@ -41,3 +41,6 @@ buffer_maxium = 100
# If it is closed(that is, the option is uncommented and set to false), then gStore will run fatser but maybe not safe and recoverable
# operation_logs = true
# Time of scheduled backup of gserver (HHMM, UTC)
BackupTime = 2000 # 4 am (GMT+8)

View File

@ -109,7 +109,7 @@ inc = -I./tools/libantlr3c-3.4/ -I./tools/libantlr3c-3.4/include
#gtest
TARGET = $(exedir)gbuild $(exedir)gserver $(exedir)gclient $(exedir)gquery $(exedir)gconsole $(api_java) $(exedir)gadd $(exedir)gsub $(exedir)HttpConnector
TARGET = $(exedir)gbuild $(exedir)gserver $(exedir)gserver_backup_scheduler $(exedir)gclient $(exedir)gquery $(exedir)gconsole $(api_java) $(exedir)gadd $(exedir)gsub $(exedir)HttpConnector
all: $(TARGET)
@ -131,6 +131,9 @@ $(exedir)gquery: $(lib_antlr) $(objdir)gquery.o $(objfile)
$(exedir)gserver: $(lib_antlr) $(objdir)gserver.o $(objfile)
$(CC) $(EXEFLAG) -o $(exedir)gserver $(objdir)gserver.o $(objfile) $(library)
$(exedir)gserver_backup_scheduler: $(lib_antlr) $(objdir)gserver_backup_scheduler.o $(objfile)
$(CC) $(EXEFLAG) -o $(exedir)gserver_backup_scheduler $(objdir)gserver_backup_scheduler.o $(objfile) $(library)
$(exedir)gclient: $(lib_antlr) $(objdir)gclient.o $(objfile)
$(CC) $(EXEFLAG) -o $(exedir)gclient $(objdir)gclient.o $(objfile) $(library)
@ -156,6 +159,9 @@ $(objdir)gquery.o: Main/gquery.cpp Database/Database.h Util/Util.h $(lib_antlr)
$(objdir)gserver.o: Main/gserver.cpp Server/Server.h Util/Util.h $(lib_antlr)
$(CC) $(CFLAGS) Main/gserver.cpp $(inc) -o $(objdir)gserver.o
$(objdir)gserver_backup_scheduler.o: Main/gserver_backup_scheduler.cpp Server/Server.h Util/Util.h $(lib_antlr)
$(CC) $(CFLAGS) Main/gserver_backup_scheduler.cpp $(inc) -o $(objdir)gserver_backup_scheduler.o
$(objdir)gclient.o: Main/gclient.cpp Server/Client.h Util/Util.h $(lib_antlr)
$(CC) $(CFLAGS) Main/gclient.cpp $(inc) -o $(objdir)gclient.o #-DREADLINE_ON
@ -503,5 +509,6 @@ fulltest:
#test the efficience of kvstore, insert/delete/search, use dbpedia170M by default
test-kvstore:
# test/kvstore_test.cpp
echo "TODO"