fix for Issue 467 (#469)

* Fix for issue467

* Updating single-header

* Let us make it so that JsonStream is constructed from a padded_string which will avoid dangerous overruns.

* Fixing parse_stream

* Updating documentation.
This commit is contained in:
Daniel Lemire 2020-01-29 19:00:18 -05:00 committed by GitHub
parent e695a19d11
commit 28710f8ad5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 447 additions and 460 deletions

View File

@ -163,7 +163,7 @@ int main(int argc, char *argv[]) {
const char * filename2 = argv[2]; const char * filename2 = argv[2];
simdjson::padded_string p2 = simdjson::get_corpus(filename2); simdjson::padded_string p2 = simdjson::get_corpus(filename2);
simdjson::ParsedJson pj2; simdjson::ParsedJson pj2;
simdjson::JsonStream js{p2.data(), p2.size()}; simdjson::JsonStream js{p2};
int parse_res = simdjson::SUCCESS_AND_HAS_MORE; int parse_res = simdjson::SUCCESS_AND_HAS_MORE;
while (parse_res == simdjson::SUCCESS_AND_HAS_MORE) { while (parse_res == simdjson::SUCCESS_AND_HAS_MORE) {

View File

@ -80,7 +80,7 @@ if(test_per_batch) {
for (size_t j = 0; j < 5; j++) { for (size_t j = 0; j < 5; j++) {
//Actual test //Actual test
simdjson::ParsedJson pj; simdjson::ParsedJson pj;
simdjson::JsonStream js{p.data(), p.size(), i}; simdjson::JsonStream js{p, i};
int parse_res = simdjson::SUCCESS_AND_HAS_MORE; int parse_res = simdjson::SUCCESS_AND_HAS_MORE;
auto start = std::chrono::steady_clock::now(); auto start = std::chrono::steady_clock::now();
@ -118,7 +118,7 @@ if(test_best_batch) {
//Actual test //Actual test
simdjson::ParsedJson pj; simdjson::ParsedJson pj;
simdjson::JsonStream js{p.data(), p.size(), 4000000}; simdjson::JsonStream js{p, 4000000};
int parse_res = simdjson::SUCCESS_AND_HAS_MORE; int parse_res = simdjson::SUCCESS_AND_HAS_MORE;
auto start = std::chrono::steady_clock::now(); auto start = std::chrono::steady_clock::now();

View File

@ -112,8 +112,7 @@ Some official formats **(non-exhaustive list)**:
// Various methods are offered to keep track of the status, like get_current_buffer_loc, // Various methods are offered to keep track of the status, like get_current_buffer_loc,
// get_n_parsed_docs, // get_n_parsed_docs,
// get_n_bytes_parsed, etc. // get_n_bytes_parsed, etc.
JsonStream(const char *buf, size_t len, size_t batch_size = 1000000); JsonStream(const simdjson::padded_string &s, size_t batch_size = 1000000);
JsonStream(const std::string &s, size_t batch_size = 1000000);
``` ```
**Methods** **Methods**

View File

@ -7,6 +7,7 @@
#include "simdjson/stage1_find_marks.h" #include "simdjson/stage1_find_marks.h"
#include "simdjson/stage2_build_tape.h" #include "simdjson/stage2_build_tape.h"
#include "simdjson/simdjson.h" #include "simdjson/simdjson.h"
#include "simdjson/padded_string.h"
namespace simdjson { namespace simdjson {
/************************************************************************************* /*************************************************************************************
@ -46,24 +47,7 @@ namespace simdjson {
* get_n_parsed_docs, get_n_bytes_parsed, etc. * get_n_parsed_docs, get_n_bytes_parsed, etc.
* *
* */ * */
JsonStream(const char *buf, size_t len, size_t batch_size = 1000000); JsonStream(const padded_string &s, size_t batch_size = 1000000);
/* Create a JsonStream object that can be used to parse sequentially the valid
* JSON documents found in the buffer "buf".
*
* The batch_size must be at least as large as the biggest document in the file, but
* not too large to submerge the cached memory. We found that 1MB is
* somewhat a sweet spot for now.
*
* The user is expected to call the following json_parse method to parse the next
* valid JSON document found in the buffer. This method can and is expected to be
* called in a loop.
*
* Various methods are offered to keep track of the status, like get_current_buffer_loc,
* get_n_parsed_docs, get_n_bytes_parsed, etc.
*
* */
JsonStream(const std::string &s, size_t batch_size = 1000000) : JsonStream(s.data(), s.size(), batch_size) {};
~JsonStream(); ~JsonStream();
@ -95,18 +79,6 @@ namespace simdjson {
* and should be reused for the other documents in the buffer. */ * and should be reused for the other documents in the buffer. */
int json_parse(ParsedJson &pj); int json_parse(ParsedJson &pj);
/* Sets a new buffer for this JsonStream. Will also reinitialize all the variables,
* which acts as a reset. A new JsonStream without initializing again.
* */
// todo: implement and test this function, note that _batch_size is mutable
// void set_new_buffer(const char *buf, size_t len);
/* Sets a new buffer for this JsonStream. Will also reinitialize all the variables,
* which is basically a reset. A new JsonStream without initializing again.
* */
// todo: implement and test this function, note that _batch_size is mutable
// void set_new_buffer(const std::string &s) { set_new_buffer(s.data(), s.size()); }
/* Returns the location (index) of where the next document should be in the buffer. /* Returns the location (index) of where the next document should be in the buffer.
* Can be used for debugging, it tells the user the position of the end of the last * Can be used for debugging, it tells the user the position of the end of the last
* valid JSON document parsed*/ * valid JSON document parsed*/
@ -121,9 +93,20 @@ namespace simdjson {
size_t get_n_bytes_parsed() const; size_t get_n_bytes_parsed() const;
private: private:
const char *_buf;
size_t _len; inline const char * buf() const {
return str.data() + str_start;
}
inline void advance(size_t offset) {
str_start += offset;
}
inline size_t remaining() const {
return str.size() - str_start;
}
const simdjson::padded_string & str;
size_t _batch_size; size_t _batch_size;
size_t str_start{0};
size_t next_json{0}; size_t next_json{0};
bool load_next_batch{true}; bool load_next_batch{true};
size_t current_buffer_loc{0}; size_t current_buffer_loc{0};

View File

@ -41,7 +41,7 @@ struct padded_string final {
data_ptr[length] = '\0'; // easier when you need a c_str data_ptr[length] = '\0'; // easier when you need a c_str
} }
explicit padded_string(char *data, size_t length) noexcept explicit padded_string(const char *data, size_t length) noexcept
: viable_size(length), data_ptr(allocate_padded_buffer(length)) { : viable_size(length), data_ptr(allocate_padded_buffer(length)) {
if ((data != nullptr) and (data_ptr != nullptr)) { if ((data != nullptr) and (data_ptr != nullptr)) {
memcpy(data_ptr, data, length); memcpy(data_ptr, data, length);

View File

@ -1,4 +1,4 @@
/* auto-generated on Mon Jan 27 10:35:34 EST 2020. Do not edit! */ /* auto-generated on Wed Jan 29 17:53:53 EST 2020. Do not edit! */
#include <iostream> #include <iostream>
#include "simdjson.h" #include "simdjson.h"
@ -23,7 +23,7 @@ int main(int argc, char *argv[]) {
const char * filename2 = argv[2]; const char * filename2 = argv[2];
simdjson::padded_string p2 = simdjson::get_corpus(filename2); simdjson::padded_string p2 = simdjson::get_corpus(filename2);
simdjson::ParsedJson pj2; simdjson::ParsedJson pj2;
simdjson::JsonStream js{p2.data(), p2.size()}; simdjson::JsonStream js{p2};
int parse_res = simdjson::SUCCESS_AND_HAS_MORE; int parse_res = simdjson::SUCCESS_AND_HAS_MORE;
while (parse_res == simdjson::SUCCESS_AND_HAS_MORE) { while (parse_res == simdjson::SUCCESS_AND_HAS_MORE) {

View File

@ -1,4 +1,4 @@
/* auto-generated on Mon Jan 27 10:35:34 EST 2020. Do not edit! */ /* auto-generated on Wed Jan 29 17:53:53 EST 2020. Do not edit! */
#include "simdjson.h" #include "simdjson.h"
/* used for http://dmalloc.com/ Dmalloc - Debug Malloc Library */ /* used for http://dmalloc.com/ Dmalloc - Debug Malloc Library */
@ -1155,7 +1155,8 @@ ParsedJson build_parsed_json(const uint8_t *buf, size_t len,
using namespace simdjson; using namespace simdjson;
void find_the_best_supported_implementation();
typedef int (*stage1_functype)(const char *buf, size_t len, ParsedJson &pj, bool streaming); typedef int (*stage1_functype)(const char *buf, size_t len, ParsedJson &pj, bool streaming);
typedef int (*stage2_functype)(const char *buf, size_t len, ParsedJson &pj, size_t &next_json); typedef int (*stage2_functype)(const char *buf, size_t len, ParsedJson &pj, size_t &next_json);
@ -1163,197 +1164,8 @@ typedef int (*stage2_functype)(const char *buf, size_t len, ParsedJson &pj, size
stage1_functype best_stage1; stage1_functype best_stage1;
stage2_functype best_stage2; stage2_functype best_stage2;
JsonStream::JsonStream(const char *buf, size_t len, size_t batchSize)
: _buf(buf), _len(len), _batch_size(batchSize) {
find_the_best_supported_implementation();
}
JsonStream::~JsonStream() {
#ifdef SIMDJSON_THREADS_ENABLED
if(stage_1_thread.joinable()) {
stage_1_thread.join();
}
#endif
}
/* // this implementation is untested and unlikely to work
void JsonStream::set_new_buffer(const char *buf, size_t len) {
#ifdef SIMDJSON_THREADS_ENABLED
if(stage_1_thread.joinable()) {
stage_1_thread.join();
}
#endif
this->_buf = buf;
this->_len = len;
_batch_size = 0; // why zero?
_batch_size = 0; // waat??
next_json = 0;
current_buffer_loc = 0;
n_parsed_docs = 0;
load_next_batch = true;
}*/
#ifdef SIMDJSON_THREADS_ENABLED
// threaded version of json_parse
// todo: simplify this code further
int JsonStream::json_parse(ParsedJson &pj) {
if (unlikely(pj.byte_capacity == 0)) {
const bool allocok = pj.allocate_capacity(_batch_size);
if (!allocok) {
pj.error_code = simdjson::MEMALLOC;
return pj.error_code;
}
} else if (unlikely(pj.byte_capacity < _batch_size)) {
pj.error_code = simdjson::CAPACITY;
return pj.error_code;
}
if(unlikely(pj_thread.byte_capacity < _batch_size)) {
const bool allocok_thread = pj_thread.allocate_capacity(_batch_size);
if (!allocok_thread) {
pj.error_code = simdjson::MEMALLOC;
return pj.error_code;
}
}
if (unlikely(load_next_batch)) {
//First time loading
if(!stage_1_thread.joinable()) {
_batch_size = std::min(_batch_size, _len);
_batch_size = trimmed_length_safe_utf8((const char*)_buf, _batch_size);
if(_batch_size == 0) {
pj.error_code = simdjson::UTF8_ERROR;
return pj.error_code;
}
int stage1_is_ok = best_stage1(_buf, _batch_size, pj, true);
if (stage1_is_ok != simdjson::SUCCESS) {
pj.error_code = stage1_is_ok;
return pj.error_code;
}
size_t last_index = find_last_json_buf_idx(_buf, _batch_size, pj);
if(last_index == 0) {
pj.error_code = simdjson::EMPTY;
return pj.error_code;
}
pj.n_structural_indexes = last_index + 1;
}
// the second thread is running or done.
else {
stage_1_thread.join();
if (stage1_is_ok_thread != simdjson::SUCCESS) {
pj.error_code = stage1_is_ok_thread;
return pj.error_code;
}
std::swap(pj.structural_indexes, pj_thread.structural_indexes);
pj.n_structural_indexes = pj_thread.n_structural_indexes;
_buf = _buf + last_json_buffer_loc;
_len -= last_json_buffer_loc;
n_bytes_parsed += last_json_buffer_loc;
}
// let us decide whether we will start a new thread
if(_len - _batch_size > 0) {
last_json_buffer_loc = pj.structural_indexes[find_last_json_buf_idx(_buf,_batch_size,pj)];
_batch_size = std::min(_batch_size, _len - last_json_buffer_loc);
if(_batch_size > 0) {
_batch_size = trimmed_length_safe_utf8((const char*)(_buf + last_json_buffer_loc), _batch_size);
if(_batch_size == 0) {
pj.error_code = simdjson::UTF8_ERROR;
return pj.error_code;
}
// let us capture read-only variables
const char * const b = _buf + last_json_buffer_loc;
const size_t bs = _batch_size;
// we call the thread on a lambda that will update this->stage1_is_ok_thread
// there is only one thread that may write to this value
stage_1_thread = std::thread(
[this, b, bs] {
this->stage1_is_ok_thread = best_stage1(b, bs, this->pj_thread, true);
});
}
}
next_json = 0;
load_next_batch = false;
} // load_next_batch
int res = best_stage2(_buf, _len, pj, next_json);
if (res == simdjson::SUCCESS_AND_HAS_MORE) {
n_parsed_docs++;
current_buffer_loc = pj.structural_indexes[next_json];
load_next_batch = (current_buffer_loc == last_json_buffer_loc);
} else if (res == simdjson::SUCCESS) {
n_parsed_docs++;
if(_len > _batch_size) {
current_buffer_loc = pj.structural_indexes[next_json - 1];
load_next_batch = true;
res = simdjson::SUCCESS_AND_HAS_MORE;
}
}
return res;
}
#else // SIMDJSON_THREADS_ENABLED
// single-threaded version of json_parse
int JsonStream::json_parse(ParsedJson &pj) {
if (unlikely(pj.byte_capacity == 0)) {
const bool allocok = pj.allocate_capacity(_batch_size);
if (!allocok) {
pj.error_code = simdjson::MEMALLOC;
return pj.error_code;
}
} else if (unlikely(pj.byte_capacity < _batch_size)) {
pj.error_code = simdjson::CAPACITY;
return pj.error_code;
}
if (unlikely(load_next_batch)) {
_buf = _buf + current_buffer_loc;
_len -= current_buffer_loc;
n_bytes_parsed += current_buffer_loc;
_batch_size = std::min(_batch_size, _len);
_batch_size = trimmed_length_safe_utf8((const char*)_buf, _batch_size);
int stage1_is_ok = best_stage1(_buf, _batch_size, pj, true);
if (stage1_is_ok != simdjson::SUCCESS) {
pj.error_code = stage1_is_ok;
return pj.error_code;
}
size_t last_index = find_last_json_buf_idx(_buf, _batch_size, pj);
if(last_index == 0) {
pj.error_code = simdjson::EMPTY;
return pj.error_code;
}
pj.n_structural_indexes = last_index + 1;
load_next_batch = false;
} // load_next_batch
int res = best_stage2(_buf, _len, pj, next_json);
if (likely(res == simdjson::SUCCESS_AND_HAS_MORE)) {
n_parsed_docs++;
current_buffer_loc = pj.structural_indexes[next_json];
} else if (res == simdjson::SUCCESS) {
n_parsed_docs++;
if(_len > _batch_size) {
current_buffer_loc = pj.structural_indexes[next_json - 1];
next_json = 1;
load_next_batch = true;
res = simdjson::SUCCESS_AND_HAS_MORE;
}
}
return res;
}
#endif // SIMDJSON_THREADS_ENABLED
size_t JsonStream::get_current_buffer_loc() const {
return current_buffer_loc;
}
size_t JsonStream::get_n_parsed_docs() const {
return n_parsed_docs;
}
size_t JsonStream::get_n_bytes_parsed() const {
return n_bytes_parsed;
}
namespace {
//// TODO: generalize this set of functions. We don't want to have a copy in jsonparser.cpp //// TODO: generalize this set of functions. We don't want to have a copy in jsonparser.cpp
void find_the_best_supported_implementation() { void find_the_best_supported_implementation() {
uint32_t supports = detect_supported_architectures(); uint32_t supports = detect_supported_architectures();
@ -1385,6 +1197,185 @@ void find_the_best_supported_implementation() {
// we throw an exception since this should not be recoverable // we throw an exception since this should not be recoverable
throw new std::runtime_error("unsupported architecture"); throw new std::runtime_error("unsupported architecture");
} }
}
JsonStream::JsonStream(const padded_string& s, size_t batchSize)
: str(s), _batch_size(batchSize) {
find_the_best_supported_implementation();
}
JsonStream::~JsonStream() {
#ifdef SIMDJSON_THREADS_ENABLED
if(stage_1_thread.joinable()) {
stage_1_thread.join();
}
#endif
}
#ifdef SIMDJSON_THREADS_ENABLED
// threaded version of json_parse
// todo: simplify this code further
int JsonStream::json_parse(ParsedJson &pj) {
if (unlikely(pj.byte_capacity == 0)) {
const bool allocok = pj.allocate_capacity(_batch_size);
if (!allocok) {
pj.error_code = simdjson::MEMALLOC;
return pj.error_code;
}
} else if (unlikely(pj.byte_capacity < _batch_size)) {
pj.error_code = simdjson::CAPACITY;
return pj.error_code;
}
if(unlikely(pj_thread.byte_capacity < _batch_size)) {
const bool allocok_thread = pj_thread.allocate_capacity(_batch_size);
if (!allocok_thread) {
pj.error_code = simdjson::MEMALLOC;
return pj.error_code;
}
}
if (unlikely(load_next_batch)) {
//First time loading
if(!stage_1_thread.joinable()) {
_batch_size = std::min(_batch_size, remaining());
_batch_size = trimmed_length_safe_utf8((const char*)buf(), _batch_size);
if(_batch_size == 0) {
pj.error_code = simdjson::UTF8_ERROR;
return pj.error_code;
}
int stage1_is_ok = best_stage1(buf(), _batch_size, pj, true);
if (stage1_is_ok != simdjson::SUCCESS) {
pj.error_code = stage1_is_ok;
return pj.error_code;
}
size_t last_index = find_last_json_buf_idx(buf(), _batch_size, pj);
if(last_index == 0) {
if(pj.n_structural_indexes == 0) {
pj.error_code = simdjson::EMPTY;
return pj.error_code;
}
} else {
pj.n_structural_indexes = last_index + 1;
}
}
// the second thread is running or done.
else {
stage_1_thread.join();
if (stage1_is_ok_thread != simdjson::SUCCESS) {
pj.error_code = stage1_is_ok_thread;
return pj.error_code;
}
std::swap(pj.structural_indexes, pj_thread.structural_indexes);
pj.n_structural_indexes = pj_thread.n_structural_indexes;
advance(last_json_buffer_loc);
n_bytes_parsed += last_json_buffer_loc;
}
// let us decide whether we will start a new thread
if(remaining() - _batch_size > 0) {
last_json_buffer_loc = pj.structural_indexes[find_last_json_buf_idx(buf(),_batch_size,pj)];
_batch_size = std::min(_batch_size, remaining() - last_json_buffer_loc);
if(_batch_size > 0) {
_batch_size = trimmed_length_safe_utf8((const char*)(buf() + last_json_buffer_loc), _batch_size);
if(_batch_size == 0) {
pj.error_code = simdjson::UTF8_ERROR;
return pj.error_code;
}
// let us capture read-only variables
const char * const b = buf() + last_json_buffer_loc;
const size_t bs = _batch_size;
// we call the thread on a lambda that will update this->stage1_is_ok_thread
// there is only one thread that may write to this value
stage_1_thread = std::thread(
[this, b, bs] {
this->stage1_is_ok_thread = best_stage1(b, bs, this->pj_thread, true);
});
}
}
next_json = 0;
load_next_batch = false;
} // load_next_batch
int res = best_stage2(buf(), remaining(), pj, next_json);
if (res == simdjson::SUCCESS_AND_HAS_MORE) {
n_parsed_docs++;
current_buffer_loc = pj.structural_indexes[next_json];
load_next_batch = (current_buffer_loc == last_json_buffer_loc);
} else if (res == simdjson::SUCCESS) {
n_parsed_docs++;
if(remaining() > _batch_size) {
current_buffer_loc = pj.structural_indexes[next_json - 1];
load_next_batch = true;
res = simdjson::SUCCESS_AND_HAS_MORE;
}
}
return res;
}
#else // SIMDJSON_THREADS_ENABLED
// single-threaded version of json_parse
int JsonStream::json_parse(ParsedJson &pj) {
if (unlikely(pj.byte_capacity == 0)) {
const bool allocok = pj.allocate_capacity(_batch_size);
if (!allocok) {
pj.error_code = simdjson::MEMALLOC;
return pj.error_code;
}
} else if (unlikely(pj.byte_capacity < _batch_size)) {
pj.error_code = simdjson::CAPACITY;
return pj.error_code;
}
if (unlikely(load_next_batch)) {
advance(current_buffer_loc);
n_bytes_parsed += current_buffer_loc;
_batch_size = std::min(_batch_size, remaining());
_batch_size = trimmed_length_safe_utf8((const char*)buf(), _batch_size);
int stage1_is_ok = best_stage1(buf(), _batch_size, pj, true);
if (stage1_is_ok != simdjson::SUCCESS) {
pj.error_code = stage1_is_ok;
return pj.error_code;
}
size_t last_index = find_last_json_buf_idx(buf(), _batch_size, pj);
if(last_index == 0) {
if(pj.n_structural_indexes == 0) {
pj.error_code = simdjson::EMPTY;
return pj.error_code;
}
} else {
pj.n_structural_indexes = last_index + 1;
}
load_next_batch = false;
} // load_next_batch
int res = best_stage2(buf(), remaining(), pj, next_json);
if (likely(res == simdjson::SUCCESS_AND_HAS_MORE)) {
n_parsed_docs++;
current_buffer_loc = pj.structural_indexes[next_json];
} else if (res == simdjson::SUCCESS) {
n_parsed_docs++;
if(remaining() > _batch_size) {
current_buffer_loc = pj.structural_indexes[next_json - 1];
next_json = 1;
load_next_batch = true;
res = simdjson::SUCCESS_AND_HAS_MORE;
}
}
return res;
}
#endif // SIMDJSON_THREADS_ENABLED
size_t JsonStream::get_current_buffer_loc() const {
return current_buffer_loc;
}
size_t JsonStream::get_n_parsed_docs() const {
return n_parsed_docs;
}
size_t JsonStream::get_n_bytes_parsed() const {
return n_bytes_parsed;
}
/* end file src/jsonstream.cpp */ /* end file src/jsonstream.cpp */
/* begin file src/arm64/bitmanipulation.h */ /* begin file src/arm64/bitmanipulation.h */
#ifndef SIMDJSON_ARM64_BITMANIPULATION_H #ifndef SIMDJSON_ARM64_BITMANIPULATION_H

View File

@ -1,4 +1,4 @@
/* auto-generated on Mon Jan 27 10:35:34 EST 2020. Do not edit! */ /* auto-generated on Wed Jan 29 17:53:53 EST 2020. Do not edit! */
/* begin file include/simdjson/simdjson_version.h */ /* begin file include/simdjson/simdjson_version.h */
// /include/simdjson/simdjson_version.h automatically generated by release.py, // /include/simdjson/simdjson_version.h automatically generated by release.py,
// do not change by hand // do not change by hand
@ -590,7 +590,7 @@ struct padded_string final {
data_ptr[length] = '\0'; // easier when you need a c_str data_ptr[length] = '\0'; // easier when you need a c_str
} }
explicit padded_string(char *data, size_t length) noexcept explicit padded_string(const char *data, size_t length) noexcept
: viable_size(length), data_ptr(allocate_padded_buffer(length)) { : viable_size(length), data_ptr(allocate_padded_buffer(length)) {
if ((data != nullptr) and (data_ptr != nullptr)) { if ((data != nullptr) and (data_ptr != nullptr)) {
memcpy(data_ptr, data, length); memcpy(data_ptr, data, length);
@ -622,6 +622,7 @@ struct padded_string final {
} }
padded_string &operator=(padded_string &&o) { padded_string &operator=(padded_string &&o) {
aligned_free_char(data_ptr);
data_ptr = o.data_ptr; data_ptr = o.data_ptr;
viable_size = o.viable_size; viable_size = o.viable_size;
o.data_ptr = nullptr; // we take ownership o.data_ptr = nullptr; // we take ownership
@ -1946,24 +1947,7 @@ namespace simdjson {
* get_n_parsed_docs, get_n_bytes_parsed, etc. * get_n_parsed_docs, get_n_bytes_parsed, etc.
* *
* */ * */
JsonStream(const char *buf, size_t len, size_t batch_size = 1000000); JsonStream(const padded_string &s, size_t batch_size = 1000000);
/* Create a JsonStream object that can be used to parse sequentially the valid
* JSON documents found in the buffer "buf".
*
* The batch_size must be at least as large as the biggest document in the file, but
* not too large to submerge the cached memory. We found that 1MB is
* somewhat a sweet spot for now.
*
* The user is expected to call the following json_parse method to parse the next
* valid JSON document found in the buffer. This method can and is expected to be
* called in a loop.
*
* Various methods are offered to keep track of the status, like get_current_buffer_loc,
* get_n_parsed_docs, get_n_bytes_parsed, etc.
*
* */
JsonStream(const std::string &s, size_t batch_size = 1000000) : JsonStream(s.data(), s.size(), batch_size) {};
~JsonStream(); ~JsonStream();
@ -1995,18 +1979,6 @@ namespace simdjson {
* and should be reused for the other documents in the buffer. */ * and should be reused for the other documents in the buffer. */
int json_parse(ParsedJson &pj); int json_parse(ParsedJson &pj);
/* Sets a new buffer for this JsonStream. Will also reinitialize all the variables,
* which acts as a reset. A new JsonStream without initializing again.
* */
// todo: implement and test this function, note that _batch_size is mutable
// void set_new_buffer(const char *buf, size_t len);
/* Sets a new buffer for this JsonStream. Will also reinitialize all the variables,
* which is basically a reset. A new JsonStream without initializing again.
* */
// todo: implement and test this function, note that _batch_size is mutable
// void set_new_buffer(const std::string &s) { set_new_buffer(s.data(), s.size()); }
/* Returns the location (index) of where the next document should be in the buffer. /* Returns the location (index) of where the next document should be in the buffer.
* Can be used for debugging, it tells the user the position of the end of the last * Can be used for debugging, it tells the user the position of the end of the last
* valid JSON document parsed*/ * valid JSON document parsed*/
@ -2021,9 +1993,20 @@ namespace simdjson {
size_t get_n_bytes_parsed() const; size_t get_n_bytes_parsed() const;
private: private:
const char *_buf;
size_t _len; inline const char * buf() const {
return str.data() + str_start;
}
inline void advance(size_t offset) {
str_start += offset;
}
inline size_t remaining() const {
return str.size() - str_start;
}
const simdjson::padded_string & str;
size_t _batch_size; size_t _batch_size;
size_t str_start{0};
size_t next_json{0}; size_t next_json{0};
bool load_next_batch{true}; bool load_next_batch{true};
size_t current_buffer_loc{0}; size_t current_buffer_loc{0};

View File

@ -5,7 +5,6 @@
using namespace simdjson; using namespace simdjson;
void find_the_best_supported_implementation();
typedef int (*stage1_functype)(const char *buf, size_t len, ParsedJson &pj, bool streaming); typedef int (*stage1_functype)(const char *buf, size_t len, ParsedJson &pj, bool streaming);
typedef int (*stage2_functype)(const char *buf, size_t len, ParsedJson &pj, size_t &next_json); typedef int (*stage2_functype)(const char *buf, size_t len, ParsedJson &pj, size_t &next_json);
@ -13,197 +12,8 @@ typedef int (*stage2_functype)(const char *buf, size_t len, ParsedJson &pj, size
stage1_functype best_stage1; stage1_functype best_stage1;
stage2_functype best_stage2; stage2_functype best_stage2;
JsonStream::JsonStream(const char *buf, size_t len, size_t batchSize)
: _buf(buf), _len(len), _batch_size(batchSize) {
find_the_best_supported_implementation();
}
JsonStream::~JsonStream() {
#ifdef SIMDJSON_THREADS_ENABLED
if(stage_1_thread.joinable()) {
stage_1_thread.join();
}
#endif
}
/* // this implementation is untested and unlikely to work
void JsonStream::set_new_buffer(const char *buf, size_t len) {
#ifdef SIMDJSON_THREADS_ENABLED
if(stage_1_thread.joinable()) {
stage_1_thread.join();
}
#endif
this->_buf = buf;
this->_len = len;
_batch_size = 0; // why zero?
_batch_size = 0; // waat??
next_json = 0;
current_buffer_loc = 0;
n_parsed_docs = 0;
load_next_batch = true;
}*/
#ifdef SIMDJSON_THREADS_ENABLED
// threaded version of json_parse
// todo: simplify this code further
int JsonStream::json_parse(ParsedJson &pj) {
if (unlikely(pj.byte_capacity == 0)) {
const bool allocok = pj.allocate_capacity(_batch_size);
if (!allocok) {
pj.error_code = simdjson::MEMALLOC;
return pj.error_code;
}
} else if (unlikely(pj.byte_capacity < _batch_size)) {
pj.error_code = simdjson::CAPACITY;
return pj.error_code;
}
if(unlikely(pj_thread.byte_capacity < _batch_size)) {
const bool allocok_thread = pj_thread.allocate_capacity(_batch_size);
if (!allocok_thread) {
pj.error_code = simdjson::MEMALLOC;
return pj.error_code;
}
}
if (unlikely(load_next_batch)) {
//First time loading
if(!stage_1_thread.joinable()) {
_batch_size = std::min(_batch_size, _len);
_batch_size = trimmed_length_safe_utf8((const char*)_buf, _batch_size);
if(_batch_size == 0) {
pj.error_code = simdjson::UTF8_ERROR;
return pj.error_code;
}
int stage1_is_ok = best_stage1(_buf, _batch_size, pj, true);
if (stage1_is_ok != simdjson::SUCCESS) {
pj.error_code = stage1_is_ok;
return pj.error_code;
}
size_t last_index = find_last_json_buf_idx(_buf, _batch_size, pj);
if(last_index == 0) {
pj.error_code = simdjson::EMPTY;
return pj.error_code;
}
pj.n_structural_indexes = last_index + 1;
}
// the second thread is running or done.
else {
stage_1_thread.join();
if (stage1_is_ok_thread != simdjson::SUCCESS) {
pj.error_code = stage1_is_ok_thread;
return pj.error_code;
}
std::swap(pj.structural_indexes, pj_thread.structural_indexes);
pj.n_structural_indexes = pj_thread.n_structural_indexes;
_buf = _buf + last_json_buffer_loc;
_len -= last_json_buffer_loc;
n_bytes_parsed += last_json_buffer_loc;
}
// let us decide whether we will start a new thread
if(_len - _batch_size > 0) {
last_json_buffer_loc = pj.structural_indexes[find_last_json_buf_idx(_buf,_batch_size,pj)];
_batch_size = std::min(_batch_size, _len - last_json_buffer_loc);
if(_batch_size > 0) {
_batch_size = trimmed_length_safe_utf8((const char*)(_buf + last_json_buffer_loc), _batch_size);
if(_batch_size == 0) {
pj.error_code = simdjson::UTF8_ERROR;
return pj.error_code;
}
// let us capture read-only variables
const char * const b = _buf + last_json_buffer_loc;
const size_t bs = _batch_size;
// we call the thread on a lambda that will update this->stage1_is_ok_thread
// there is only one thread that may write to this value
stage_1_thread = std::thread(
[this, b, bs] {
this->stage1_is_ok_thread = best_stage1(b, bs, this->pj_thread, true);
});
}
}
next_json = 0;
load_next_batch = false;
} // load_next_batch
int res = best_stage2(_buf, _len, pj, next_json);
if (res == simdjson::SUCCESS_AND_HAS_MORE) {
n_parsed_docs++;
current_buffer_loc = pj.structural_indexes[next_json];
load_next_batch = (current_buffer_loc == last_json_buffer_loc);
} else if (res == simdjson::SUCCESS) {
n_parsed_docs++;
if(_len > _batch_size) {
current_buffer_loc = pj.structural_indexes[next_json - 1];
load_next_batch = true;
res = simdjson::SUCCESS_AND_HAS_MORE;
}
}
return res;
}
#else // SIMDJSON_THREADS_ENABLED
// single-threaded version of json_parse
int JsonStream::json_parse(ParsedJson &pj) {
if (unlikely(pj.byte_capacity == 0)) {
const bool allocok = pj.allocate_capacity(_batch_size);
if (!allocok) {
pj.error_code = simdjson::MEMALLOC;
return pj.error_code;
}
} else if (unlikely(pj.byte_capacity < _batch_size)) {
pj.error_code = simdjson::CAPACITY;
return pj.error_code;
}
if (unlikely(load_next_batch)) {
_buf = _buf + current_buffer_loc;
_len -= current_buffer_loc;
n_bytes_parsed += current_buffer_loc;
_batch_size = std::min(_batch_size, _len);
_batch_size = trimmed_length_safe_utf8((const char*)_buf, _batch_size);
int stage1_is_ok = best_stage1(_buf, _batch_size, pj, true);
if (stage1_is_ok != simdjson::SUCCESS) {
pj.error_code = stage1_is_ok;
return pj.error_code;
}
size_t last_index = find_last_json_buf_idx(_buf, _batch_size, pj);
if(last_index == 0) {
pj.error_code = simdjson::EMPTY;
return pj.error_code;
}
pj.n_structural_indexes = last_index + 1;
load_next_batch = false;
} // load_next_batch
int res = best_stage2(_buf, _len, pj, next_json);
if (likely(res == simdjson::SUCCESS_AND_HAS_MORE)) {
n_parsed_docs++;
current_buffer_loc = pj.structural_indexes[next_json];
} else if (res == simdjson::SUCCESS) {
n_parsed_docs++;
if(_len > _batch_size) {
current_buffer_loc = pj.structural_indexes[next_json - 1];
next_json = 1;
load_next_batch = true;
res = simdjson::SUCCESS_AND_HAS_MORE;
}
}
return res;
}
#endif // SIMDJSON_THREADS_ENABLED
size_t JsonStream::get_current_buffer_loc() const {
return current_buffer_loc;
}
size_t JsonStream::get_n_parsed_docs() const {
return n_parsed_docs;
}
size_t JsonStream::get_n_bytes_parsed() const {
return n_bytes_parsed;
}
namespace {
//// TODO: generalize this set of functions. We don't want to have a copy in jsonparser.cpp //// TODO: generalize this set of functions. We don't want to have a copy in jsonparser.cpp
void find_the_best_supported_implementation() { void find_the_best_supported_implementation() {
uint32_t supports = detect_supported_architectures(); uint32_t supports = detect_supported_architectures();
@ -235,3 +45,182 @@ void find_the_best_supported_implementation() {
// we throw an exception since this should not be recoverable // we throw an exception since this should not be recoverable
throw new std::runtime_error("unsupported architecture"); throw new std::runtime_error("unsupported architecture");
} }
}
JsonStream::JsonStream(const padded_string& s, size_t batchSize)
: str(s), _batch_size(batchSize) {
find_the_best_supported_implementation();
}
JsonStream::~JsonStream() {
#ifdef SIMDJSON_THREADS_ENABLED
if(stage_1_thread.joinable()) {
stage_1_thread.join();
}
#endif
}
#ifdef SIMDJSON_THREADS_ENABLED
// threaded version of json_parse
// todo: simplify this code further
int JsonStream::json_parse(ParsedJson &pj) {
if (unlikely(pj.byte_capacity == 0)) {
const bool allocok = pj.allocate_capacity(_batch_size);
if (!allocok) {
pj.error_code = simdjson::MEMALLOC;
return pj.error_code;
}
} else if (unlikely(pj.byte_capacity < _batch_size)) {
pj.error_code = simdjson::CAPACITY;
return pj.error_code;
}
if(unlikely(pj_thread.byte_capacity < _batch_size)) {
const bool allocok_thread = pj_thread.allocate_capacity(_batch_size);
if (!allocok_thread) {
pj.error_code = simdjson::MEMALLOC;
return pj.error_code;
}
}
if (unlikely(load_next_batch)) {
//First time loading
if(!stage_1_thread.joinable()) {
_batch_size = std::min(_batch_size, remaining());
_batch_size = trimmed_length_safe_utf8((const char*)buf(), _batch_size);
if(_batch_size == 0) {
pj.error_code = simdjson::UTF8_ERROR;
return pj.error_code;
}
int stage1_is_ok = best_stage1(buf(), _batch_size, pj, true);
if (stage1_is_ok != simdjson::SUCCESS) {
pj.error_code = stage1_is_ok;
return pj.error_code;
}
size_t last_index = find_last_json_buf_idx(buf(), _batch_size, pj);
if(last_index == 0) {
if(pj.n_structural_indexes == 0) {
pj.error_code = simdjson::EMPTY;
return pj.error_code;
}
} else {
pj.n_structural_indexes = last_index + 1;
}
}
// the second thread is running or done.
else {
stage_1_thread.join();
if (stage1_is_ok_thread != simdjson::SUCCESS) {
pj.error_code = stage1_is_ok_thread;
return pj.error_code;
}
std::swap(pj.structural_indexes, pj_thread.structural_indexes);
pj.n_structural_indexes = pj_thread.n_structural_indexes;
advance(last_json_buffer_loc);
n_bytes_parsed += last_json_buffer_loc;
}
// let us decide whether we will start a new thread
if(remaining() - _batch_size > 0) {
last_json_buffer_loc = pj.structural_indexes[find_last_json_buf_idx(buf(),_batch_size,pj)];
_batch_size = std::min(_batch_size, remaining() - last_json_buffer_loc);
if(_batch_size > 0) {
_batch_size = trimmed_length_safe_utf8((const char*)(buf() + last_json_buffer_loc), _batch_size);
if(_batch_size == 0) {
pj.error_code = simdjson::UTF8_ERROR;
return pj.error_code;
}
// let us capture read-only variables
const char * const b = buf() + last_json_buffer_loc;
const size_t bs = _batch_size;
// we call the thread on a lambda that will update this->stage1_is_ok_thread
// there is only one thread that may write to this value
stage_1_thread = std::thread(
[this, b, bs] {
this->stage1_is_ok_thread = best_stage1(b, bs, this->pj_thread, true);
});
}
}
next_json = 0;
load_next_batch = false;
} // load_next_batch
int res = best_stage2(buf(), remaining(), pj, next_json);
if (res == simdjson::SUCCESS_AND_HAS_MORE) {
n_parsed_docs++;
current_buffer_loc = pj.structural_indexes[next_json];
load_next_batch = (current_buffer_loc == last_json_buffer_loc);
} else if (res == simdjson::SUCCESS) {
n_parsed_docs++;
if(remaining() > _batch_size) {
current_buffer_loc = pj.structural_indexes[next_json - 1];
load_next_batch = true;
res = simdjson::SUCCESS_AND_HAS_MORE;
}
}
return res;
}
#else // SIMDJSON_THREADS_ENABLED
// single-threaded version of json_parse
int JsonStream::json_parse(ParsedJson &pj) {
if (unlikely(pj.byte_capacity == 0)) {
const bool allocok = pj.allocate_capacity(_batch_size);
if (!allocok) {
pj.error_code = simdjson::MEMALLOC;
return pj.error_code;
}
} else if (unlikely(pj.byte_capacity < _batch_size)) {
pj.error_code = simdjson::CAPACITY;
return pj.error_code;
}
if (unlikely(load_next_batch)) {
advance(current_buffer_loc);
n_bytes_parsed += current_buffer_loc;
_batch_size = std::min(_batch_size, remaining());
_batch_size = trimmed_length_safe_utf8((const char*)buf(), _batch_size);
int stage1_is_ok = best_stage1(buf(), _batch_size, pj, true);
if (stage1_is_ok != simdjson::SUCCESS) {
pj.error_code = stage1_is_ok;
return pj.error_code;
}
size_t last_index = find_last_json_buf_idx(buf(), _batch_size, pj);
if(last_index == 0) {
if(pj.n_structural_indexes == 0) {
pj.error_code = simdjson::EMPTY;
return pj.error_code;
}
} else {
pj.n_structural_indexes = last_index + 1;
}
load_next_batch = false;
} // load_next_batch
int res = best_stage2(buf(), remaining(), pj, next_json);
if (likely(res == simdjson::SUCCESS_AND_HAS_MORE)) {
n_parsed_docs++;
current_buffer_loc = pj.structural_indexes[next_json];
} else if (res == simdjson::SUCCESS) {
n_parsed_docs++;
if(remaining() > _batch_size) {
current_buffer_loc = pj.structural_indexes[next_json - 1];
next_json = 1;
load_next_batch = true;
res = simdjson::SUCCESS_AND_HAS_MORE;
}
}
return res;
}
#endif // SIMDJSON_THREADS_ENABLED
size_t JsonStream::get_current_buffer_loc() const {
return current_buffer_loc;
}
size_t JsonStream::get_n_parsed_docs() const {
return n_parsed_docs;
}
size_t JsonStream::get_n_bytes_parsed() const {
return n_bytes_parsed;
}

View File

@ -239,6 +239,44 @@ bool stable_test() {
return newjson == json; return newjson == json;
} }
static bool parse_json_message_issue467(char const* message, std::size_t len, size_t expectedcount) {
simdjson::ParsedJson pj;
size_t count = 0;
if (!pj.allocate_capacity(len)) {
std::cerr << "Failed to allocated memory for simdjson::ParsedJson" << std::endl;
return false;
}
int res;
simdjson::padded_string str(message,len);
simdjson::JsonStream js(str, pj.byte_capacity);
do {
res = js.json_parse(pj);
count++;
} while (res == simdjson::SUCCESS_AND_HAS_MORE);
if (res != simdjson::SUCCESS) {
std::cerr << "Failed with simdjson error= " << simdjson::error_message(res) << std::endl;
return false;
}
if(count != expectedcount) {
std::cerr << "bad count" << std::endl;
return false;
}
return true;
}
bool json_issue467() {
const char * single_message = "{\"error\":[],\"result\":{\"token\":\"xxx\"}}";
const char* two_messages = "{\"error\":[],\"result\":{\"token\":\"xxx\"}}{\"error\":[],\"result\":{\"token\":\"xxx\"}}";
if(!parse_json_message_issue467(single_message, strlen(single_message),1)) {
return false;
}
if(!parse_json_message_issue467(two_messages, strlen(two_messages),2)) {
return false;
}
return true;
}
// returns true if successful // returns true if successful
bool navigate_test() { bool navigate_test() {
std::string json = "{" std::string json = "{"
@ -366,7 +404,8 @@ bool stream_utf8_test() {
for(size_t i = 1000; i < 2000; i += (i>1050?10:1)) { for(size_t i = 1000; i < 2000; i += (i>1050?10:1)) {
printf("."); printf(".");
fflush(NULL); fflush(NULL);
simdjson::JsonStream js{data.c_str(), data.size(), i}; simdjson::padded_string str(data);
simdjson::JsonStream js{str, i};
int parse_res = simdjson::SUCCESS_AND_HAS_MORE; int parse_res = simdjson::SUCCESS_AND_HAS_MORE;
size_t count = 0; size_t count = 0;
simdjson::ParsedJson pj; simdjson::ParsedJson pj;
@ -427,7 +466,8 @@ bool stream_test() {
for(size_t i = 1000; i < 2000; i += (i>1050?10:1)) { for(size_t i = 1000; i < 2000; i += (i>1050?10:1)) {
printf("."); printf(".");
fflush(NULL); fflush(NULL);
simdjson::JsonStream js{data.c_str(), data.size(), i}; simdjson::padded_string str(data);
simdjson::JsonStream js{str, i};
int parse_res = simdjson::SUCCESS_AND_HAS_MORE; int parse_res = simdjson::SUCCESS_AND_HAS_MORE;
size_t count = 0; size_t count = 0;
simdjson::ParsedJson pj; simdjson::ParsedJson pj;
@ -530,6 +570,8 @@ bool skyprophet_test() {
int main() { int main() {
std::cout << "Running basic tests." << std::endl; std::cout << "Running basic tests." << std::endl;
if(!json_issue467())
return EXIT_FAILURE;
if(!stream_test()) if(!stream_test())
return EXIT_FAILURE; return EXIT_FAILURE;
if(!stream_utf8_test()) if(!stream_utf8_test())

View File

@ -86,7 +86,7 @@ bool validate(const char *dirname) {
} }
simdjson::ParsedJson pj; simdjson::ParsedJson pj;
simdjson::JsonStream js{p.data(), p.size()}; simdjson::JsonStream js{p};
++how_many; ++how_many;
int parse_res = simdjson::SUCCESS_AND_HAS_MORE; int parse_res = simdjson::SUCCESS_AND_HAS_MORE;