From 28710f8ad54e01c74ffba6b5cdd2a52e9ed01e5d Mon Sep 17 00:00:00 2001
From: Daniel Lemire
Date: Wed, 29 Jan 2020 19:00:18 -0500
Subject: [PATCH] fix for Issue 467 (#469)

* Fix for issue467

* Updating single-header

* Let us make it so that JsonStream is constructed from a padded_string
  which will avoid dangerous overruns.

* Fixing parse_stream

* Updating documentation.
---
 amalgamation.sh                    |   2 +-
 benchmark/parse_stream.cpp         |   4 +-
 doc/JsonStream.md                  |   3 +-
 include/simdjson/jsonstream.h      |  47 ++--
 include/simdjson/padded_string.h   |   2 +-
 singleheader/amalgamation_demo.cpp |   4 +-
 singleheader/simdjson.cpp          | 375 ++++++++++++++---------------
 singleheader/simdjson.h            |  51 ++--
 src/jsonstream.cpp                 | 371 ++++++++++++++--------------
 tests/basictests.cpp               |  46 +++-
 tests/jsonstream_test.cpp          |   2 +-
 11 files changed, 447 insertions(+), 460 deletions(-)

diff --git a/amalgamation.sh b/amalgamation.sh
index f97682e5..e7b2e4a1 100755
--- a/amalgamation.sh
+++ b/amalgamation.sh
@@ -163,7 +163,7 @@ int main(int argc, char *argv[]) {
   const char * filename2 = argv[2];
   simdjson::padded_string p2 = simdjson::get_corpus(filename2);
   simdjson::ParsedJson pj2;
-  simdjson::JsonStream js{p2.data(), p2.size()};
+  simdjson::JsonStream js{p2};
   int parse_res = simdjson::SUCCESS_AND_HAS_MORE;
   while (parse_res == simdjson::SUCCESS_AND_HAS_MORE) {
diff --git a/benchmark/parse_stream.cpp b/benchmark/parse_stream.cpp
index d812adcc..2b4fa709 100755
--- a/benchmark/parse_stream.cpp
+++ b/benchmark/parse_stream.cpp
@@ -80,7 +80,7 @@ if(test_per_batch) {
     for (size_t j = 0; j < 5; j++) {
       //Actual test
       simdjson::ParsedJson pj;
-      simdjson::JsonStream js{p.data(), p.size(), i};
+      simdjson::JsonStream js{p, i};
      int parse_res = simdjson::SUCCESS_AND_HAS_MORE;

      auto start = std::chrono::steady_clock::now();
@@ -118,7 +118,7 @@ if(test_best_batch) {

     //Actual test
     simdjson::ParsedJson pj;
-    simdjson::JsonStream js{p.data(), p.size(), 4000000};
+    simdjson::JsonStream js{p, 4000000};
    int parse_res = simdjson::SUCCESS_AND_HAS_MORE;

    auto start = std::chrono::steady_clock::now();
diff --git a/doc/JsonStream.md b/doc/JsonStream.md
index ee81a5df..efb78486 100644
--- a/doc/JsonStream.md
+++ b/doc/JsonStream.md
@@ -112,8 +112,7 @@ Some official formats **(non-exhaustive list)**:
     // Various methods are offered to keep track of the status, like get_current_buffer_loc,
     // get_n_parsed_docs,
     // get_n_bytes_parsed, etc.
-    JsonStream(const char *buf, size_t len, size_t batch_size = 1000000);
-    JsonStream(const std::string &s, size_t batch_size = 1000000);
+    JsonStream(const simdjson::padded_string &s, size_t batch_size = 1000000);

 **Methods**
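[Editor's note: for orientation, a minimal sketch of the call pattern the documentation change above implies. It mirrors the amalgamation demo elsewhere in this patch; the filename is an illustrative placeholder, not something this patch ships.]

    #include <iostream>
    #include "simdjson.h"

    int main() {
      // get_corpus returns a simdjson::padded_string, so JsonStream now
      // always sees a buffer with the padding the SIMD stages may read past.
      simdjson::padded_string p = simdjson::get_corpus("docs.ndjson");
      simdjson::ParsedJson pj;
      simdjson::JsonStream js{p}; // default batch_size = 1000000
      int parse_res = simdjson::SUCCESS_AND_HAS_MORE;
      while (parse_res == simdjson::SUCCESS_AND_HAS_MORE) {
        parse_res = js.json_parse(pj); // pj holds the latest document
      }
      if (parse_res != simdjson::SUCCESS) {
        std::cerr << simdjson::error_message(parse_res) << std::endl;
      }
      return 0;
    }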
diff --git a/include/simdjson/jsonstream.h b/include/simdjson/jsonstream.h
index c25ba3b5..0bc03893 100644
--- a/include/simdjson/jsonstream.h
+++ b/include/simdjson/jsonstream.h
@@ -7,6 +7,7 @@
 #include "simdjson/stage1_find_marks.h"
 #include "simdjson/stage2_build_tape.h"
 #include "simdjson/simdjson.h"
+#include "simdjson/padded_string.h"

 namespace simdjson {
 /*************************************************************************************
     * get_n_parsed_docs, get_n_bytes_parsed, etc.
     *
     * */
-    JsonStream(const char *buf, size_t len, size_t batch_size = 1000000);
-
-    /* Create a JsonStream object that can be used to parse sequentially the valid
-     * JSON documents found in the buffer "buf".
-     *
-     * The batch_size must be at least as large as the biggest document in the file, but
-     * not too large to submerge the cached memory. We found that 1MB is
-     * somewhat a sweet spot for now.
-     *
-     * The user is expected to call the following json_parse method to parse the next
-     * valid JSON document found in the buffer. This method can and is expected to be
-     * called in a loop.
-     *
-     * Various methods are offered to keep track of the status, like get_current_buffer_loc,
-     * get_n_parsed_docs, get_n_bytes_parsed, etc.
-     *
-     * */
-    JsonStream(const std::string &s, size_t batch_size = 1000000) : JsonStream(s.data(), s.size(), batch_size) {};
+    JsonStream(const padded_string &s, size_t batch_size = 1000000);

     ~JsonStream();

@@ -95,18 +79,6 @@
     * and should be reused for the other documents in the buffer. */
    int json_parse(ParsedJson &pj);

-    /* Sets a new buffer for this JsonStream. Will also reinitialize all the variables,
-     * which acts as a reset. A new JsonStream without initializing again.
-     * */
-    // todo: implement and test this function, note that _batch_size is mutable
-    // void set_new_buffer(const char *buf, size_t len);
-
-    /* Sets a new buffer for this JsonStream. Will also reinitialize all the variables,
-     * which is basically a reset. A new JsonStream without initializing again.
-     * */
-    // todo: implement and test this function, note that _batch_size is mutable
-    // void set_new_buffer(const std::string &s) { set_new_buffer(s.data(), s.size()); }
-
    /* Returns the location (index) of where the next document should be in the buffer.
     * Can be used for debugging, it tells the user the position of the end of the last
     * valid JSON document parsed*/
    size_t get_current_buffer_loc() const;

@@ -121,9 +93,20 @@
    size_t get_n_bytes_parsed() const;

 private:
-    const char *_buf;
-    size_t _len;
+
+    inline const char * buf() const {
+      return str.data() + str_start;
+    }
+
+    inline void advance(size_t offset) {
+      str_start += offset;
+    }
+
+    inline size_t remaining() const {
+      return str.size() - str_start;
+    }
+
+    const simdjson::padded_string & str;
     size_t _batch_size;
+    size_t str_start{0};
     size_t next_json{0};
     bool load_next_batch{true};
     size_t current_buffer_loc{0};
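[Editor's note: the private section above is the heart of the fix. The stream no longer carries a mutable `const char *_buf` / `size_t _len` pair that is advanced by pointer arithmetic; it holds an immutable reference to the padded_string plus a `str_start` offset. A standalone sketch of that pattern, with illustrative names (the real class keeps these helpers private):]

    // Every read goes through the accessors, so it cannot leave the padded
    // allocation owned by the padded_string; advancing is a bounded offset
    // update rather than raw pointer arithmetic.
    struct stream_window {
      const simdjson::padded_string &str;
      size_t str_start{0};

      const char *buf() const { return str.data() + str_start; }
      size_t remaining() const { return str.size() - str_start; }
      void advance(size_t offset) { str_start += offset; } // callers pass offset <= remaining()
    };

[Because the window only shrinks toward the end of a buffer that is guaranteed to be padded, the overruns reported in issue 467 have nowhere to come from.]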
diff --git a/include/simdjson/padded_string.h b/include/simdjson/padded_string.h
index 2f7601bc..0fb2dec7 100644
--- a/include/simdjson/padded_string.h
+++ b/include/simdjson/padded_string.h
@@ -41,7 +41,7 @@ struct padded_string final {
     data_ptr[length] = '\0'; // easier when you need a c_str
   }

-  explicit padded_string(char *data, size_t length) noexcept
+  explicit padded_string(const char *data, size_t length) noexcept
       : viable_size(length), data_ptr(allocate_padded_buffer(length)) {
     if ((data != nullptr) and (data_ptr != nullptr)) {
       memcpy(data_ptr, data, length);
diff --git a/singleheader/amalgamation_demo.cpp b/singleheader/amalgamation_demo.cpp
index dc942c83..461e6149 100755
--- a/singleheader/amalgamation_demo.cpp
+++ b/singleheader/amalgamation_demo.cpp
@@ -1,4 +1,4 @@
-/* auto-generated on Mon Jan 27 10:35:34 EST 2020. Do not edit! */
+/* auto-generated on Wed Jan 29 17:53:53 EST 2020. Do not edit! */
 #include <iostream>
 #include "simdjson.h"
@@ -23,7 +23,7 @@ int main(int argc, char *argv[]) {
   const char * filename2 = argv[2];
   simdjson::padded_string p2 = simdjson::get_corpus(filename2);
   simdjson::ParsedJson pj2;
-  simdjson::JsonStream js{p2.data(), p2.size()};
+  simdjson::JsonStream js{p2};
   int parse_res = simdjson::SUCCESS_AND_HAS_MORE;
   while (parse_res == simdjson::SUCCESS_AND_HAS_MORE) {
diff --git a/singleheader/simdjson.cpp b/singleheader/simdjson.cpp
index b3f8bce8..aa567960 100755
--- a/singleheader/simdjson.cpp
+++ b/singleheader/simdjson.cpp
@@ -1,4 +1,4 @@
-/* auto-generated on Mon Jan 27 10:35:34 EST 2020. Do not edit! */
+/* auto-generated on Wed Jan 29 17:53:53 EST 2020. Do not edit! */
 #include "simdjson.h"
 /* used for http://dmalloc.com/ Dmalloc - Debug Malloc Library */
@@ -1155,7 +1155,8 @@ ParsedJson build_parsed_json(const uint8_t *buf, size_t len,

 using namespace simdjson;

-void find_the_best_supported_implementation();
+
+
 typedef int (*stage1_functype)(const char *buf, size_t len, ParsedJson &pj, bool streaming);
@@ -1163,197 +1164,8 @@ typedef int (*stage2_functype)(const char *buf, size_t len, ParsedJson &pj, size
 stage1_functype best_stage1;
 stage2_functype best_stage2;

-JsonStream::JsonStream(const char *buf, size_t len, size_t batchSize)
-    : _buf(buf), _len(len), _batch_size(batchSize) {
-    find_the_best_supported_implementation();
-}
-
-JsonStream::~JsonStream() {
-#ifdef SIMDJSON_THREADS_ENABLED
-    if(stage_1_thread.joinable()) {
-        stage_1_thread.join();
-    }
-#endif
-}
-
-/* // this implementation is untested and unlikely to work
-void JsonStream::set_new_buffer(const char *buf, size_t len) {
-#ifdef SIMDJSON_THREADS_ENABLED
-    if(stage_1_thread.joinable()) {
-        stage_1_thread.join();
-    }
-#endif
-    this->_buf = buf;
-    this->_len = len;
-    _batch_size = 0; // why zero?
-    _batch_size = 0; // waat??
-    next_json = 0;
-    current_buffer_loc = 0;
-    n_parsed_docs = 0;
-    load_next_batch = true;
-}*/
-
-
-#ifdef SIMDJSON_THREADS_ENABLED
-
-// threaded version of json_parse
-// todo: simplify this code further
-int JsonStream::json_parse(ParsedJson &pj) {
-    if (unlikely(pj.byte_capacity == 0)) {
-        const bool allocok = pj.allocate_capacity(_batch_size);
-        if (!allocok) {
-            pj.error_code = simdjson::MEMALLOC;
-            return pj.error_code;
-        }
-    } else if (unlikely(pj.byte_capacity < _batch_size)) {
-        pj.error_code = simdjson::CAPACITY;
-        return pj.error_code;
-    }
-    if(unlikely(pj_thread.byte_capacity < _batch_size)) {
-        const bool allocok_thread = pj_thread.allocate_capacity(_batch_size);
-        if (!allocok_thread) {
-            pj.error_code = simdjson::MEMALLOC;
-            return pj.error_code;
-        }
-    }
-    if (unlikely(load_next_batch)) {
-        //First time loading
-        if(!stage_1_thread.joinable()) {
-            _batch_size = std::min(_batch_size, _len);
-            _batch_size = trimmed_length_safe_utf8((const char*)_buf, _batch_size);
-            if(_batch_size == 0) {
-                pj.error_code = simdjson::UTF8_ERROR;
-                return pj.error_code;
-            }
-            int stage1_is_ok = best_stage1(_buf, _batch_size, pj, true);
-            if (stage1_is_ok != simdjson::SUCCESS) {
-                pj.error_code = stage1_is_ok;
-                return pj.error_code;
-            }
-            size_t last_index = find_last_json_buf_idx(_buf, _batch_size, pj);
-            if(last_index == 0) {
-                pj.error_code = simdjson::EMPTY;
-                return pj.error_code;
-            }
-            pj.n_structural_indexes = last_index + 1;
-        }
-        // the second thread is running or done.
-        else {
-            stage_1_thread.join();
-            if (stage1_is_ok_thread != simdjson::SUCCESS) {
-                pj.error_code = stage1_is_ok_thread;
-                return pj.error_code;
-            }
-            std::swap(pj.structural_indexes, pj_thread.structural_indexes);
-            pj.n_structural_indexes = pj_thread.n_structural_indexes;
-            _buf = _buf + last_json_buffer_loc;
-            _len -= last_json_buffer_loc;
-            n_bytes_parsed += last_json_buffer_loc;
-        }
-        // let us decide whether we will start a new thread
-        if(_len - _batch_size > 0) {
-            last_json_buffer_loc = pj.structural_indexes[find_last_json_buf_idx(_buf,_batch_size,pj)];
-            _batch_size = std::min(_batch_size, _len - last_json_buffer_loc);
-            if(_batch_size > 0) {
-                _batch_size = trimmed_length_safe_utf8((const char*)(_buf + last_json_buffer_loc), _batch_size);
-                if(_batch_size == 0) {
-                    pj.error_code = simdjson::UTF8_ERROR;
-                    return pj.error_code;
-                }
-                // let us capture read-only variables
-                const char * const b = _buf + last_json_buffer_loc;
-                const size_t bs = _batch_size;
-                // we call the thread on a lambda that will update this->stage1_is_ok_thread
-                // there is only one thread that may write to this value
-                stage_1_thread = std::thread(
-                    [this, b, bs] {
-                        this->stage1_is_ok_thread = best_stage1(b, bs, this->pj_thread, true);
-                    });
-            }
-        }
-        next_json = 0;
-        load_next_batch = false;
-    } // load_next_batch
-    int res = best_stage2(_buf, _len, pj, next_json);
-    if (res == simdjson::SUCCESS_AND_HAS_MORE) {
-        n_parsed_docs++;
-        current_buffer_loc = pj.structural_indexes[next_json];
-        load_next_batch = (current_buffer_loc == last_json_buffer_loc);
-    } else if (res == simdjson::SUCCESS) {
-        n_parsed_docs++;
-        if(_len > _batch_size) {
-            current_buffer_loc = pj.structural_indexes[next_json - 1];
-            load_next_batch = true;
-            res = simdjson::SUCCESS_AND_HAS_MORE;
-        }
-    }
-    return res;
-}
-
-#else  // SIMDJSON_THREADS_ENABLED
-
-// single-threaded version of json_parse
-int JsonStream::json_parse(ParsedJson &pj) {
-    if (unlikely(pj.byte_capacity == 0)) {
-        const bool allocok = pj.allocate_capacity(_batch_size);
-        if (!allocok) {
-            pj.error_code = simdjson::MEMALLOC;
-            return pj.error_code;
-        }
-    } else if (unlikely(pj.byte_capacity < _batch_size)) {
-        pj.error_code = simdjson::CAPACITY;
-        return pj.error_code;
-    }
-    if (unlikely(load_next_batch)) {
-        _buf = _buf + current_buffer_loc;
-        _len -= current_buffer_loc;
-        n_bytes_parsed += current_buffer_loc;
-        _batch_size = std::min(_batch_size, _len);
-        _batch_size = trimmed_length_safe_utf8((const char*)_buf, _batch_size);
-        int stage1_is_ok = best_stage1(_buf, _batch_size, pj, true);
-        if (stage1_is_ok != simdjson::SUCCESS) {
-            pj.error_code = stage1_is_ok;
-            return pj.error_code;
-        }
-        size_t last_index = find_last_json_buf_idx(_buf, _batch_size, pj);
-        if(last_index == 0) {
-            pj.error_code = simdjson::EMPTY;
-            return pj.error_code;
-        }
-        pj.n_structural_indexes = last_index + 1;
-        load_next_batch = false;
-    } // load_next_batch
-    int res = best_stage2(_buf, _len, pj, next_json);
-    if (likely(res == simdjson::SUCCESS_AND_HAS_MORE)) {
-        n_parsed_docs++;
-        current_buffer_loc = pj.structural_indexes[next_json];
-    } else if (res == simdjson::SUCCESS) {
-        n_parsed_docs++;
-        if(_len > _batch_size) {
-            current_buffer_loc = pj.structural_indexes[next_json - 1];
-            next_json = 1;
-            load_next_batch = true;
-            res = simdjson::SUCCESS_AND_HAS_MORE;
-        }
-    }
-    return res;
-}
-
-#endif // SIMDJSON_THREADS_ENABLED
-
-
-size_t JsonStream::get_current_buffer_loc() const {
-    return current_buffer_loc;
-}
-
-size_t JsonStream::get_n_parsed_docs() const {
-    return n_parsed_docs;
-}
-
-size_t JsonStream::get_n_bytes_parsed() const {
-    return n_bytes_parsed;
-}
+namespace {
 //// TODO: generalize this set of functions.  We don't want to have a copy in jsonparser.cpp
 void find_the_best_supported_implementation() {
   uint32_t supports = detect_supported_architectures();
@@ -1385,6 +1197,185 @@ void find_the_best_supported_implementation() {
   // we throw an exception since this should not be recoverable
   throw new std::runtime_error("unsupported architecture");
 }
+}
+
+JsonStream::JsonStream(const padded_string& s, size_t batchSize)
+    : str(s), _batch_size(batchSize) {
+    find_the_best_supported_implementation();
+}
+
+JsonStream::~JsonStream() {
+#ifdef SIMDJSON_THREADS_ENABLED
+    if(stage_1_thread.joinable()) {
+        stage_1_thread.join();
+    }
+#endif
+}
+
+
+#ifdef SIMDJSON_THREADS_ENABLED
+
+// threaded version of json_parse
+// todo: simplify this code further
+int JsonStream::json_parse(ParsedJson &pj) {
+    if (unlikely(pj.byte_capacity == 0)) {
+        const bool allocok = pj.allocate_capacity(_batch_size);
+        if (!allocok) {
+            pj.error_code = simdjson::MEMALLOC;
+            return pj.error_code;
+        }
+    } else if (unlikely(pj.byte_capacity < _batch_size)) {
+        pj.error_code = simdjson::CAPACITY;
+        return pj.error_code;
+    }
+    if(unlikely(pj_thread.byte_capacity < _batch_size)) {
+        const bool allocok_thread = pj_thread.allocate_capacity(_batch_size);
+        if (!allocok_thread) {
+            pj.error_code = simdjson::MEMALLOC;
+            return pj.error_code;
+        }
+    }
+    if (unlikely(load_next_batch)) {
+        //First time loading
+        if(!stage_1_thread.joinable()) {
+            _batch_size = std::min(_batch_size, remaining());
+            _batch_size = trimmed_length_safe_utf8((const char*)buf(), _batch_size);
+            if(_batch_size == 0) {
+                pj.error_code = simdjson::UTF8_ERROR;
+                return pj.error_code;
+            }
+            int stage1_is_ok = best_stage1(buf(), _batch_size, pj, true);
+            if (stage1_is_ok != simdjson::SUCCESS) {
+                pj.error_code = stage1_is_ok;
+                return pj.error_code;
+            }
+            size_t last_index = find_last_json_buf_idx(buf(), _batch_size, pj);
+            if(last_index == 0) {
+                if(pj.n_structural_indexes == 0) {
+                    pj.error_code = simdjson::EMPTY;
+                    return pj.error_code;
+                }
+            } else {
+                pj.n_structural_indexes = last_index + 1;
+            }
+        }
+        // the second thread is running or done.
+        else {
+            stage_1_thread.join();
+            if (stage1_is_ok_thread != simdjson::SUCCESS) {
+                pj.error_code = stage1_is_ok_thread;
+                return pj.error_code;
+            }
+            std::swap(pj.structural_indexes, pj_thread.structural_indexes);
+            pj.n_structural_indexes = pj_thread.n_structural_indexes;
+            advance(last_json_buffer_loc);
+            n_bytes_parsed += last_json_buffer_loc;
+        }
+        // let us decide whether we will start a new thread
+        if(remaining() - _batch_size > 0) {
+            last_json_buffer_loc = pj.structural_indexes[find_last_json_buf_idx(buf(),_batch_size,pj)];
+            _batch_size = std::min(_batch_size, remaining() - last_json_buffer_loc);
+            if(_batch_size > 0) {
+                _batch_size = trimmed_length_safe_utf8((const char*)(buf() + last_json_buffer_loc), _batch_size);
+                if(_batch_size == 0) {
+                    pj.error_code = simdjson::UTF8_ERROR;
+                    return pj.error_code;
+                }
+                // let us capture read-only variables
+                const char * const b = buf() + last_json_buffer_loc;
+                const size_t bs = _batch_size;
+                // we call the thread on a lambda that will update this->stage1_is_ok_thread
+                // there is only one thread that may write to this value
+                stage_1_thread = std::thread(
+                    [this, b, bs] {
+                        this->stage1_is_ok_thread = best_stage1(b, bs, this->pj_thread, true);
+                    });
+            }
+        }
+        next_json = 0;
+        load_next_batch = false;
+    } // load_next_batch
+    int res = best_stage2(buf(), remaining(), pj, next_json);
+    if (res == simdjson::SUCCESS_AND_HAS_MORE) {
+        n_parsed_docs++;
+        current_buffer_loc = pj.structural_indexes[next_json];
+        load_next_batch = (current_buffer_loc == last_json_buffer_loc);
+    } else if (res == simdjson::SUCCESS) {
+        n_parsed_docs++;
+        if(remaining() > _batch_size) {
+            current_buffer_loc = pj.structural_indexes[next_json - 1];
+            load_next_batch = true;
+            res = simdjson::SUCCESS_AND_HAS_MORE;
+        }
+    }
+    return res;
+}
+
+#else  // SIMDJSON_THREADS_ENABLED
+
+// single-threaded version of json_parse
+int JsonStream::json_parse(ParsedJson &pj) {
+    if (unlikely(pj.byte_capacity == 0)) {
+        const bool allocok = pj.allocate_capacity(_batch_size);
+        if (!allocok) {
+            pj.error_code = simdjson::MEMALLOC;
+            return pj.error_code;
+        }
+    } else if (unlikely(pj.byte_capacity < _batch_size)) {
+        pj.error_code = simdjson::CAPACITY;
+        return pj.error_code;
+    }
+    if (unlikely(load_next_batch)) {
+        advance(current_buffer_loc);
+        n_bytes_parsed += current_buffer_loc;
+        _batch_size = std::min(_batch_size, remaining());
+        _batch_size = trimmed_length_safe_utf8((const char*)buf(), _batch_size);
+        int stage1_is_ok = best_stage1(buf(), _batch_size, pj, true);
+        if (stage1_is_ok != simdjson::SUCCESS) {
+            pj.error_code = stage1_is_ok;
+            return pj.error_code;
+        }
+        size_t last_index = find_last_json_buf_idx(buf(), _batch_size, pj);
+        if(last_index == 0) {
+            if(pj.n_structural_indexes == 0) {
+                pj.error_code = simdjson::EMPTY;
+                return pj.error_code;
+            }
+        } else {
+            pj.n_structural_indexes = last_index + 1;
+        }
+        load_next_batch = false;
+    } // load_next_batch
+    int res = best_stage2(buf(), remaining(), pj, next_json);
+    if (likely(res == simdjson::SUCCESS_AND_HAS_MORE)) {
+        n_parsed_docs++;
+        current_buffer_loc = pj.structural_indexes[next_json];
+    } else if (res == simdjson::SUCCESS) {
+        n_parsed_docs++;
+        if(remaining() > _batch_size) {
+            current_buffer_loc = pj.structural_indexes[next_json - 1];
+            next_json = 1;
+            load_next_batch = true;
+            res = simdjson::SUCCESS_AND_HAS_MORE;
+        }
+    }
+    return res;
+}
+
+#endif // SIMDJSON_THREADS_ENABLED
+
+
+size_t JsonStream::get_current_buffer_loc() const {
+    return current_buffer_loc;
+}
+
+size_t JsonStream::get_n_parsed_docs() const {
+    return n_parsed_docs;
+}
+
+size_t JsonStream::get_n_bytes_parsed() const {
+    return n_bytes_parsed;
+}
 /* end file src/jsonstream.cpp */
 /* begin file src/arm64/bitmanipulation.h */
 #ifndef SIMDJSON_ARM64_BITMANIPULATION_H
diff --git a/singleheader/simdjson.h b/singleheader/simdjson.h
index f00ac689..f62319d5 100755
--- a/singleheader/simdjson.h
+++ b/singleheader/simdjson.h
@@ -1,4 +1,4 @@
-/* auto-generated on Mon Jan 27 10:35:34 EST 2020. Do not edit! */
+/* auto-generated on Wed Jan 29 17:53:53 EST 2020. Do not edit! */
 /* begin file include/simdjson/simdjson_version.h */
 // /include/simdjson/simdjson_version.h automatically generated by release.py,
 // do not change by hand
@@ -590,7 +590,7 @@ struct padded_string final {
     data_ptr[length] = '\0'; // easier when you need a c_str
   }

-  explicit padded_string(char *data, size_t length) noexcept
+  explicit padded_string(const char *data, size_t length) noexcept
       : viable_size(length), data_ptr(allocate_padded_buffer(length)) {
     if ((data != nullptr) and (data_ptr != nullptr)) {
       memcpy(data_ptr, data, length);
@@ -622,6 +622,7 @@ struct padded_string final {
   }

   padded_string &operator=(padded_string &&o) {
+    aligned_free_char(data_ptr);
     data_ptr = o.data_ptr;
     viable_size = o.viable_size;
     o.data_ptr = nullptr; // we take ownership
@@ -1946,24 +1947,7 @@ namespace simdjson {
     * get_n_parsed_docs, get_n_bytes_parsed, etc.
     *
     * */
-    JsonStream(const char *buf, size_t len, size_t batch_size = 1000000);
-
-    /* Create a JsonStream object that can be used to parse sequentially the valid
-     * JSON documents found in the buffer "buf".
-     *
-     * The batch_size must be at least as large as the biggest document in the file, but
-     * not too large to submerge the cached memory. We found that 1MB is
-     * somewhat a sweet spot for now.
-     *
-     * The user is expected to call the following json_parse method to parse the next
-     * valid JSON document found in the buffer. This method can and is expected to be
-     * called in a loop.
-     *
-     * Various methods are offered to keep track of the status, like get_current_buffer_loc,
-     * get_n_parsed_docs, get_n_bytes_parsed, etc.
-     *
-     * */
-    JsonStream(const std::string &s, size_t batch_size = 1000000) : JsonStream(s.data(), s.size(), batch_size) {};
+    JsonStream(const padded_string &s, size_t batch_size = 1000000);

     ~JsonStream();

@@ -1995,18 +1979,6 @@
     * and should be reused for the other documents in the buffer. */
    int json_parse(ParsedJson &pj);

-    /* Sets a new buffer for this JsonStream. Will also reinitialize all the variables,
-     * which acts as a reset. A new JsonStream without initializing again.
-     * */
-    // todo: implement and test this function, note that _batch_size is mutable
-    // void set_new_buffer(const char *buf, size_t len);
-
-    /* Sets a new buffer for this JsonStream. Will also reinitialize all the variables,
-     * which is basically a reset. A new JsonStream without initializing again.
-     * */
-    // todo: implement and test this function, note that _batch_size is mutable
-    // void set_new_buffer(const std::string &s) { set_new_buffer(s.data(), s.size()); }
-
    /* Returns the location (index) of where the next document should be in the buffer.
     * Can be used for debugging, it tells the user the position of the end of the last
     * valid JSON document parsed*/
    size_t get_current_buffer_loc() const;

@@ -2021,9 +1993,20 @@
    size_t get_n_bytes_parsed() const;

 private:
-    const char *_buf;
-    size_t _len;
+
+    inline const char * buf() const {
+      return str.data() + str_start;
+    }
+
+    inline void advance(size_t offset) {
+      str_start += offset;
+    }
+
+    inline size_t remaining() const {
+      return str.size() - str_start;
+    }
+
+    const simdjson::padded_string & str;
     size_t _batch_size;
+    size_t str_start{0};
     size_t next_json{0};
     bool load_next_batch{true};
     size_t current_buffer_loc{0};
diff --git a/src/jsonstream.cpp b/src/jsonstream.cpp
index b7b0f9ff..2b5759f9 100755
--- a/src/jsonstream.cpp
+++ b/src/jsonstream.cpp
@@ -5,7 +5,6 @@

 using namespace simdjson;

-void find_the_best_supported_implementation();
 typedef int (*stage1_functype)(const char *buf, size_t len, ParsedJson &pj, bool streaming);
 typedef int (*stage2_functype)(const char *buf, size_t len, ParsedJson &pj, size_t &next_json);
@@ -13,197 +12,8 @@ typedef int (*stage2_functype)(const char *buf, size_t len, ParsedJson &pj, size
 stage1_functype best_stage1;
 stage2_functype best_stage2;

-JsonStream::JsonStream(const char *buf, size_t len, size_t batchSize)
-    : _buf(buf), _len(len), _batch_size(batchSize) {
-    find_the_best_supported_implementation();
-}
-
-JsonStream::~JsonStream() {
-#ifdef SIMDJSON_THREADS_ENABLED
-    if(stage_1_thread.joinable()) {
-        stage_1_thread.join();
-    }
-#endif
-}
-
-/* // this implementation is untested and unlikely to work
-void JsonStream::set_new_buffer(const char *buf, size_t len) {
-#ifdef SIMDJSON_THREADS_ENABLED
-    if(stage_1_thread.joinable()) {
-        stage_1_thread.join();
-    }
-#endif
-    this->_buf = buf;
-    this->_len = len;
-    _batch_size = 0; // why zero?
-    _batch_size = 0; // waat??
-    next_json = 0;
-    current_buffer_loc = 0;
-    n_parsed_docs = 0;
-    load_next_batch = true;
-}*/
-
-
-#ifdef SIMDJSON_THREADS_ENABLED
-
-// threaded version of json_parse
-// todo: simplify this code further
-int JsonStream::json_parse(ParsedJson &pj) {
-    if (unlikely(pj.byte_capacity == 0)) {
-        const bool allocok = pj.allocate_capacity(_batch_size);
-        if (!allocok) {
-            pj.error_code = simdjson::MEMALLOC;
-            return pj.error_code;
-        }
-    } else if (unlikely(pj.byte_capacity < _batch_size)) {
-        pj.error_code = simdjson::CAPACITY;
-        return pj.error_code;
-    }
-    if(unlikely(pj_thread.byte_capacity < _batch_size)) {
-        const bool allocok_thread = pj_thread.allocate_capacity(_batch_size);
-        if (!allocok_thread) {
-            pj.error_code = simdjson::MEMALLOC;
-            return pj.error_code;
-        }
-    }
-    if (unlikely(load_next_batch)) {
-        //First time loading
-        if(!stage_1_thread.joinable()) {
-            _batch_size = std::min(_batch_size, _len);
-            _batch_size = trimmed_length_safe_utf8((const char*)_buf, _batch_size);
-            if(_batch_size == 0) {
-                pj.error_code = simdjson::UTF8_ERROR;
-                return pj.error_code;
-            }
-            int stage1_is_ok = best_stage1(_buf, _batch_size, pj, true);
-            if (stage1_is_ok != simdjson::SUCCESS) {
-                pj.error_code = stage1_is_ok;
-                return pj.error_code;
-            }
-            size_t last_index = find_last_json_buf_idx(_buf, _batch_size, pj);
-            if(last_index == 0) {
-                pj.error_code = simdjson::EMPTY;
-                return pj.error_code;
-            }
-            pj.n_structural_indexes = last_index + 1;
-        }
-        // the second thread is running or done.
-        else {
-            stage_1_thread.join();
-            if (stage1_is_ok_thread != simdjson::SUCCESS) {
-                pj.error_code = stage1_is_ok_thread;
-                return pj.error_code;
-            }
-            std::swap(pj.structural_indexes, pj_thread.structural_indexes);
-            pj.n_structural_indexes = pj_thread.n_structural_indexes;
-            _buf = _buf + last_json_buffer_loc;
-            _len -= last_json_buffer_loc;
-            n_bytes_parsed += last_json_buffer_loc;
-        }
-        // let us decide whether we will start a new thread
-        if(_len - _batch_size > 0) {
-            last_json_buffer_loc = pj.structural_indexes[find_last_json_buf_idx(_buf,_batch_size,pj)];
-            _batch_size = std::min(_batch_size, _len - last_json_buffer_loc);
-            if(_batch_size > 0) {
-                _batch_size = trimmed_length_safe_utf8((const char*)(_buf + last_json_buffer_loc), _batch_size);
-                if(_batch_size == 0) {
-                    pj.error_code = simdjson::UTF8_ERROR;
-                    return pj.error_code;
-                }
-                // let us capture read-only variables
-                const char * const b = _buf + last_json_buffer_loc;
-                const size_t bs = _batch_size;
-                // we call the thread on a lambda that will update this->stage1_is_ok_thread
-                // there is only one thread that may write to this value
-                stage_1_thread = std::thread(
-                    [this, b, bs] {
-                        this->stage1_is_ok_thread = best_stage1(b, bs, this->pj_thread, true);
-                    });
-            }
-        }
-        next_json = 0;
-        load_next_batch = false;
-    } // load_next_batch
-    int res = best_stage2(_buf, _len, pj, next_json);
-    if (res == simdjson::SUCCESS_AND_HAS_MORE) {
-        n_parsed_docs++;
-        current_buffer_loc = pj.structural_indexes[next_json];
-        load_next_batch = (current_buffer_loc == last_json_buffer_loc);
-    } else if (res == simdjson::SUCCESS) {
-        n_parsed_docs++;
-        if(_len > _batch_size) {
-            current_buffer_loc = pj.structural_indexes[next_json - 1];
-            load_next_batch = true;
-            res = simdjson::SUCCESS_AND_HAS_MORE;
-        }
-    }
-    return res;
-}
-
-#else  // SIMDJSON_THREADS_ENABLED
-
-// single-threaded version of json_parse
-int JsonStream::json_parse(ParsedJson &pj) {
-    if (unlikely(pj.byte_capacity == 0)) {
-        const bool allocok = pj.allocate_capacity(_batch_size);
-        if (!allocok) {
-            pj.error_code = simdjson::MEMALLOC;
-            return pj.error_code;
-        }
-    } else if (unlikely(pj.byte_capacity < _batch_size)) {
-        pj.error_code = simdjson::CAPACITY;
-        return pj.error_code;
-    }
-    if (unlikely(load_next_batch)) {
-        _buf = _buf + current_buffer_loc;
-        _len -= current_buffer_loc;
-        n_bytes_parsed += current_buffer_loc;
-        _batch_size = std::min(_batch_size, _len);
-        _batch_size = trimmed_length_safe_utf8((const char*)_buf, _batch_size);
-        int stage1_is_ok = best_stage1(_buf, _batch_size, pj, true);
-        if (stage1_is_ok != simdjson::SUCCESS) {
-            pj.error_code = stage1_is_ok;
-            return pj.error_code;
-        }
-        size_t last_index = find_last_json_buf_idx(_buf, _batch_size, pj);
-        if(last_index == 0) {
-            pj.error_code = simdjson::EMPTY;
-            return pj.error_code;
-        }
-        pj.n_structural_indexes = last_index + 1;
-        load_next_batch = false;
-    } // load_next_batch
-    int res = best_stage2(_buf, _len, pj, next_json);
-    if (likely(res == simdjson::SUCCESS_AND_HAS_MORE)) {
-        n_parsed_docs++;
-        current_buffer_loc = pj.structural_indexes[next_json];
-    } else if (res == simdjson::SUCCESS) {
-        n_parsed_docs++;
-        if(_len > _batch_size) {
-            current_buffer_loc = pj.structural_indexes[next_json - 1];
-            next_json = 1;
-            load_next_batch = true;
-            res = simdjson::SUCCESS_AND_HAS_MORE;
-        }
-    }
-    return res;
-}
-
-#endif // SIMDJSON_THREADS_ENABLED
-
-
-size_t JsonStream::get_current_buffer_loc() const {
-    return current_buffer_loc;
-}
-
-size_t JsonStream::get_n_parsed_docs() const {
-    return n_parsed_docs;
-}
-
-size_t JsonStream::get_n_bytes_parsed() const {
-    return n_bytes_parsed;
-}
+namespace {
 //// TODO: generalize this set of functions. We don't want to have a copy in jsonparser.cpp
 void find_the_best_supported_implementation() {
   uint32_t supports = detect_supported_architectures();
@@ -235,3 +45,182 @@ void find_the_best_supported_implementation() {
   // we throw an exception since this should not be recoverable
   throw new std::runtime_error("unsupported architecture");
 }
+}
+
+JsonStream::JsonStream(const padded_string& s, size_t batchSize)
+    : str(s), _batch_size(batchSize) {
+    find_the_best_supported_implementation();
+}
+
+JsonStream::~JsonStream() {
+#ifdef SIMDJSON_THREADS_ENABLED
+    if(stage_1_thread.joinable()) {
+        stage_1_thread.join();
+    }
+#endif
+}
+
+
+#ifdef SIMDJSON_THREADS_ENABLED
+
+// threaded version of json_parse
+// todo: simplify this code further
+int JsonStream::json_parse(ParsedJson &pj) {
+    if (unlikely(pj.byte_capacity == 0)) {
+        const bool allocok = pj.allocate_capacity(_batch_size);
+        if (!allocok) {
+            pj.error_code = simdjson::MEMALLOC;
+            return pj.error_code;
+        }
+    } else if (unlikely(pj.byte_capacity < _batch_size)) {
+        pj.error_code = simdjson::CAPACITY;
+        return pj.error_code;
+    }
+    if(unlikely(pj_thread.byte_capacity < _batch_size)) {
+        const bool allocok_thread = pj_thread.allocate_capacity(_batch_size);
+        if (!allocok_thread) {
+            pj.error_code = simdjson::MEMALLOC;
+            return pj.error_code;
+        }
+    }
+    if (unlikely(load_next_batch)) {
+        //First time loading
+        if(!stage_1_thread.joinable()) {
+            _batch_size = std::min(_batch_size, remaining());
+            _batch_size = trimmed_length_safe_utf8((const char*)buf(), _batch_size);
+            if(_batch_size == 0) {
+                pj.error_code = simdjson::UTF8_ERROR;
+                return pj.error_code;
+            }
+            int stage1_is_ok = best_stage1(buf(), _batch_size, pj, true);
+            if (stage1_is_ok != simdjson::SUCCESS) {
+                pj.error_code = stage1_is_ok;
+                return pj.error_code;
+            }
+            size_t last_index = find_last_json_buf_idx(buf(), _batch_size, pj);
+            if(last_index == 0) {
+                if(pj.n_structural_indexes == 0) {
+                    pj.error_code = simdjson::EMPTY;
+                    return pj.error_code;
+                }
+            } else {
+                pj.n_structural_indexes = last_index + 1;
+            }
+        }
+        // the second thread is running or done.
+        else {
+            stage_1_thread.join();
+            if (stage1_is_ok_thread != simdjson::SUCCESS) {
+                pj.error_code = stage1_is_ok_thread;
+                return pj.error_code;
+            }
+            std::swap(pj.structural_indexes, pj_thread.structural_indexes);
+            pj.n_structural_indexes = pj_thread.n_structural_indexes;
+            advance(last_json_buffer_loc);
+            n_bytes_parsed += last_json_buffer_loc;
+        }
+        // let us decide whether we will start a new thread
+        if(remaining() - _batch_size > 0) {
+            last_json_buffer_loc = pj.structural_indexes[find_last_json_buf_idx(buf(),_batch_size,pj)];
+            _batch_size = std::min(_batch_size, remaining() - last_json_buffer_loc);
+            if(_batch_size > 0) {
+                _batch_size = trimmed_length_safe_utf8((const char*)(buf() + last_json_buffer_loc), _batch_size);
+                if(_batch_size == 0) {
+                    pj.error_code = simdjson::UTF8_ERROR;
+                    return pj.error_code;
+                }
+                // let us capture read-only variables
+                const char * const b = buf() + last_json_buffer_loc;
+                const size_t bs = _batch_size;
+                // we call the thread on a lambda that will update this->stage1_is_ok_thread
+                // there is only one thread that may write to this value
+                stage_1_thread = std::thread(
+                    [this, b, bs] {
+                        this->stage1_is_ok_thread = best_stage1(b, bs, this->pj_thread, true);
+                    });
+            }
+        }
+        next_json = 0;
+        load_next_batch = false;
+    } // load_next_batch
+    int res = best_stage2(buf(), remaining(), pj, next_json);
+    if (res == simdjson::SUCCESS_AND_HAS_MORE) {
+        n_parsed_docs++;
+        current_buffer_loc = pj.structural_indexes[next_json];
+        load_next_batch = (current_buffer_loc == last_json_buffer_loc);
+    } else if (res == simdjson::SUCCESS) {
+        n_parsed_docs++;
+        if(remaining() > _batch_size) {
+            current_buffer_loc = pj.structural_indexes[next_json - 1];
+            load_next_batch = true;
+            res = simdjson::SUCCESS_AND_HAS_MORE;
+        }
+    }
+    return res;
+}
+
+#else  // SIMDJSON_THREADS_ENABLED
+
+// single-threaded version of json_parse
+int JsonStream::json_parse(ParsedJson &pj) {
+    if (unlikely(pj.byte_capacity == 0)) {
+        const bool allocok = pj.allocate_capacity(_batch_size);
+        if (!allocok) {
+            pj.error_code = simdjson::MEMALLOC;
+            return pj.error_code;
+        }
+    } else if (unlikely(pj.byte_capacity < _batch_size)) {
+        pj.error_code = simdjson::CAPACITY;
+        return pj.error_code;
+    }
+    if (unlikely(load_next_batch)) {
+        advance(current_buffer_loc);
+        n_bytes_parsed += current_buffer_loc;
+        _batch_size = std::min(_batch_size, remaining());
+        _batch_size = trimmed_length_safe_utf8((const char*)buf(), _batch_size);
+        int stage1_is_ok = best_stage1(buf(), _batch_size, pj, true);
+        if (stage1_is_ok != simdjson::SUCCESS) {
+            pj.error_code = stage1_is_ok;
+            return pj.error_code;
+        }
+        size_t last_index = find_last_json_buf_idx(buf(), _batch_size, pj);
+        if(last_index == 0) {
+            if(pj.n_structural_indexes == 0) {
+                pj.error_code = simdjson::EMPTY;
+                return pj.error_code;
+            }
+        } else {
+            pj.n_structural_indexes = last_index + 1;
+        }
+        load_next_batch = false;
+    } // load_next_batch
+    int res = best_stage2(buf(), remaining(), pj, next_json);
+    if (likely(res == simdjson::SUCCESS_AND_HAS_MORE)) {
+        n_parsed_docs++;
+        current_buffer_loc = pj.structural_indexes[next_json];
+    } else if (res == simdjson::SUCCESS) {
+        n_parsed_docs++;
+        if(remaining() > _batch_size) {
+            current_buffer_loc = pj.structural_indexes[next_json - 1];
+            next_json = 1;
+            load_next_batch = true;
+            res = simdjson::SUCCESS_AND_HAS_MORE;
+        }
+    }
+    return res;
+}
+
+#endif // SIMDJSON_THREADS_ENABLED
+
+
+size_t JsonStream::get_current_buffer_loc() const {
+    return current_buffer_loc;
+}
+
+size_t JsonStream::get_n_parsed_docs() const {
+    return n_parsed_docs;
+}
+
+size_t JsonStream::get_n_bytes_parsed() const {
+    return n_bytes_parsed;
+}
diff --git a/tests/basictests.cpp b/tests/basictests.cpp
index 23615434..4bb7f53d 100644
--- a/tests/basictests.cpp
+++ b/tests/basictests.cpp
@@ -239,6 +239,44 @@ bool stable_test() {
   return newjson == json;
 }

+static bool parse_json_message_issue467(char const* message, std::size_t len, size_t expectedcount) {
+  simdjson::ParsedJson pj;
+  size_t count = 0;
+  if (!pj.allocate_capacity(len)) {
+    std::cerr << "Failed to allocate memory for simdjson::ParsedJson" << std::endl;
+    return false;
+  }
+  int res;
+  simdjson::padded_string str(message,len);
+  simdjson::JsonStream js(str, pj.byte_capacity);
+  do {
+    res = js.json_parse(pj);
+    count++;
+  } while (res == simdjson::SUCCESS_AND_HAS_MORE);
+  if (res != simdjson::SUCCESS) {
+    std::cerr << "Failed with simdjson error= " << simdjson::error_message(res) << std::endl;
+    return false;
+  }
+  if(count != expectedcount) {
+    std::cerr << "bad count" << std::endl;
+    return false;
+  }
+  return true;
+}
+
+bool json_issue467() {
+  const char * single_message = "{\"error\":[],\"result\":{\"token\":\"xxx\"}}";
+  const char* two_messages = "{\"error\":[],\"result\":{\"token\":\"xxx\"}}{\"error\":[],\"result\":{\"token\":\"xxx\"}}";
+
+  if(!parse_json_message_issue467(single_message, strlen(single_message),1)) {
+    return false;
+  }
+  if(!parse_json_message_issue467(two_messages, strlen(two_messages),2)) {
+    return false;
+  }
+  return true;
+}
+
 // returns true if successful
 bool navigate_test() {
   std::string json = "{"
@@ -366,7 +404,8 @@ bool stream_utf8_test() {
   for(size_t i = 1000; i < 2000; i += (i>1050?10:1)) {
     printf(".");
     fflush(NULL);
-    simdjson::JsonStream js{data.c_str(), data.size(), i};
+    simdjson::padded_string str(data);
+    simdjson::JsonStream js{str, i};
     int parse_res = simdjson::SUCCESS_AND_HAS_MORE;
     size_t count = 0;
     simdjson::ParsedJson pj;
@@ -427,7 +466,8 @@ bool stream_test() {
   for(size_t i = 1000; i < 2000; i += (i>1050?10:1)) {
     printf(".");
     fflush(NULL);
-    simdjson::JsonStream js{data.c_str(), data.size(), i};
+    simdjson::padded_string str(data);
+    simdjson::JsonStream js{str, i};
     int parse_res = simdjson::SUCCESS_AND_HAS_MORE;
     size_t count = 0;
     simdjson::ParsedJson pj;
@@ -530,6 +570,8 @@ bool skyprophet_test() {

 int main() {
   std::cout << "Running basic tests." << std::endl;
+  if(!json_issue467())
+    return EXIT_FAILURE;
   if(!stream_test())
     return EXIT_FAILURE;
   if(!stream_utf8_test())
diff --git a/tests/jsonstream_test.cpp b/tests/jsonstream_test.cpp
index b612f9f9..b3a48911 100644
--- a/tests/jsonstream_test.cpp
+++ b/tests/jsonstream_test.cpp
@@ -86,7 +86,7 @@ bool validate(const char *dirname) {
     }

     simdjson::ParsedJson pj;
-    simdjson::JsonStream js{p.data(), p.size()};
+    simdjson::JsonStream js{p};
     ++how_many;
     int parse_res = simdjson::SUCCESS_AND_HAS_MORE;
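[Editor's note: putting the pieces together, the issue-467 scenario (several JSON documents concatenated in one buffer) composes as below. This is a sketch rather than part of the patch; the document contents and batch size are illustrative.]

    #include <cstring>
    #include <iostream>
    #include "simdjson.h"

    int main() {
      // Two concatenated documents, as in the issue-467 reproducer.
      const char *two = "{\"error\":[],\"result\":1}{\"error\":[],\"result\":2}";
      // padded_string now accepts const char*, so read-only input can be
      // copied into a padded buffer without a cast.
      simdjson::padded_string str(two, std::strlen(two));
      simdjson::ParsedJson pj;
      simdjson::JsonStream js(str, str.size()); // batch must cover the largest document
      int res = simdjson::SUCCESS_AND_HAS_MORE;
      size_t count = 0;
      while (res == simdjson::SUCCESS_AND_HAS_MORE) {
        res = js.json_parse(pj);
        count++;
      }
      std::cout << "parsed " << count << " documents" << std::endl;
      return res == simdjson::SUCCESS ? 0 : 1;
    }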