Simpler jsonstream (#436)

* One simplification.

* Removing untested functions.
This commit is contained in:
Daniel Lemire 2020-01-07 19:10:02 -05:00 committed by GitHub
parent 9842e1f9d0
commit 951c4bedf8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 68 additions and 64 deletions

View File

@ -98,12 +98,14 @@ namespace simdjson {
/* Sets a new buffer for this JsonStream. Will also reinitialize all the variables,
* which acts as a reset. A new JsonStream without initializing again.
* */
void set_new_buffer(const char *buf, size_t len);
// todo: implement and test this function, note that _batch_size is mutable
// void set_new_buffer(const char *buf, size_t len);
/* Sets a new buffer for this JsonStream. Will also reinitialize all the variables,
* which is basically a reset. A new JsonStream without initializing again.
* */
void set_new_buffer(const std::string &s) { set_new_buffer(s.data(), s.size()); }
// todo: implement and test this function, note that _batch_size is mutable
// void set_new_buffer(const std::string &s) { set_new_buffer(s.data(), s.size()); }
/* Returns the location (index) of where the next document should be in the buffer.
* Can be used for debugging, it tells the user the position of the end of the last
@ -123,7 +125,6 @@ namespace simdjson {
size_t _len;
size_t _batch_size;
size_t next_json{0};
bool error_on_last_attempt{false};
bool load_next_batch{true};
size_t current_buffer_loc{0};
size_t last_json_buffer_loc{0};
@ -131,10 +132,10 @@ namespace simdjson {
size_t n_bytes_parsed{0};
#ifdef SIMDJSON_THREADS_ENABLED
int stage1_is_ok_thread{0};
#endif
std::thread stage_1_thread;
simdjson::ParsedJson pj_thread;
#endif
};

View File

@ -26,7 +26,7 @@ JsonStream::~JsonStream() {
#endif
}
/* // this implementation is untested and unlikely to work
void JsonStream::set_new_buffer(const char *buf, size_t len) {
#ifdef SIMDJSON_THREADS_ENABLED
if(stage_1_thread.joinable()) {
@ -35,41 +35,40 @@ void JsonStream::set_new_buffer(const char *buf, size_t len) {
#endif
this->_buf = buf;
this->_len = len;
_batch_size = 0;
_batch_size = 0;
_batch_size = 0; // why zero?
_batch_size = 0; // waat??
next_json = 0;
current_buffer_loc = 0;
n_parsed_docs = 0;
error_on_last_attempt= false;
load_next_batch = true;
}
}*/
// todo: this code is too complicated, it should be greatly simplified
#ifdef SIMDJSON_THREADS_ENABLED
// threaded version of json_parse
// todo: simplify this code further
int JsonStream::json_parse(ParsedJson &pj) {
if (pj.byte_capacity == 0) {
if (unlikely(pj.byte_capacity == 0)) {
const bool allocok = pj.allocate_capacity(_batch_size);
if (!allocok) {
pj.error_code = simdjson::MEMALLOC;
return pj.error_code;
}
} else if (unlikely(pj.byte_capacity < _batch_size)) {
pj.error_code = simdjson::CAPACITY;
return pj.error_code;
}
if(unlikely(pj_thread.byte_capacity < _batch_size)) {
const bool allocok_thread = pj_thread.allocate_capacity(_batch_size);
if (!allocok || !allocok_thread) {
std::cerr << "can't allocate memory" << std::endl;
return false;
if (!allocok_thread) {
pj.error_code = simdjson::MEMALLOC;
return pj.error_code;
}
}
else if (pj.byte_capacity < _batch_size) {
return simdjson::CAPACITY;
}
#ifdef SIMDJSON_THREADS_ENABLED
if(current_buffer_loc == last_json_buffer_loc) {
load_next_batch = true;
}
#endif
if (load_next_batch) {
#ifdef SIMDJSON_THREADS_ENABLED
if (unlikely(load_next_batch)) {
//First time loading
if(!stage_1_thread.joinable()) {
_buf = _buf + current_buffer_loc;
_len -= current_buffer_loc;
n_bytes_parsed += current_buffer_loc;
_batch_size = std::min(_batch_size, _len);
_batch_size = trimmed_length_safe_utf8((const char*)_buf, _batch_size);
if(_batch_size == 0) {
@ -100,8 +99,8 @@ int JsonStream::json_parse(ParsedJson &pj) {
_buf = _buf + last_json_buffer_loc;
_len -= last_json_buffer_loc;
n_bytes_parsed += last_json_buffer_loc;
last_json_buffer_loc = 0; //because we want to use it in the if above.
}
// let us decide whether we will start a new thread
if(_len - _batch_size > 0) {
last_json_buffer_loc = pj.structural_indexes[find_last_json_buf_idx(_buf,_batch_size,pj)];
_batch_size = std::min(_batch_size, _len - last_json_buffer_loc);
@ -122,15 +121,43 @@ int JsonStream::json_parse(ParsedJson &pj) {
});
}
}
next_json = 0;
load_next_batch = false;
} // load_next_batch
int res = best_stage2(_buf, _len, pj, next_json);
if (res == simdjson::SUCCESS_AND_HAS_MORE) {
n_parsed_docs++;
current_buffer_loc = pj.structural_indexes[next_json];
load_next_batch = (current_buffer_loc == last_json_buffer_loc);
} else if (res == simdjson::SUCCESS) {
n_parsed_docs++;
if(_len > _batch_size) {
current_buffer_loc = pj.structural_indexes[next_json - 1];
load_next_batch = true;
res = simdjson::SUCCESS_AND_HAS_MORE;
}
}
return res;
}
//If we loaded a perfect amount of documents last time, we need to skip the first element,
// because it represents the end of the last document
next_json = next_json == 1;
#else
#else // SIMDJSON_THREADS_ENABLED
// single-threaded version of json_parse
int JsonStream::json_parse(ParsedJson &pj) {
if (unlikely(pj.byte_capacity == 0)) {
const bool allocok = pj.allocate_capacity(_batch_size);
if (!allocok) {
pj.error_code = simdjson::MEMALLOC;
return pj.error_code;
}
} else if (unlikely(pj.byte_capacity < _batch_size)) {
pj.error_code = simdjson::CAPACITY;
return pj.error_code;
}
if (unlikely(load_next_batch)) {
_buf = _buf + current_buffer_loc;
_len -= current_buffer_loc;
n_bytes_parsed += current_buffer_loc;
_batch_size = std::min(_batch_size, _len);
_batch_size = trimmed_length_safe_utf8((const char*)_buf, _batch_size);
int stage1_is_ok = best_stage1(_buf, _batch_size, pj, true);
@ -144,51 +171,27 @@ int JsonStream::json_parse(ParsedJson &pj) {
return pj.error_code;
}
pj.n_structural_indexes = last_index + 1;
#endif
load_next_batch = false;
}
//#define SIMDJSON_IREALLYNEEDHELP
#ifdef SIMDJSON_IREALLYNEEDHELP // for debugging
size_t oldnext_json = next_json;
#endif
} // load_next_batch
int res = best_stage2(_buf, _len, pj, next_json);
#ifdef SIMDJSON_IREALLYNEEDHELP // for debugging
int sizeofdoc = pj.structural_indexes[next_json]-pj.structural_indexes[oldnext_json];
printf("size = %d\n", sizeofdoc);
if(sizeofdoc > 0) {
printf("%.*s\n",sizeofdoc, _buf + pj.structural_indexes[oldnext_json]);
} else {
printf("<empty>\n");
}
#endif
if (res == simdjson::SUCCESS_AND_HAS_MORE) {
error_on_last_attempt = false;
if (likely(res == simdjson::SUCCESS_AND_HAS_MORE)) {
n_parsed_docs++;
current_buffer_loc = pj.structural_indexes[next_json];
} else if (res == simdjson::SUCCESS) {
error_on_last_attempt = false;
n_parsed_docs++;
if(_len > _batch_size) {
current_buffer_loc = pj.structural_indexes[next_json - 1];
#ifndef SIMDJSON_THREADS_ENABLED
next_json = 1;
#endif
load_next_batch = true;
res = simdjson::SUCCESS_AND_HAS_MORE;
}
}
// We assume the error is because the json was not loaded completely in this batch.
// Load a new batch and if the error persists, it's a genuine error.
else if(!error_on_last_attempt) {
load_next_batch = true;
error_on_last_attempt = true;
res = json_parse(pj);
}
return res;
}
#endif // SIMDJSON_THREADS_ENABLED
size_t JsonStream::get_current_buffer_loc() const {
return current_buffer_loc;
}