From 9be4a17687126d91ae61a5bc17923e4dd106545e Mon Sep 17 00:00:00 2001 From: John Keiser Date: Tue, 2 Jun 2020 08:09:54 -0700 Subject: [PATCH 1/8] Separate definition from declaration, arrange top down --- src/generic/stage1/buf_block_reader.h | 57 ++++++--- src/generic/stage1/json_structural_indexer.h | 127 ++++++++++--------- 2 files changed, 104 insertions(+), 80 deletions(-) diff --git a/src/generic/stage1/buf_block_reader.h b/src/generic/stage1/buf_block_reader.h index f6c7b4f3..8c5cb883 100644 --- a/src/generic/stage1/buf_block_reader.h +++ b/src/generic/stage1/buf_block_reader.h @@ -2,24 +2,13 @@ template struct buf_block_reader { public: - really_inline buf_block_reader(const uint8_t *_buf, size_t _len) : buf{_buf}, len{_len}, lenminusstep{len < STEP_SIZE ? 0 : len - STEP_SIZE}, idx{0} {} - really_inline size_t block_index() { return idx; } - really_inline bool has_full_block() const { - return idx < lenminusstep; - } - really_inline const uint8_t *full_block() const { - return &buf[idx]; - } - really_inline bool has_remainder() const { - return idx < len; - } - really_inline void get_remainder(uint8_t *tmp_buf) const { - memset(tmp_buf, 0x20, STEP_SIZE); - memcpy(tmp_buf, buf + idx, len - idx); - } - really_inline void advance() { - idx += STEP_SIZE; - } + really_inline buf_block_reader(const uint8_t *_buf, size_t _len); + really_inline size_t block_index(); + really_inline bool has_full_block() const; + really_inline const uint8_t *full_block() const; + really_inline bool has_remainder() const; + really_inline void get_remainder(uint8_t *tmp_buf) const; + really_inline void advance(); private: const uint8_t *buf; const size_t len; @@ -27,6 +16,38 @@ private: size_t idx; }; +template +really_inline buf_block_reader::buf_block_reader(const uint8_t *_buf, size_t _len) : buf{_buf}, len{_len}, lenminusstep{len < STEP_SIZE ? 
0 : len - STEP_SIZE}, idx{0} {} + +template +really_inline size_t buf_block_reader::block_index() { return idx; } + +template +really_inline bool buf_block_reader::has_full_block() const { + return idx < lenminusstep; +} + +template +really_inline const uint8_t *buf_block_reader::full_block() const { + return &buf[idx]; +} + +template +really_inline bool buf_block_reader::has_remainder() const { + return idx < len; +} + +template +really_inline void buf_block_reader::get_remainder(uint8_t *tmp_buf) const { + memset(tmp_buf, 0x20, STEP_SIZE); + memcpy(tmp_buf, buf + idx, len - idx); +} + +template +really_inline void buf_block_reader::advance() { + idx += STEP_SIZE; +} + // Routines to print masks and text for debugging bitmask operations UNUSED static char * format_input_text(const simd8x64 in) { static char *buf = (char*)malloc(sizeof(simd8x64) + 1); diff --git a/src/generic/stage1/json_structural_indexer.h b/src/generic/stage1/json_structural_indexer.h index 9814de89..9949ed83 100644 --- a/src/generic/stage1/json_structural_indexer.h +++ b/src/generic/stage1/json_structural_indexer.h @@ -57,12 +57,18 @@ public: class json_structural_indexer { public: + /** + * Find the important bits of JSON in a 128-byte chunk, and add them to structural_indexes. + * + * @param streaming Setting the streaming parameter to true allows the find_structural_bits to + * tolerate unclosed strings. The caller should still ensure that the input is valid UTF-8. If + * you are processing substrings, you may want to call on a function like trimmed_length_safe_utf8. 
+ */ template static error_code index(const uint8_t *buf, size_t len, dom_parser_implementation &parser, bool streaming) noexcept; private: - really_inline json_structural_indexer(uint32_t *structural_indexes) - : indexer{structural_indexes} {} + really_inline json_structural_indexer(uint32_t *structural_indexes); template really_inline void step(const uint8_t *block, buf_block_reader &reader) noexcept; really_inline void next(simd::simd8x64 in, json_block block, size_t idx); @@ -75,6 +81,63 @@ private: uint64_t unescaped_chars_error = 0; }; +really_inline json_structural_indexer::json_structural_indexer(uint32_t *structural_indexes) : indexer{structural_indexes} {} + +// +// PERF NOTES: +// We pipe 2 inputs through these stages: +// 1. Load JSON into registers. This takes a long time and is highly parallelizable, so we load +// 2 inputs' worth at once so that by the time step 2 is looking for them input, it's available. +// 2. Scan the JSON for critical data: strings, scalars and operators. This is the critical path. +// The output of step 1 depends entirely on this information. These functions don't quite use +// up enough CPU: the second half of the functions is highly serial, only using 1 execution core +// at a time. The second input's scans has some dependency on the first ones finishing it, but +// they can make a lot of progress before they need that information. +// 3. Step 1 doesn't use enough capacity, so we run some extra stuff while we're waiting for that +// to finish: utf-8 checks and generating the output from the last iteration. +// +// The reason we run 2 inputs at a time, is steps 2 and 3 are *still* not enough to soak up all +// available capacity with just one input. Running 2 at a time seems to give the CPU a good enough +// workout. 
+// +template +error_code json_structural_indexer::index(const uint8_t *buf, size_t len, dom_parser_implementation &parser, bool streaming) noexcept { + if (unlikely(len > parser.capacity())) { return CAPACITY; } + + buf_block_reader reader(buf, len); + json_structural_indexer indexer(parser.structural_indexes.get()); + while (reader.has_full_block()) { + indexer.step(reader.full_block(), reader); + } + + if (likely(reader.has_remainder())) { + uint8_t block[STEP_SIZE]; + reader.get_remainder(block); + indexer.step(block, reader); + } + + return indexer.finish(parser, reader.block_index(), len, streaming); +} + +template<> +really_inline void json_structural_indexer::step<128>(const uint8_t *block, buf_block_reader<128> &reader) noexcept { + simd::simd8x64 in_1(block); + simd::simd8x64 in_2(block+64); + json_block block_1 = scanner.next(in_1); + json_block block_2 = scanner.next(in_2); + this->next(in_1, block_1, reader.block_index()); + this->next(in_2, block_2, reader.block_index()+64); + reader.advance(); +} + +template<> +really_inline void json_structural_indexer::step<64>(const uint8_t *block, buf_block_reader<64> &reader) noexcept { + simd::simd8x64 in_1(block); + json_block block_1 = scanner.next(in_1); + this->next(in_1, block_1, reader.block_index()); + reader.advance(); +} + really_inline void json_structural_indexer::next(simd::simd8x64 in, json_block block, size_t idx) { uint64_t unescaped = in.lteq(0x1F); checker.check_next_input(in); @@ -128,64 +191,4 @@ really_inline error_code json_structural_indexer::finish(dom_parser_implementati return checker.errors(); } -template<> -really_inline void json_structural_indexer::step<128>(const uint8_t *block, buf_block_reader<128> &reader) noexcept { - simd::simd8x64 in_1(block); - simd::simd8x64 in_2(block+64); - json_block block_1 = scanner.next(in_1); - json_block block_2 = scanner.next(in_2); - this->next(in_1, block_1, reader.block_index()); - this->next(in_2, block_2, reader.block_index()+64); - 
reader.advance(); -} - -template<> -really_inline void json_structural_indexer::step<64>(const uint8_t *block, buf_block_reader<64> &reader) noexcept { - simd::simd8x64 in_1(block); - json_block block_1 = scanner.next(in_1); - this->next(in_1, block_1, reader.block_index()); - reader.advance(); -} - -// -// Find the important bits of JSON in a 128-byte chunk, and add them to structural_indexes. -// -// PERF NOTES: -// We pipe 2 inputs through these stages: -// 1. Load JSON into registers. This takes a long time and is highly parallelizable, so we load -// 2 inputs' worth at once so that by the time step 2 is looking for them input, it's available. -// 2. Scan the JSON for critical data: strings, scalars and operators. This is the critical path. -// The output of step 1 depends entirely on this information. These functions don't quite use -// up enough CPU: the second half of the functions is highly serial, only using 1 execution core -// at a time. The second input's scans has some dependency on the first ones finishing it, but -// they can make a lot of progress before they need that information. -// 3. Step 1 doesn't use enough capacity, so we run some extra stuff while we're waiting for that -// to finish: utf-8 checks and generating the output from the last iteration. -// -// The reason we run 2 inputs at a time, is steps 2 and 3 are *still* not enough to soak up all -// available capacity with just one input. Running 2 at a time seems to give the CPU a good enough -// workout. -// -// Setting the streaming parameter to true allows the find_structural_bits to tolerate unclosed strings. -// The caller should still ensure that the input is valid UTF-8. If you are processing substrings, -// you may want to call on a function like trimmed_length_safe_utf8. 
-template -error_code json_structural_indexer::index(const uint8_t *buf, size_t len, dom_parser_implementation &parser, bool streaming) noexcept { - if (unlikely(len > parser.capacity())) { return CAPACITY; } - - buf_block_reader reader(buf, len); - json_structural_indexer indexer(parser.structural_indexes.get()); - while (reader.has_full_block()) { - indexer.step(reader.full_block(), reader); - } - - if (likely(reader.has_remainder())) { - uint8_t block[STEP_SIZE]; - reader.get_remainder(block); - indexer.step(block, reader); - } - - return indexer.finish(parser, reader.block_index(), len, streaming); -} - } // namespace stage1 From 8c16ba372ef4b8ffd8f6b2e0699965d025e8ed55 Mon Sep 17 00:00:00 2001 From: John Keiser Date: Tue, 2 Jun 2020 08:32:41 -0700 Subject: [PATCH 2/8] Acknowledge that we always have a remainder --- src/generic/stage1/buf_block_reader.h | 24 ++++++++++++-------- src/generic/stage1/json_minifier.h | 8 ++++--- src/generic/stage1/json_structural_indexer.h | 11 +++++---- 3 files changed, 25 insertions(+), 18 deletions(-) diff --git a/src/generic/stage1/buf_block_reader.h b/src/generic/stage1/buf_block_reader.h index 8c5cb883..d67e73d1 100644 --- a/src/generic/stage1/buf_block_reader.h +++ b/src/generic/stage1/buf_block_reader.h @@ -6,8 +6,16 @@ public: really_inline size_t block_index(); really_inline bool has_full_block() const; really_inline const uint8_t *full_block() const; - really_inline bool has_remainder() const; - really_inline void get_remainder(uint8_t *tmp_buf) const; + /** + * Get the last block, padded with spaces. + * + * There will always be a last block, with at least 1 byte, unless len == 0 (in which case this + * function fills the buffer with spaces and returns 0. In particular, if len == STEP_SIZE there + * will be 0 full_blocks and 1 remainder block with STEP_SIZE bytes and no spaces for padding. + * + * @return the number of effective characters in the last block. 
+ */ + really_inline size_t get_remainder(uint8_t *dst) const; really_inline void advance(); private: const uint8_t *buf; @@ -33,14 +41,10 @@ really_inline const uint8_t *buf_block_reader::full_block() const { } template -really_inline bool buf_block_reader::has_remainder() const { - return idx < len; -} - -template -really_inline void buf_block_reader::get_remainder(uint8_t *tmp_buf) const { - memset(tmp_buf, 0x20, STEP_SIZE); - memcpy(tmp_buf, buf + idx, len - idx); +really_inline size_t buf_block_reader::get_remainder(uint8_t *dst) const { + memset(dst, 0x20, STEP_SIZE); // memset STEP_SIZE because it's more efficient to write out 8 or 16 bytes at once. + memcpy(dst, buf + idx, len - idx); + return len - idx; } template diff --git a/src/generic/stage1/json_minifier.h b/src/generic/stage1/json_minifier.h index aa42202e..b51d9f3a 100644 --- a/src/generic/stage1/json_minifier.h +++ b/src/generic/stage1/json_minifier.h @@ -59,13 +59,15 @@ template error_code json_minifier::minify(const uint8_t *buf, size_t len, uint8_t *dst, size_t &dst_len) noexcept { buf_block_reader reader(buf, len); json_minifier minifier(dst); + + // Index the first n-1 blocks while (reader.has_full_block()) { minifier.step(reader.full_block(), reader); } - if (likely(reader.has_remainder())) { - uint8_t block[STEP_SIZE]; - reader.get_remainder(block); + // Index the last (remainder) block, padded with spaces + uint8_t block[STEP_SIZE]; + if (likely(reader.get_remainder(block)) > 0) { minifier.step(block, reader); } diff --git a/src/generic/stage1/json_structural_indexer.h b/src/generic/stage1/json_structural_indexer.h index 9949ed83..e128f809 100644 --- a/src/generic/stage1/json_structural_indexer.h +++ b/src/generic/stage1/json_structural_indexer.h @@ -106,15 +106,16 @@ error_code json_structural_indexer::index(const uint8_t *buf, size_t len, dom_pa buf_block_reader reader(buf, len); json_structural_indexer indexer(parser.structural_indexes.get()); + + // Read all but the last block while 
(reader.has_full_block()) { indexer.step(reader.full_block(), reader); } - if (likely(reader.has_remainder())) { - uint8_t block[STEP_SIZE]; - reader.get_remainder(block); - indexer.step(block, reader); - } + // Take care of the last block (will always be there unless file is empty) + uint8_t block[STEP_SIZE]; + if (unlikely(reader.get_remainder(block) == 0)) { return EMPTY; } + indexer.step(block, reader); return indexer.finish(parser, reader.block_index(), len, streaming); } From ef63a84a3e667f87365cc54e70029e565ddaa3a8 Mon Sep 17 00:00:00 2001 From: John Keiser Date: Tue, 2 Jun 2020 20:21:46 -0700 Subject: [PATCH 3/8] Move document stream state to implementation --- include/simdjson/dom/document_stream.h | 104 +++--- include/simdjson/inline/document_stream.h | 329 ++++++------------ .../internal/dom_parser_implementation.h | 8 +- src/generic/dom_parser_implementation.h | 7 +- src/generic/stage1/allocate.h | 2 + src/generic/stage1/buf_block_reader.h | 52 +-- src/generic/stage1/json_structural_indexer.h | 126 ++++++- src/generic/stage2/logger.h | 2 +- .../stage2/streaming_structural_parser.h | 22 +- src/generic/stage2/structural_iterator.h | 4 +- src/generic/stage2/structural_parser.h | 9 +- tests/errortests.cpp | 2 +- 12 files changed, 331 insertions(+), 336 deletions(-) diff --git a/include/simdjson/dom/document_stream.h b/include/simdjson/dom/document_stream.h index 602d1e4a..b1ee1a9d 100644 --- a/include/simdjson/dom/document_stream.h +++ b/include/simdjson/dom/document_stream.h @@ -43,7 +43,7 @@ public: really_inline bool operator!=(const iterator &other) const noexcept; private: - iterator(document_stream& stream, bool finished) noexcept; + really_inline iterator(document_stream &s, bool finished) noexcept; /** The document_stream we're iterating through. */ document_stream& stream; /** Whether we're finished or not. 
*/ @@ -66,7 +66,23 @@ private: document_stream(document_stream &other) = delete; // Disallow copying - really_inline document_stream(dom::parser &parser, const uint8_t *buf, size_t len, size_t batch_size, error_code error = SUCCESS) noexcept; + /** + * Construct a document_stream. Does not allocate or parse anything until the iterator is + * used. + */ + really_inline document_stream( + dom::parser &parser, + const uint8_t *buf, + size_t len, + size_t batch_size, + error_code error = SUCCESS + ) noexcept; + + /** + * Parse the first document in the buffer. Used by begin(), to handle allocation and + * initialization. + */ + inline void start() noexcept; /** * Parse the next document found in the buffer previously given to document_stream. @@ -79,10 +95,7 @@ private: * pre-allocating a capacity defined by the batch_size defined when creating the * document_stream object. * - * The function returns simdjson::SUCCESS_AND_HAS_MORE (an integer = 1) in case - * of success and indicates that the buffer still contains more data to be parsed, - * meaning this function can be called again to return the next JSON document - * after this one. + * The function returns simdjson::EMPTY if there is no more data to be parsed. * * The function returns simdjson::SUCCESS (as integer = 0) in case of success * and indicates that the buffer has successfully been parsed to the end. @@ -93,55 +106,52 @@ private: * the simdjson::error_message function converts these error codes into a string). * * You can also check validity by calling parser.is_valid(). The same parser can - * and should be reused for the other documents in the buffer. */ - inline error_code json_parse() noexcept; + * and should be reused for the other documents in the buffer. + */ + inline void next() noexcept; /** - * Returns the location (index) of where the next document should be in the - * buffer. 
- * Can be used for debugging, it tells the user the position of the end of the - * last - * valid JSON document parsed + * Pass the next batch through stage 1 and return when finished. + * When threads are enabled, this may wait for the stage 1 thread to finish. */ - inline size_t get_current_buffer_loc() const { return current_buffer_loc; } + inline void load_batch() noexcept; - /** - * Returns the total amount of complete documents parsed by the document_stream, - * in the current buffer, at the given time. - */ - inline size_t get_n_parsed_docs() const { return n_parsed_docs; } + /** Get the next document index. */ + inline size_t next_batch_start() const noexcept; - /** - * Returns the total amount of data (in bytes) parsed by the document_stream, - * in the current buffer, at the given time. - */ - inline size_t get_n_bytes_parsed() const { return n_bytes_parsed; } - - inline const uint8_t *buf() const { return _buf + buf_start; } - - inline void advance(size_t offset) { buf_start += offset; } - - inline size_t remaining() const { return _len - buf_start; } + /** Pass the next batch through stage 1 with the given parser. */ + inline void run_stage1(dom::parser &p, size_t batch_start) noexcept; dom::parser &parser; - const uint8_t *_buf; - const size_t _len; - size_t _batch_size; // this is actually variable! - size_t buf_start{0}; - size_t next_json{0}; - bool load_next_batch{true}; - size_t current_buffer_loc{0}; + const uint8_t *buf; + const size_t len; + const size_t batch_size; + size_t batch_start{0}; + /** The error (or lack thereof) from the current document. 
*/ + error_code error; + #ifdef SIMDJSON_THREADS_ENABLED - size_t last_json_buffer_loc{0}; -#endif - size_t n_parsed_docs{0}; - size_t n_bytes_parsed{0}; - error_code error{SUCCESS_AND_HAS_MORE}; -#ifdef SIMDJSON_THREADS_ENABLED - error_code stage1_is_ok_thread{SUCCESS}; - std::thread stage_1_thread{}; - dom::parser parser_thread{}; -#endif + /** + * Start a thread to run stage 1 on the next batch. + */ + inline void start_stage1_thread() noexcept; + + /** + * Wait for the stage 1 thread to finish and capture the results. + */ + inline void finish_stage1_thread() noexcept; + + /** The error returned from the stage 1 thread. */ + error_code stage1_thread_error{UNINITIALIZED}; + /** The thread used to run stage 1 against the next batch in the background. */ + std::thread stage1_thread{}; + /** + * The parser used to run stage 1 in the background. Will be swapped + * with the regular parser when finished. + */ + dom::parser stage1_thread_parser{}; +#endif // SIMDJSON_THREADS_ENABLED + friend class dom::parser; }; // class document_stream diff --git a/include/simdjson/inline/document_stream.h b/include/simdjson/inline/document_stream.h index 4f2c948e..97a77da5 100644 --- a/include/simdjson/inline/document_stream.h +++ b/include/simdjson/inline/document_stream.h @@ -6,125 +6,37 @@ #include #include -namespace simdjson { -namespace internal { - -/** - * This algorithm is used to quickly identify the buffer position of - * the last JSON document inside the current batch. - * - * It does its work by finding the last pair of structural characters - * that represent the end followed by the start of a document. - * - * Simply put, we iterate over the structural characters, starting from - * the end. We consider that we found the end of a JSON document when the - * first element of the pair is NOT one of these characters: '{' '[' ';' ',' - * and when the second element is NOT one of these characters: '}' '}' ';' ','. 
- * - * This simple comparison works most of the time, but it does not cover cases - * where the batch's structural indexes contain a perfect amount of documents. - * In such a case, we do not have access to the structural index which follows - * the last document, therefore, we do not have access to the second element in - * the pair, and means that we cannot identify the last document. To fix this - * issue, we keep a count of the open and closed curly/square braces we found - * while searching for the pair. When we find a pair AND the count of open and - * closed curly/square braces is the same, we know that we just passed a - * complete - * document, therefore the last json buffer location is the end of the batch - * */ -inline uint32_t find_last_json_buf_idx(const uint8_t *buf, size_t size, const dom::parser &parser) { - // this function can be generally useful - if (parser.implementation->n_structural_indexes == 0) - return 0; - auto last_i = parser.implementation->n_structural_indexes - 1; - if (parser.implementation->structural_indexes[last_i] == size) { - if (last_i == 0) - return 0; - last_i = parser.implementation->n_structural_indexes - 2; - } - auto arr_cnt = 0; - auto obj_cnt = 0; - for (auto i = last_i; i > 0; i--) { - auto idxb = parser.implementation->structural_indexes[i]; - switch (buf[idxb]) { - case ':': - case ',': - continue; - case '}': - obj_cnt--; - continue; - case ']': - arr_cnt--; - continue; - case '{': - obj_cnt++; - break; - case '[': - arr_cnt++; - break; - } - auto idxa = parser.implementation->structural_indexes[i - 1]; - switch (buf[idxa]) { - case '{': - case '[': - case ':': - case ',': - continue; - } - if (!arr_cnt && !obj_cnt) { - return last_i + 1; - } - return i; - } - return 0; -} - -// returns true if the provided byte value is an ASCII character -static inline bool is_ascii(char c) { - return ((unsigned char)c) <= 127; -} - -// if the string ends with UTF-8 values, backtrack -// up to the first ASCII character. 
May return 0. -static inline size_t trimmed_length_safe_utf8(const char * c, size_t len) { - while ((len > 0) and (not is_ascii(c[len - 1]))) { - len--; - } - return len; -} - -} // namespace internal - -} // namespace simdjson - namespace simdjson { namespace dom { + really_inline document_stream::document_stream( dom::parser &_parser, - const uint8_t *buf, - size_t len, - size_t batch_size, + const uint8_t *_buf, + size_t _len, + size_t _batch_size, error_code _error ) noexcept : parser{_parser}, - _buf{buf}, - _len{len}, - _batch_size(batch_size), - error(_error) + buf{_buf}, + len{_len}, + batch_size{_batch_size}, + error{_error} { - if (!error) { error = json_parse(); } } inline document_stream::~document_stream() noexcept { #ifdef SIMDJSON_THREADS_ENABLED - if (stage_1_thread.joinable()) { - stage_1_thread.join(); + // TODO kill the thread, why should people have to wait for a non-side-effecting operation to complete + if (stage1_thread.joinable()) { + stage1_thread.join(); } #endif } really_inline document_stream::iterator document_stream::begin() noexcept { - return iterator(*this, false); + start(); + // If there are no documents, we're finished. + return iterator(*this, error == EMPTY); } really_inline document_stream::iterator document_stream::end() noexcept { @@ -136,17 +48,15 @@ really_inline document_stream::iterator::iterator(document_stream& _stream, bool } really_inline simdjson_result document_stream::iterator::operator*() noexcept { - error_code err = stream.error == SUCCESS_AND_HAS_MORE ? SUCCESS : stream.error; - if (err) { return err; } + // Once we have yielded any errors, we're finished. 
+ if (stream.error) { finished = true; return stream.error; } return stream.parser.doc.root(); } really_inline document_stream::iterator& document_stream::iterator::operator++() noexcept { - if (stream.error == SUCCESS_AND_HAS_MORE) { - stream.error = stream.json_parse(); - } else { - finished = true; - } + stream.next(); + // If that was the last document, we're finished. + if (stream.error == EMPTY) { finished = true; } return *this; } @@ -154,130 +64,99 @@ really_inline bool document_stream::iterator::operator!=(const document_stream:: return finished != other.finished; } +inline void document_stream::start() noexcept { + if (error) { return; } + + error = parser.ensure_capacity(batch_size); + if (error) { return; } + + // Always run the first stage 1 parse immediately + batch_start = 0; + run_stage1(parser, batch_start); + if (error) { return; } + +#ifdef SIMDJSON_THREADS_ENABLED + if (next_batch_start() < len) { + // Kick off the first thread if needed + error = thread_parser.ensure_capacity(batch_size); + if (error) { return; } + start_stage1_thread(); + if (error) { return; } + } +#endif // SIMDJSON_THREADS_ENABLED + + next(); +} + +inline void document_stream::next() noexcept { + if (error) { return; } + + // Load the next document from the batch + error = parser.implementation->stage2_next(parser.doc); + + // If that was the last document in the batch, load another batch (if available) + while (error == EMPTY) { + batch_start = next_batch_start(); + if (batch_start >= len) { break; } + +#ifdef SIMDJSON_THREADS_ENABLED + load_from_stage1_thread(); +#else + run_stage1(parser, batch_start); +#endif + if (error) { continue; } // If the error was EMPTY, we may want to load another batch. 
+ + // Run stage 2 on the first document in the batch + error = parser.implementation->stage2_next(parser.doc); + } +} + +inline size_t document_stream::next_batch_start() const noexcept { + return batch_start + parser.implementation->structural_indexes[parser.implementation->n_structural_indexes]; +} + +inline void document_stream::run_stage1(dom::parser &p, size_t _batch_start) noexcept { + // If this is the final batch, pass partial = false + size_t remaining = len - _batch_start; + if (remaining <= batch_size) { + error = p.implementation->stage1(&buf[_batch_start], remaining, false); + } else { + error = p.implementation->stage1(&buf[_batch_start], batch_size, true); + } +} + #ifdef SIMDJSON_THREADS_ENABLED -// threaded version of json_parse -// todo: simplify this code further -inline error_code document_stream::json_parse() noexcept { - error = parser.ensure_capacity(_batch_size); - if (error) { return error; } - error = parser_thread.ensure_capacity(_batch_size); - if (error) { return error; } +inline void document_stream::load_from_stage1_thread() noexcept { + stage1_thread.join(); - if (unlikely(load_next_batch)) { - // First time loading - if (!stage_1_thread.joinable()) { - _batch_size = (std::min)(_batch_size, remaining()); - _batch_size = internal::trimmed_length_safe_utf8((const char *)buf(), _batch_size); - if (_batch_size == 0) { - return simdjson::UTF8_ERROR; - } - auto stage1_is_ok = error_code(parser.implementation->stage1(buf(), _batch_size, true)); - if (stage1_is_ok != simdjson::SUCCESS) { - return stage1_is_ok; - } - uint32_t last_index = internal::find_last_json_buf_idx(buf(), _batch_size, parser); - if (last_index == 0) { - if (parser.implementation->n_structural_indexes == 0) { - return simdjson::EMPTY; - } - } else { - parser.implementation->n_structural_indexes = last_index + 1; - } - } - // the second thread is running or done. 
- else { - stage_1_thread.join(); - if (stage1_is_ok_thread != simdjson::SUCCESS) { - return stage1_is_ok_thread; - } - std::swap(parser.implementation->structural_indexes, parser_thread.implementation->structural_indexes); - parser.implementation->n_structural_indexes = parser_thread.implementation->n_structural_indexes; - advance(last_json_buffer_loc); - n_bytes_parsed += last_json_buffer_loc; - } - // let us decide whether we will start a new thread - if (remaining() - _batch_size > 0) { - last_json_buffer_loc = - parser.implementation->structural_indexes[internal::find_last_json_buf_idx(buf(), _batch_size, parser)]; - _batch_size = (std::min)(_batch_size, remaining() - last_json_buffer_loc); - if (_batch_size > 0) { - _batch_size = internal::trimmed_length_safe_utf8( - (const char *)(buf() + last_json_buffer_loc), _batch_size); - if (_batch_size == 0) { - return simdjson::UTF8_ERROR; - } - // let us capture read-only variables - const uint8_t *const b = buf() + last_json_buffer_loc; - const size_t bs = _batch_size; - // we call the thread on a lambda that will update - // this->stage1_is_ok_thread - // there is only one thread that may write to this value - stage_1_thread = std::thread([this, b, bs] { - this->stage1_is_ok_thread = error_code(parser_thread.implementation->stage1(b, bs, true)); - }); - } - } - next_json = 0; - load_next_batch = false; - } // load_next_batch - error_code res = parser.implementation->stage2(buf(), remaining(), parser.doc, next_json); - if (res == simdjson::SUCCESS_AND_HAS_MORE) { - n_parsed_docs++; - current_buffer_loc = parser.implementation->structural_indexes[next_json]; - load_next_batch = (current_buffer_loc == last_json_buffer_loc); - } else if (res == simdjson::SUCCESS) { - n_parsed_docs++; - if (remaining() > _batch_size) { - current_buffer_loc = parser.implementation->structural_indexes[next_json - 1]; - load_next_batch = true; - res = simdjson::SUCCESS_AND_HAS_MORE; - } + // Swap to the parser that was loaded up in the 
thread. Make sure the parser has + // enough memory to swap to, as well. + error = parser.ensure_capacity(batch_size); + if (error) { return error; } + std::swap(parser, stage1_thread_parser); + if (stage1_thread_error) { return stage1_thread_error; } + + // If there's anything left, start the stage 1 thread! + if (next_batch_start() < len) { + start_stage1_thread(); } - return res; } -#else // SIMDJSON_THREADS_ENABLED - -// single-threaded version of json_parse -inline error_code document_stream::json_parse() noexcept { - error = parser.ensure_capacity(_batch_size); - if (error) { return error; } - - if (unlikely(load_next_batch)) { - advance(current_buffer_loc); - n_bytes_parsed += current_buffer_loc; - _batch_size = (std::min)(_batch_size, remaining()); - _batch_size = internal::trimmed_length_safe_utf8((const char *)buf(), _batch_size); - auto stage1_is_ok = (error_code)parser.implementation->stage1(buf(), _batch_size, true); - if (stage1_is_ok != simdjson::SUCCESS) { - return stage1_is_ok; +inline void document_stream::start_stage1_thread() noexcept { + // we call the thread on a lambda that will update + // this->stage1_thread_error + // there is only one thread that may write to this value + // TODO this is NOT exception-safe. 
+ this->stage1_thread_error = UNINITIALIZED; // In case something goes wrong, make sure it's an error + stage1_thread = std::thread([this] { + this->stage1_thread_error = this->thread_parser.ensure_capacity(this->batch_size); + if (!this->stage1_thread_error) { + this->stage1_thread_error = run_stage1(this->stage1_thread_parser, this->next_batch_start()); } - uint32_t last_index = internal::find_last_json_buf_idx(buf(), _batch_size, parser); - if (last_index == 0) { - if (parser.implementation->n_structural_indexes == 0) { - return EMPTY; - } - } else { - parser.implementation->n_structural_indexes = last_index + 1; - } - load_next_batch = false; - } // load_next_batch - error_code res = parser.implementation->stage2(buf(), remaining(), parser.doc, next_json); - if (likely(res == simdjson::SUCCESS_AND_HAS_MORE)) { - n_parsed_docs++; - current_buffer_loc = parser.implementation->structural_indexes[next_json]; - } else if (res == simdjson::SUCCESS) { - n_parsed_docs++; - if (remaining() > _batch_size) { - current_buffer_loc = parser.implementation->structural_indexes[next_json - 1]; - next_json = 1; - load_next_batch = true; - res = simdjson::SUCCESS_AND_HAS_MORE; - } - } - return res; + }); } + #endif // SIMDJSON_THREADS_ENABLED } // namespace dom diff --git a/include/simdjson/internal/dom_parser_implementation.h b/include/simdjson/internal/dom_parser_implementation.h index 6ff98168..bbcc26b6 100644 --- a/include/simdjson/internal/dom_parser_implementation.h +++ b/include/simdjson/internal/dom_parser_implementation.h @@ -72,16 +72,14 @@ public: * * Stage 2 of the document parser for parser::parse_many. * - * Guaranteed only to be called after stage1(), with buf and len being a subset of the total stage1 buf/len. + * Guaranteed only to be called after stage1(). * Overridden by each implementation. * - * @param buf The json document to parse. - * @param len The length of the json document. * @param doc The document to output to. 
* @param next_json The next structural index. Start this at 0 the first time, and it will be updated to the next value to pass each time. * @return The error code, SUCCESS if there was no error, or SUCCESS_AND_HAS_MORE if there was no error and stage2 can be called again. */ - WARN_UNUSED virtual error_code stage2(const uint8_t *buf, size_t len, dom::document &doc, size_t &next_json) noexcept = 0; + WARN_UNUSED virtual error_code stage2_next(dom::document &doc) noexcept = 0; /** * Change the capacity of this parser. @@ -117,6 +115,8 @@ public: uint32_t n_structural_indexes{0}; /** Structural indices passed from stage 1 to stage 2 */ std::unique_ptr structural_indexes{}; + /** Next structural index to parse */ + uint32_t next_structural_index{0}; /** * The largest document this parser can support without reallocating. diff --git a/src/generic/dom_parser_implementation.h b/src/generic/dom_parser_implementation.h index 4c8ec598..ca45b98e 100644 --- a/src/generic/dom_parser_implementation.h +++ b/src/generic/dom_parser_implementation.h @@ -28,11 +28,12 @@ public: really_inline dom_parser_implementation(); dom_parser_implementation(const dom_parser_implementation &) = delete; dom_parser_implementation & operator=(const dom_parser_implementation &) = delete; - + WARN_UNUSED error_code parse(const uint8_t *buf, size_t len, dom::document &doc) noexcept final; - WARN_UNUSED error_code stage1(const uint8_t *buf, size_t len, bool streaming) noexcept final; + WARN_UNUSED error_code stage1(const uint8_t *buf, size_t len, bool partial) noexcept final; + WARN_UNUSED error_code check_for_unclosed_array() noexcept; WARN_UNUSED error_code stage2(dom::document &doc) noexcept final; - WARN_UNUSED error_code stage2(const uint8_t *buf, size_t len, dom::document &doc, size_t &next_json) noexcept final; + WARN_UNUSED error_code stage2_next(dom::document &doc) noexcept final; WARN_UNUSED error_code set_capacity(size_t capacity) noexcept final; WARN_UNUSED error_code set_max_depth(size_t 
max_depth) noexcept final; }; diff --git a/src/generic/stage1/allocate.h b/src/generic/stage1/allocate.h index a7ec382b..ab6fdb8a 100644 --- a/src/generic/stage1/allocate.h +++ b/src/generic/stage1/allocate.h @@ -8,6 +8,8 @@ really_inline error_code set_capacity(internal::dom_parser_implementation &parse size_t max_structures = ROUNDUP_N(capacity, 64) + 2 + 7; parser.structural_indexes.reset( new (std::nothrow) uint32_t[max_structures] ); if (!parser.structural_indexes) { return MEMALLOC; } + parser.structural_indexes[0] = 0; + parser.n_structural_indexes = 0; return SUCCESS; } diff --git a/src/generic/stage1/buf_block_reader.h b/src/generic/stage1/buf_block_reader.h index d67e73d1..af47933c 100644 --- a/src/generic/stage1/buf_block_reader.h +++ b/src/generic/stage1/buf_block_reader.h @@ -24,6 +24,38 @@ private: size_t idx; }; +constexpr const int TITLE_SIZE = 12; + +// Routines to print masks and text for debugging bitmask operations +UNUSED static char * format_input_text_64(const uint8_t *text) { + static char *buf = (char*)malloc(sizeof(simd8x64) + 1); + for (size_t i=0; i); i++) { + buf[i] = int8_t(text[i]) < ' ' ? '_' : int8_t(text[i]); + } + buf[sizeof(simd8x64)] = '\0'; + return buf; +} + +// Routines to print masks and text for debugging bitmask operations +UNUSED static char * format_input_text(const simd8x64 in) { + static char *buf = (char*)malloc(sizeof(simd8x64) + 1); + in.store((uint8_t*)buf); + for (size_t i=0; i); i++) { + if (buf[i] < ' ') { buf[i] = '_'; } + } + buf[sizeof(simd8x64)] = '\0'; + return buf; +} + +UNUSED static char * format_mask(uint64_t mask) { + static char *buf = (char*)malloc(64 + 1); + for (size_t i=0; i<64; i++) { + buf[i] = (mask & (size_t(1) << i)) ? 'X' : ' '; + } + buf[64] = '\0'; + return buf; +} + template really_inline buf_block_reader::buf_block_reader(const uint8_t *_buf, size_t _len) : buf{_buf}, len{_len}, lenminusstep{len < STEP_SIZE ? 
0 : len - STEP_SIZE}, idx{0} {} @@ -51,23 +83,3 @@ template really_inline void buf_block_reader::advance() { idx += STEP_SIZE; } - -// Routines to print masks and text for debugging bitmask operations -UNUSED static char * format_input_text(const simd8x64 in) { - static char *buf = (char*)malloc(sizeof(simd8x64) + 1); - in.store((uint8_t*)buf); - for (size_t i=0; i); i++) { - if (buf[i] < ' ') { buf[i] = '_'; } - } - buf[sizeof(simd8x64)] = '\0'; - return buf; -} - -UNUSED static char * format_mask(uint64_t mask) { - static char *buf = (char*)malloc(64 + 1); - for (size_t i=0; i<64; i++) { - buf[i] = (mask & (size_t(1) << i)) ? 'X' : ' '; - } - buf[64] = '\0'; - return buf; -} diff --git a/src/generic/stage1/json_structural_indexer.h b/src/generic/stage1/json_structural_indexer.h index e128f809..0cb2c9f3 100644 --- a/src/generic/stage1/json_structural_indexer.h +++ b/src/generic/stage1/json_structural_indexer.h @@ -60,19 +60,21 @@ public: /** * Find the important bits of JSON in a 128-byte chunk, and add them to structural_indexes. * - * @param streaming Setting the streaming parameter to true allows the find_structural_bits to + * @param partial Setting the partial parameter to true allows the find_structural_bits to * tolerate unclosed strings. The caller should still ensure that the input is valid UTF-8. If * you are processing substrings, you may want to call on a function like trimmed_length_safe_utf8. 
*/ template - static error_code index(const uint8_t *buf, size_t len, dom_parser_implementation &parser, bool streaming) noexcept; + static error_code index(const uint8_t *buf, size_t len, dom_parser_implementation &parser, bool partial) noexcept; private: really_inline json_structural_indexer(uint32_t *structural_indexes); template really_inline void step(const uint8_t *block, buf_block_reader &reader) noexcept; really_inline void next(simd::simd8x64 in, json_block block, size_t idx); - really_inline error_code finish(dom_parser_implementation &parser, size_t idx, size_t len, bool streaming); + really_inline error_code finish(dom_parser_implementation &parser, size_t idx, size_t len, bool partial); + static really_inline uint32_t find_next_document_index(dom_parser_implementation &parser); + static really_inline size_t trim_partial_utf8(const uint8_t *buf, size_t len); json_scanner scanner{}; utf8_checker checker{}; @@ -101,8 +103,9 @@ really_inline json_structural_indexer::json_structural_indexer(uint32_t *structu // workout. 
// template -error_code json_structural_indexer::index(const uint8_t *buf, size_t len, dom_parser_implementation &parser, bool streaming) noexcept { +error_code json_structural_indexer::index(const uint8_t *buf, size_t len, dom_parser_implementation &parser, bool partial) noexcept { if (unlikely(len > parser.capacity())) { return CAPACITY; } + if (partial) { len = trim_partial_utf8(buf, len); } buf_block_reader reader(buf, len); json_structural_indexer indexer(parser.structural_indexes.get()); @@ -117,7 +120,7 @@ error_code json_structural_indexer::index(const uint8_t *buf, size_t len, dom_pa if (unlikely(reader.get_remainder(block) == 0)) { return EMPTY; } indexer.step(block, reader); - return indexer.finish(parser, reader.block_index(), len, streaming); + return indexer.finish(parser, reader.block_index(), len, partial); } template<> @@ -147,11 +150,11 @@ really_inline void json_structural_indexer::next(simd::simd8x64 in, jso unescaped_chars_error |= block.non_quote_inside_string(unescaped); } -really_inline error_code json_structural_indexer::finish(dom_parser_implementation &parser, size_t idx, size_t len, bool streaming) { +really_inline error_code json_structural_indexer::finish(dom_parser_implementation &parser, size_t idx, size_t len, bool partial) { // Write out the final iteration's structurals indexer.write(uint32_t(idx-64), prev_structurals); - error_code error = scanner.finish(streaming); + error_code error = scanner.finish(partial); if (unlikely(error != SUCCESS)) { return error; } if (unescaped_chars_error) { @@ -159,22 +162,13 @@ really_inline error_code json_structural_indexer::finish(dom_parser_implementati } parser.n_structural_indexes = uint32_t(indexer.tail - parser.structural_indexes.get()); - /* a valid JSON file cannot have zero structural indexes - we should have - * found something */ + // a valid JSON file cannot have zero structural indexes - we should have found something if (unlikely(parser.n_structural_indexes == 0u)) { return EMPTY; 
} if (unlikely(parser.structural_indexes[parser.n_structural_indexes - 1] > len)) { return UNEXPECTED_ERROR; } - if (len != parser.structural_indexes[parser.n_structural_indexes - 1]) { - /* the string might not be NULL terminated, but we add a virtual NULL - * ending character. */ - parser.structural_indexes[parser.n_structural_indexes++] = uint32_t(len); - } - /* make it safe to dereference one beyond this array */ - parser.structural_indexes[parser.n_structural_indexes] = uint32_t(len); - parser.structural_indexes[parser.n_structural_indexes + 1] = 0; /*** * This is related to https://github.com/simdjson/simdjson/issues/906 * Basically, we want to make sure that if the parsing continues beyond the last (valid) @@ -189,7 +183,105 @@ really_inline error_code json_structural_indexer::finish(dom_parser_implementati * starts with [, it should end with ]. If we enforce that rule, then we would get * ][[ which is invalid. **/ + parser.structural_indexes[parser.n_structural_indexes] = uint32_t(len); + parser.structural_indexes[parser.n_structural_indexes + 1] = uint32_t(len); + parser.structural_indexes[parser.n_structural_indexes + 2] = 0; + if (partial) { + auto new_structural_indexes = find_next_document_index(parser); + if (new_structural_indexes == 0 && parser.n_structural_indexes > 0) { + return CAPACITY; // If the buffer is partial but the document is incomplete, it's too big to parse. + } + parser.n_structural_indexes = new_structural_indexes; + } + parser.next_structural_index = 0; return checker.errors(); } +/** + * This algorithm is used to quickly identify the last structural position that + * makes up a complete document. + * + * It does this by going backwards and finding the last *document boundary* (a + * place where one value follows another without a comma between them). If the + * last document (the characters after the boundary) has an equal number of + * start and end brackets, it is considered complete. 
+ * + * Simply put, we iterate over the structural characters, starting from + * the end. We consider that we found the end of a JSON document when the + * first element of the pair is NOT one of these characters: '{' '[' ';' ',' + * and when the second element is NOT one of these characters: '}' '}' ';' ','. + * + * This simple comparison works most of the time, but it does not cover cases + * where the batch's structural indexes contain a perfect amount of documents. + * In such a case, we do not have access to the structural index which follows + * the last document, therefore, we do not have access to the second element in + * the pair, and means that we cannot identify the last document. To fix this + * issue, we keep a count of the open and closed curly/square braces we found + * while searching for the pair. When we find a pair AND the count of open and + * closed curly/square braces is the same, we know that we just passed a + * complete + * document, therefore the last json buffer location is the end of the batch + */ +really_inline uint32_t json_structural_indexer::find_next_document_index(dom_parser_implementation &parser) { + // TODO don't count separately, just figure out depth + auto arr_cnt = 0; + auto obj_cnt = 0; + for (auto i = parser.n_structural_indexes - 1; i > 0; i--) { + auto idxb = parser.structural_indexes[i]; + switch (parser.buf[idxb]) { + case ':': + case ',': + continue; + case '}': + obj_cnt--; + continue; + case ']': + arr_cnt--; + continue; + case '{': + obj_cnt++; + break; + case '[': + arr_cnt++; + break; + } + auto idxa = parser.structural_indexes[i - 1]; + switch (parser.buf[idxa]) { + case '{': + case '[': + case ':': + case ',': + continue; + } + // Last document is complete, so the next document will appear after! 
+ if (!arr_cnt && !obj_cnt) { + return parser.n_structural_indexes; + } + // Last document is incomplete; mark the document at i + 1 as the next one + return i; + } + return 0; +} + +// Skip the last character if it is partial +really_inline size_t json_structural_indexer::trim_partial_utf8(const uint8_t *buf, size_t len) { + if (unlikely(len < 3)) { + switch (len) { + case 2: + if (buf[len-1] >= 0b11000000) { return len-1; } // 2-, 3- and 4-byte characters with only 1 byte left + if (buf[len-2] >= 0b11100000) { return len-2; } // 3- and 4-byte characters with only 2 bytes left + return len; + case 1: + if (buf[len-1] >= 0b11000000) { return len-1; } // 2-, 3- and 4-byte characters with only 1 byte left + return len; + case 0: + return len; + } + } + if (buf[len-1] >= 0b11000000) { return len-1; } // 2-, 3- and 4-byte characters with only 1 byte left + if (buf[len-2] >= 0b11100000) { return len-2; } // 3- and 4-byte characters with only 1 byte left + if (buf[len-3] >= 0b11110000) { return len-3; } // 4-byte characters with only 3 bytes left + return len; +} + } // namespace stage1 diff --git a/src/generic/stage2/logger.h b/src/generic/stage2/logger.h index bd2eac50..85f2444a 100644 --- a/src/generic/stage2/logger.h +++ b/src/generic/stage2/logger.h @@ -56,7 +56,7 @@ namespace logger { } printf("| %c ", printable_char(structurals.at_beginning() ? 
' ' : structurals.current_char())); printf("| %c ", printable_char(structurals.peek_char())); - printf("| %5zd ", structurals.next_structural); + printf("| %5u ", structurals.structural_indexes[structurals.next_structural]); printf("| %-*s ", LOG_DETAIL_LEN, detail); printf("| %*zu ", LOG_INDEX_LEN, structurals.idx); printf("|\n"); diff --git a/src/generic/stage2/streaming_structural_parser.h b/src/generic/stage2/streaming_structural_parser.h index b83e116d..8e63d028 100755 --- a/src/generic/stage2/streaming_structural_parser.h +++ b/src/generic/stage2/streaming_structural_parser.h @@ -1,12 +1,18 @@ namespace stage2 { struct streaming_structural_parser: structural_parser { - really_inline streaming_structural_parser(dom_parser_implementation &_parser, uint32_t next_structural) : structural_parser(_parser, next_structural) {} + really_inline streaming_structural_parser(dom_parser_implementation &_parser) : structural_parser(_parser, _parser.next_structural_index) {} // override to add streaming WARN_UNUSED really_inline error_code start(ret_address_t finish_parser) { + // If there are no structurals left, return EMPTY + if (structurals.at_end(parser.n_structural_indexes)) { + return parser.error = EMPTY; + } + log_start(); - init(); // sets is_valid to false + init(); + // Capacity ain't no thang for streaming, so we don't check it. 
// Advance to the first character as soon as possible advance_char(); @@ -24,6 +30,7 @@ struct streaming_structural_parser: structural_parser { return parser.error = TAPE_ERROR; } end_document(); + parser.next_structural_index = uint32_t(structurals.next_structural_index()); if (depth != 0) { log_error("Unclosed objects or arrays!"); return parser.error = TAPE_ERROR; @@ -32,9 +39,7 @@ struct streaming_structural_parser: structural_parser { log_error("IMPOSSIBLE: root scope tape index did not start at 0!"); return parser.error = TAPE_ERROR; } - bool finished = structurals.at_end(parser.n_structural_indexes); - if (!finished) { log_value("(and has more)"); } - return finished ? SUCCESS : SUCCESS_AND_HAS_MORE; + return SUCCESS; } }; @@ -44,12 +49,10 @@ struct streaming_structural_parser: structural_parser { * The JSON is parsed to a tape, see the accompanying tape.md file * for documentation. ***********/ -WARN_UNUSED error_code dom_parser_implementation::stage2(const uint8_t *_buf, size_t _len, dom::document &_doc, size_t &next_json) noexcept { - this->buf = _buf; - this->len = _len; +WARN_UNUSED error_code dom_parser_implementation::stage2_next(dom::document &_doc) noexcept { this->doc = &_doc; static constexpr stage2::unified_machine_addresses addresses = INIT_ADDRESSES(); - stage2::streaming_structural_parser parser(*this, uint32_t(next_json)); + stage2::streaming_structural_parser parser(*this); error_code result = parser.start(addresses.finish); if (result) { return result; } // @@ -158,7 +161,6 @@ array_continue: } finish: - next_json = parser.structurals.next_structural_index(); return parser.finish(); error: diff --git a/src/generic/stage2/structural_iterator.h b/src/generic/stage2/structural_iterator.h index cb5763b1..92a990b2 100644 --- a/src/generic/stage2/structural_iterator.h +++ b/src/generic/stage2/structural_iterator.h @@ -52,10 +52,10 @@ public: return result; } really_inline bool past_end(uint32_t n_structural_indexes) { - return next_structural+1 > 
n_structural_indexes; + return next_structural > n_structural_indexes; } really_inline bool at_end(uint32_t n_structural_indexes) { - return next_structural+1 == n_structural_indexes; + return next_structural == n_structural_indexes; } really_inline bool at_beginning() { return next_structural == 0; diff --git a/src/generic/stage2/structural_parser.h b/src/generic/stage2/structural_parser.h index 80abcc46..ce588fa9 100644 --- a/src/generic/stage2/structural_parser.h +++ b/src/generic/stage2/structural_parser.h @@ -75,10 +75,7 @@ struct structural_parser { uint8_t *current_string_buf_loc{}; uint32_t depth; - really_inline structural_parser( - dom_parser_implementation &_parser, - uint32_t next_structural = 0 - ) : structurals(_parser.buf, _parser.len, _parser.structural_indexes.get(), next_structural), parser{_parser}, depth{0} {} + really_inline structural_parser(dom_parser_implementation &_parser, uint32_t next_structural = 0) : structurals(_parser.buf, _parser.len, _parser.structural_indexes.get(), next_structural), parser{_parser}, depth{0} {} WARN_UNUSED really_inline bool start_scope(ret_address_t continue_state) { parser.containing_scope[depth].tape_index = parser.current_loc; @@ -333,7 +330,7 @@ struct structural_parser { WARN_UNUSED really_inline error_code start(size_t len, ret_address_t finish_state) { log_start(); - init(); // sets is_valid to false + init(); if (len > parser.capacity()) { return parser.error = CAPACITY; } @@ -401,7 +398,7 @@ WARN_UNUSED error_code dom_parser_implementation::stage2(dom::document &_doc) no FAIL_IF( parser.start_array(addresses.finish) ); // Make sure the outer array is closed before continuing; otherwise, there are ways we could get // into memory corruption. 
See https://github.com/simdjson/simdjson/issues/906 - if (buf[structural_indexes[n_structural_indexes - 2]] != ']') { + if (buf[structural_indexes[n_structural_indexes - 1]] != ']') { goto error; } goto array_begin; diff --git a/tests/errortests.cpp b/tests/errortests.cpp index 80f5823c..76257626 100644 --- a/tests/errortests.cpp +++ b/tests/errortests.cpp @@ -24,7 +24,7 @@ const char *TWITTER_JSON = SIMDJSON_BENCHMARK_DATA_DIR "twitter.json"; #define TEST_FAIL(MESSAGE) { cerr << "FAIL: " << (MESSAGE) << endl; return false; } #define TEST_SUCCEED() { return true; } namespace parser_load { - const char * NONEXISTENT_FILE = "this_file_does_not_exit.json"; + const char * NONEXISTENT_FILE = "this_file_does_not_exist.json"; bool parser_load_capacity() { TEST_START(); dom::parser parser(1); // 1 byte max capacity From c4a0fe1606610048aeefb5941af806f4b0d284cd Mon Sep 17 00:00:00 2001 From: John Keiser Date: Thu, 4 Jun 2020 17:40:15 -0700 Subject: [PATCH 4/8] Add tests for parse_many() errors --- tests/basictests.cpp | 57 +++++++++++++++++++++++++++----------- tests/errortests.cpp | 66 ++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 104 insertions(+), 19 deletions(-) diff --git a/tests/basictests.cpp b/tests/basictests.cpp index cedbda4c..b3312e49 100644 --- a/tests/basictests.cpp +++ b/tests/basictests.cpp @@ -515,9 +515,9 @@ namespace parse_api_tests { using namespace simdjson; using namespace simdjson::dom; - const padded_string BASIC_JSON = string("[1,2,3]"); - const padded_string BASIC_NDJSON = string("[1,2,3]\n[4,5,6]"); - // const padded_string EMPTY_NDJSON = string(""); + const padded_string BASIC_JSON = "[1,2,3]"_padded; + const padded_string BASIC_NDJSON = "[1,2,3]\n[4,5,6]"_padded; + const padded_string EMPTY_NDJSON = ""_padded; bool parser_parse() { std::cout << "Running " << __func__ << std::endl; @@ -532,24 +532,48 @@ namespace parse_api_tests { dom::parser parser; int count = 0; for (auto [doc, error] : parser.parse_many(BASIC_NDJSON)) { - if 
(error) { cerr << error << endl; return false; } + if (error) { cerr << "Error in parse_many: " << endl; return false; } if (!doc.is()) { cerr << "Document did not parse as an array" << endl; return false; } count++; } if (count != 2) { cerr << "parse_many returned " << count << " documents, expected 2" << endl; return false; } return true; } - // bool parser_parse_many_empty() { - // std::cout << "Running " << __func__ << std::endl; - // dom::parser parser; - // int count = 0; - // for (auto [doc, error] : parser.parse_many(EMPTY_NDJSON)) { - // if (error) { cerr << error << endl; return false; } - // count++; - // } - // if (count != 0) { cerr << "parse_many returned " << count << " documents, expected 0" << endl; return false; } - // return true; - // } + bool parser_parse_many_empty() { + std::cout << "Running " << __func__ << std::endl; + dom::parser parser; + int count = 0; + for (auto doc : parser.parse_many(EMPTY_NDJSON)) { + if (doc.error()) { cerr << "Error in parse_many: " << doc.error() << endl; return false; } + count++; + } + if (count != 0) { cerr << "parse_many returned " << count << " documents, expected 0" << endl; return false; } + return true; + } + + bool parser_parse_many_empty_batches() { + std::cout << "Running " << __func__ << std::endl; + dom::parser parser; + uint64_t count = 0; + constexpr const int BATCH_SIZE = 128; + uint8_t empty_batches_ndjson[BATCH_SIZE*16+SIMDJSON_PADDING]; + memset(&empty_batches_ndjson[0], ' ', BATCH_SIZE*16+SIMDJSON_PADDING); + memcpy(&empty_batches_ndjson[BATCH_SIZE*3+2], "1", 1); + memcpy(&empty_batches_ndjson[BATCH_SIZE*10+4], "2", 1); + memcpy(&empty_batches_ndjson[BATCH_SIZE*11+6], "3", 1); + for (int i=0; i<16; i++) { + printf("| %.*s |", BATCH_SIZE, &empty_batches_ndjson[BATCH_SIZE*i]); + } + for (auto [doc, error] : parser.parse_many(empty_batches_ndjson, BATCH_SIZE*16)) { + if (error) { cerr << "Error in parse_many: " << error << endl; return false; } + count++; + auto [val, val_error] = doc.get(); + if 
(val_error) { cerr << "Document is not an unsigned int: " << val_error << endl; return false; } + if (val != count) { cerr << "Expected document #" << count << " to equal " << count << ", but got " << val << " instead!" << endl; return false; } + } + if (count != 3) { cerr << "parse_many returned " << count << " documents, expected 0" << endl; return false; } + return true; + } bool parser_load() { std::cout << "Running " << __func__ << " on " << TWITTER_JSON << std::endl; @@ -633,7 +657,8 @@ namespace parse_api_tests { bool run() { return parser_parse() && parser_parse_many() && -// parser_parse_many_empty() && + parser_parse_many_empty() && + parser_parse_many_empty_batches() && parser_load() && parser_load_many() && #if SIMDJSON_EXCEPTIONS diff --git a/tests/errortests.cpp b/tests/errortests.cpp index 76257626..27f9bfc8 100644 --- a/tests/errortests.cpp +++ b/tests/errortests.cpp @@ -42,6 +42,57 @@ namespace parser_load { TEST_FAIL("No documents returned"); } + bool parser_parse_many_documents_error_in_the_middle() { + TEST_START(); + const padded_string DOC = "1 2 [} 3"_padded; + size_t count = 0; + dom::parser parser; + for (auto doc : parser.parse_many(DOC)) { + count++; + auto [val, error] = doc.get(); + if (count == 3) { + ASSERT_ERROR(error, TAPE_ERROR); + } else { + if (error) { TEST_FAIL(error); } + if (val != count) { cerr << "FAIL: expected " << count << ", got " << val << endl; return false; } + } + } + if (count != 3) { cerr << "FAIL: expected 2 documents and 1 error, got " << count << " total things" << endl; return false; } + TEST_SUCCEED(); + } + + bool parser_parse_many_documents_partial() { + TEST_START(); + const padded_string DOC = "["_padded; + size_t count = 0; + dom::parser parser; + for (auto doc : parser.parse_many(DOC)) { + count++; + ASSERT_ERROR(doc.error(), TAPE_ERROR); + } + if (count != 1) { cerr << "FAIL: expected no documents and 1 error, got " << count << " total things" << endl; return false; } + TEST_SUCCEED(); + } + + bool 
parser_parse_many_documents_partial_at_the_end() { + TEST_START(); + const padded_string DOC = "1 2 ["_padded; + size_t count = 0; + dom::parser parser; + for (auto doc : parser.parse_many(DOC)) { + count++; + auto [val, error] = doc.get(); + if (count == 3) { + ASSERT_ERROR(error, TAPE_ERROR); + } else { + if (error) { TEST_FAIL(error); } + if (val != count) { cerr << "FAIL: expected " << count << ", got " << val << endl; return false; } + } + } + if (count != 3) { cerr << "FAIL: expected 2 documents and 1 error, got " << count << " total things" << endl; return false; } + TEST_SUCCEED(); + } + bool parser_load_nonexistent() { TEST_START(); dom::parser parser; @@ -83,9 +134,18 @@ namespace parser_load { TEST_FAIL("No documents returned"); } bool run() { - return parser_load_capacity() && parser_load_many_capacity() - && parser_load_nonexistent() && parser_load_many_nonexistent() && padded_string_load_nonexistent() - && parser_load_chain() && parser_load_many_chain(); + return true + && parser_load_capacity() + && parser_load_many_capacity() + && parser_load_nonexistent() + && parser_load_many_nonexistent() + && padded_string_load_nonexistent() + && parser_load_chain() + && parser_load_many_chain() + && parser_parse_many_documents_error_in_the_middle() + && parser_parse_many_documents_partial() + && parser_parse_many_documents_partial_at_the_end() + ; } } From 3e226795f062268c45f8deb4256a78795fa8147c Mon Sep 17 00:00:00 2001 From: John Keiser Date: Thu, 4 Jun 2020 18:12:43 -0700 Subject: [PATCH 5/8] Run all passing json against parse_many. Empty documents pass, too. 
--- tests/parse_many_test.cpp | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/tests/parse_many_test.cpp b/tests/parse_many_test.cpp index 2353eb04..66fba3a4 100644 --- a/tests/parse_many_test.cpp +++ b/tests/parse_many_test.cpp @@ -83,17 +83,21 @@ bool validate(const char *dirname) { if (contains("EXCLUDE", name)) { // skipping how_many--; - } else if (starts_with("pass", name) and (has_extension(extension1, name) or has_extension(extension2, name)) and error) { - is_file_as_expected[i] = false; - printf("warning: file %s should pass but it fails. Error is: %s\n", - name, error_message(error)); - printf("size of file in bytes: %zu \n", json.size()); - everything_fine = false; - } else if ( starts_with("fail", name) and (not starts_with("fail10.json", name)) and !error) { - is_file_as_expected[i] = false; - printf("warning: file %s should fail but it passes.\n", name); - printf("size of file in bytes: %zu \n", json.size()); - everything_fine = false; + } else if (starts_with("pass", name) or starts_with("fail10.json", name) or starts_with("fail70.json", name)) { + if (error) { + is_file_as_expected[i] = false; + printf("warning: file %s should pass but it fails. 
Error is: %s\n", + name, error_message(error)); + printf("size of file in bytes: %zu \n", json.size()); + everything_fine = false; + } + } else if ( starts_with("fail", name) ) { + if (!error) { + is_file_as_expected[i] = false; + printf("warning: file %s should fail but it passes.\n", name); + printf("size of file in bytes: %zu \n", json.size()); + everything_fine = false; + } } free(fullpath); } From d43a4e9df98acf2706279b9f88d597e7baf5e40d Mon Sep 17 00:00:00 2001 From: John Keiser Date: Thu, 4 Jun 2020 18:28:24 -0700 Subject: [PATCH 6/8] Remove SUCCESS_AND_HAS_MORE (internal only value) --- include/simdjson/error.h | 1 - include/simdjson/internal/dom_parser_implementation.h | 3 +-- src/error.cpp | 1 - 3 files changed, 1 insertion(+), 4 deletions(-) diff --git a/include/simdjson/error.h b/include/simdjson/error.h index 1ff54891..0fc7da8b 100644 --- a/include/simdjson/error.h +++ b/include/simdjson/error.h @@ -11,7 +11,6 @@ namespace simdjson { */ enum error_code { SUCCESS = 0, ///< No error - SUCCESS_AND_HAS_MORE, ///< @private No error and buffer still has more data CAPACITY, ///< This parser can't support a document that big MEMALLOC, ///< Error allocating memory, most likely out of memory TAPE_ERROR, ///< Something went wrong while writing to the tape (stage 2), this is a generic error diff --git a/include/simdjson/internal/dom_parser_implementation.h b/include/simdjson/internal/dom_parser_implementation.h index bbcc26b6..4bf16d48 100644 --- a/include/simdjson/internal/dom_parser_implementation.h +++ b/include/simdjson/internal/dom_parser_implementation.h @@ -76,8 +76,7 @@ public: * Overridden by each implementation. * * @param doc The document to output to. - * @param next_json The next structural index. Start this at 0 the first time, and it will be updated to the next value to pass each time. - * @return The error code, SUCCESS if there was no error, or SUCCESS_AND_HAS_MORE if there was no error and stage2 can be called again. 
+ * @return The error code, SUCCESS if there was no error, or EMPTY if all documents have been parsed. */ WARN_UNUSED virtual error_code stage2_next(dom::document &doc) noexcept = 0; diff --git a/src/error.cpp b/src/error.cpp index 7b857054..60fb2339 100644 --- a/src/error.cpp +++ b/src/error.cpp @@ -5,7 +5,6 @@ namespace internal { SIMDJSON_DLLIMPORTEXPORT const error_code_info error_codes[] { { SUCCESS, "No error" }, - { SUCCESS_AND_HAS_MORE, "No error and buffer still has more data" }, { CAPACITY, "This parser can't support a document that big" }, { MEMALLOC, "Error allocating memory, we're most likely out of memory" }, { TAPE_ERROR, "The JSON document has an improper structure: missing or superfluous commas, braces, missing keys, etc." }, From fe01da077efbb566a0278781be4214c7b48ffcbb Mon Sep 17 00:00:00 2001 From: John Keiser Date: Thu, 4 Jun 2020 19:01:17 -0700 Subject: [PATCH 7/8] Make threaded version work again --- include/simdjson/dom/document_stream.h | 13 ++++++------ include/simdjson/inline/document_stream.h | 25 ++++++++++------------- tests/basictests.cpp | 3 --- 3 files changed, 17 insertions(+), 24 deletions(-) diff --git a/include/simdjson/dom/document_stream.h b/include/simdjson/dom/document_stream.h index b1ee1a9d..1d7ba43a 100644 --- a/include/simdjson/dom/document_stream.h +++ b/include/simdjson/dom/document_stream.h @@ -120,7 +120,7 @@ private: inline size_t next_batch_start() const noexcept; /** Pass the next batch through stage 1 with the given parser. */ - inline void run_stage1(dom::parser &p, size_t batch_start) noexcept; + inline error_code run_stage1(dom::parser &p, size_t batch_start) noexcept; dom::parser &parser; const uint8_t *buf; @@ -131,20 +131,19 @@ private: error_code error; #ifdef SIMDJSON_THREADS_ENABLED - /** - * Start a thread to run stage 1 on the next batch. - */ + inline void load_from_stage1_thread() noexcept; + + /** Start a thread to run stage 1 on the next batch. 
*/ inline void start_stage1_thread() noexcept; - /** - * Wait for the stage 1 thread to finish and capture the results. - */ + /** Wait for the stage 1 thread to finish and capture the results. */ inline void finish_stage1_thread() noexcept; /** The error returned from the stage 1 thread. */ error_code stage1_thread_error{UNINITIALIZED}; /** The thread used to run stage 1 against the next batch in the background. */ std::thread stage1_thread{}; + /** * The parser used to run stage 1 in the background. Will be swapped * with the regular parser when finished. diff --git a/include/simdjson/inline/document_stream.h b/include/simdjson/inline/document_stream.h index 97a77da5..7abf6145 100644 --- a/include/simdjson/inline/document_stream.h +++ b/include/simdjson/inline/document_stream.h @@ -72,13 +72,13 @@ inline void document_stream::start() noexcept { // Always run the first stage 1 parse immediately batch_start = 0; - run_stage1(parser, batch_start); + error = run_stage1(parser, batch_start); if (error) { return; } #ifdef SIMDJSON_THREADS_ENABLED if (next_batch_start() < len) { // Kick off the first thread if needed - error = thread_parser.ensure_capacity(batch_size); + error = stage1_thread_parser.ensure_capacity(batch_size); if (error) { return; } start_stage1_thread(); if (error) { return; } @@ -102,7 +102,7 @@ inline void document_stream::next() noexcept { #ifdef SIMDJSON_THREADS_ENABLED load_from_stage1_thread(); #else - run_stage1(parser, batch_start); + error = run_stage1(parser, batch_start); #endif if (error) { continue; } // If the error was EMPTY, we may want to load another batch. 
@@ -115,13 +115,13 @@ inline size_t document_stream::next_batch_start() const noexcept { return batch_start + parser.implementation->structural_indexes[parser.implementation->n_structural_indexes]; } -inline void document_stream::run_stage1(dom::parser &p, size_t _batch_start) noexcept { +inline error_code document_stream::run_stage1(dom::parser &p, size_t _batch_start) noexcept { // If this is the final batch, pass partial = false size_t remaining = len - _batch_start; if (remaining <= batch_size) { - error = p.implementation->stage1(&buf[_batch_start], remaining, false); + return p.implementation->stage1(&buf[_batch_start], remaining, false); } else { - error = p.implementation->stage1(&buf[_batch_start], batch_size, true); + return p.implementation->stage1(&buf[_batch_start], batch_size, true); } } @@ -132,10 +132,9 @@ inline void document_stream::load_from_stage1_thread() noexcept { // Swap to the parser that was loaded up in the thread. Make sure the parser has // enough memory to swap to, as well. - error = parser.ensure_capacity(batch_size); - if (error) { return error; } std::swap(parser, stage1_thread_parser); - if (stage1_thread_error) { return stage1_thread_error; } + error = stage1_thread_error; + if (error) { return; } // If there's anything left, start the stage 1 thread! if (next_batch_start() < len) { @@ -149,11 +148,9 @@ inline void document_stream::start_stage1_thread() noexcept { // there is only one thread that may write to this value // TODO this is NOT exception-safe. 
this->stage1_thread_error = UNINITIALIZED; // In case something goes wrong, make sure it's an error - stage1_thread = std::thread([this] { - this->stage1_thread_error = this->thread_parser.ensure_capacity(this->batch_size); - if (!this->stage1_thread_error) { - this->stage1_thread_error = run_stage1(this->stage1_thread_parser, this->next_batch_start()); - } + size_t _next_batch_start = this->next_batch_start(); + stage1_thread = std::thread([this, _next_batch_start] { + this->stage1_thread_error = run_stage1(this->stage1_thread_parser, _next_batch_start); }); } diff --git a/tests/basictests.cpp b/tests/basictests.cpp index b3312e49..537ead0d 100644 --- a/tests/basictests.cpp +++ b/tests/basictests.cpp @@ -561,9 +561,6 @@ namespace parse_api_tests { memcpy(&empty_batches_ndjson[BATCH_SIZE*3+2], "1", 1); memcpy(&empty_batches_ndjson[BATCH_SIZE*10+4], "2", 1); memcpy(&empty_batches_ndjson[BATCH_SIZE*11+6], "3", 1); - for (int i=0; i<16; i++) { - printf("| %.*s |", BATCH_SIZE, &empty_batches_ndjson[BATCH_SIZE*i]); - } for (auto [doc, error] : parser.parse_many(empty_batches_ndjson, BATCH_SIZE*16)) { if (error) { cerr << "Error in parse_many: " << error << endl; return false; } count++; From 0dbda65e4427cfc331ff119127481e1409602292 Mon Sep 17 00:00:00 2001 From: John Keiser Date: Thu, 4 Jun 2020 19:15:35 -0700 Subject: [PATCH 8/8] Fix fallback implementation --- src/arm64/dom_parser_implementation.cpp | 1 + src/fallback/dom_parser_implementation.cpp | 57 +++++++--- src/generic/stage1/find_next_document_index.h | 86 ++++++++++++++ src/generic/stage1/json_structural_indexer.h | 107 ++---------------- src/haswell/dom_parser_implementation.cpp | 1 + src/westmere/dom_parser_implementation.cpp | 1 + 6 files changed, 140 insertions(+), 113 deletions(-) create mode 100644 src/generic/stage1/find_next_document_index.h diff --git a/src/arm64/dom_parser_implementation.cpp b/src/arm64/dom_parser_implementation.cpp index 2e95cd68..9116965b 100644 --- 
a/src/arm64/dom_parser_implementation.cpp +++ b/src/arm64/dom_parser_implementation.cpp @@ -82,6 +82,7 @@ WARN_UNUSED error_code implementation::minify(const uint8_t *buf, size_t len, ui return arm64::stage1::json_minifier::minify<64>(buf, len, dst, dst_len); } +#include "generic/stage1/find_next_document_index.h" #include "generic/stage1/utf8_lookup2_algorithm.h" #include "generic/stage1/json_structural_indexer.h" WARN_UNUSED error_code dom_parser_implementation::stage1(const uint8_t *_buf, size_t _len, bool streaming) noexcept { diff --git a/src/fallback/dom_parser_implementation.cpp b/src/fallback/dom_parser_implementation.cpp index e77e5e97..ae0944c8 100644 --- a/src/fallback/dom_parser_implementation.cpp +++ b/src/fallback/dom_parser_implementation.cpp @@ -9,15 +9,17 @@ namespace simdjson { namespace fallback { namespace stage1 { +#include "generic/stage1/find_next_document_index.h" + class structural_scanner { public: -really_inline structural_scanner(dom_parser_implementation &_parser, bool _streaming) +really_inline structural_scanner(dom_parser_implementation &_parser, bool _partial) : buf{_parser.buf}, next_structural_index{_parser.structural_indexes.get()}, parser{_parser}, len{static_cast(_parser.len)}, - streaming{_streaming} { + partial{_partial} { } really_inline void add_structural() { @@ -41,7 +43,12 @@ really_inline void validate_utf8_character() { // 2-byte if ((buf[idx] & 0b00100000) == 0) { // missing continuation - if (unlikely(idx+1 > len || !is_continuation(buf[idx+1]))) { error = UTF8_ERROR; idx++; return; } + if (unlikely(idx+1 > len || !is_continuation(buf[idx+1]))) { + if (idx+1 > len && partial) { idx = len; return; } + error = UTF8_ERROR; + idx++; + return; + } // overlong: 1100000_ 10______ if (buf[idx] <= 0b11000001) { error = UTF8_ERROR; } idx += 2; @@ -51,7 +58,12 @@ really_inline void validate_utf8_character() { // 3-byte if ((buf[idx] & 0b00010000) == 0) { // missing continuation - if (unlikely(idx+2 > len || 
!is_continuation(buf[idx+1]) || !is_continuation(buf[idx+2]))) { error = UTF8_ERROR; idx++; return; } + if (unlikely(idx+2 > len || !is_continuation(buf[idx+1]) || !is_continuation(buf[idx+2]))) { + if (idx+2 > len && partial) { idx = len; return; } + error = UTF8_ERROR; + idx++; + return; + } // overlong: 11100000 100_____ ________ if (buf[idx] == 0b11100000 && buf[idx+1] <= 0b10011111) { error = UTF8_ERROR; } // surrogates: U+D800-U+DFFF 11101101 101_____ @@ -62,7 +74,12 @@ really_inline void validate_utf8_character() { // 4-byte // missing continuation - if (unlikely(idx+3 > len || !is_continuation(buf[idx+1]) || !is_continuation(buf[idx+2]) || !is_continuation(buf[idx+3]))) { error = UTF8_ERROR; idx++; return; } + if (unlikely(idx+3 > len || !is_continuation(buf[idx+1]) || !is_continuation(buf[idx+2]) || !is_continuation(buf[idx+3]))) { + if (idx+2 > len && partial) { idx = len; return; } + error = UTF8_ERROR; + idx++; + return; + } // overlong: 11110000 1000____ ________ ________ if (buf[idx] == 0b11110000 && buf[idx+1] <= 0b10001111) { error = UTF8_ERROR; } // too large: > U+10FFFF: @@ -87,7 +104,7 @@ really_inline void validate_string() { idx++; } } - if (idx >= len && !streaming) { error = UNCLOSED_STRING; } + if (idx >= len && !partial) { error = UNCLOSED_STRING; } } really_inline bool is_whitespace_or_operator(uint8_t c) { @@ -128,16 +145,26 @@ really_inline error_code scan() { break; } } - if (unlikely(next_structural_index == parser.structural_indexes.get())) { - return EMPTY; - } *next_structural_index = len; - next_structural_index++; // We pad beyond. 
// https://github.com/simdjson/simdjson/issues/906 - next_structural_index[0] = len; - next_structural_index[1] = 0; + next_structural_index[1] = len; + next_structural_index[2] = 0; parser.n_structural_indexes = uint32_t(next_structural_index - parser.structural_indexes.get()); + parser.next_structural_index = 0; + + if (unlikely(parser.n_structural_indexes == 0)) { + return EMPTY; + } + + if (partial) { + auto new_structural_indexes = find_next_document_index(parser); + if (new_structural_indexes == 0 && parser.n_structural_indexes > 0) { + return CAPACITY; // If the buffer is partial but the document is incomplete, it's too big to parse. + } + parser.n_structural_indexes = new_structural_indexes; + } + return error; } @@ -148,16 +175,16 @@ private: uint32_t len; uint32_t idx{0}; error_code error{SUCCESS}; - bool streaming; + bool partial; }; // structural_scanner } // namespace stage1 -WARN_UNUSED error_code dom_parser_implementation::stage1(const uint8_t *_buf, size_t _len, bool streaming) noexcept { +WARN_UNUSED error_code dom_parser_implementation::stage1(const uint8_t *_buf, size_t _len, bool partial) noexcept { this->buf = _buf; this->len = _len; - stage1::structural_scanner scanner(*this, streaming); + stage1::structural_scanner scanner(*this, partial); return scanner.scan(); } diff --git a/src/generic/stage1/find_next_document_index.h b/src/generic/stage1/find_next_document_index.h new file mode 100644 index 00000000..302af175 --- /dev/null +++ b/src/generic/stage1/find_next_document_index.h @@ -0,0 +1,86 @@ +/** + * This algorithm is used to quickly identify the last structural position that + * makes up a complete document. + * + * It does this by going backwards and finding the last *document boundary* (a + * place where one value follows another without a comma between them). If the + * last document (the characters after the boundary) has an equal number of + * start and end brackets, it is considered complete. 
+ *
+ * Simply put, we iterate over the structural characters, starting from
+ * the end. We consider that we found the end of a JSON document when the
+ * first element of the pair is NOT one of these characters: '{' '[' ':' ','
+ * and when the second element is NOT one of these characters: '}' ']' ':' ','.
+ *
+ * This simple comparison works most of the time, but it does not cover cases
+ * where the batch's structural indexes contain a perfect amount of documents.
+ * In such a case, we do not have access to the structural index which follows
+ * the last document, therefore, we do not have access to the second element in
+ * the pair, and that means we cannot identify the last document. To fix this
+ * issue, we keep a count of the open and closed curly/square braces we found
+ * while searching for the pair. When we find a pair AND the count of open and
+ * closed curly/square braces is the same, we know that we just passed a
+ * complete document, therefore the last json buffer location is the end of the
+ * batch.
+ */
+really_inline static uint32_t find_next_document_index(dom_parser_implementation &parser) {
+  // TODO don't count separately, just figure out depth
+  auto arr_cnt = 0;
+  auto obj_cnt = 0;
+  for (auto i = parser.n_structural_indexes - 1; i > 0; i--) {
+    auto idxb = parser.structural_indexes[i];
+    switch (parser.buf[idxb]) {
+    case ':':
+    case ',':
+      continue;
+    case '}':
+      obj_cnt--;
+      continue;
+    case ']':
+      arr_cnt--;
+      continue;
+    case '{':
+      obj_cnt++;
+      break;
+    case '[':
+      arr_cnt++;
+      break;
+    }
+    auto idxa = parser.structural_indexes[i - 1];
+    switch (parser.buf[idxa]) {
+    case '{':
+    case '[':
+    case ':':
+    case ',':
+      continue;
+    }
+    // Last document is complete, so the next document will appear after!
+    if (!arr_cnt && !obj_cnt) {
+      return parser.n_structural_indexes;
+    }
+    // Last document is incomplete; mark the document at i + 1 as the next one
+    return i;
+  }
+  return 0;
+}
+
+// Skip the last character if it is partial
+really_inline static size_t trim_partial_utf8(const uint8_t *buf, size_t len) {
+  if (unlikely(len < 3)) {
+    switch (len) {
+      case 2:
+        if (buf[len-1] >= 0b11000000) { return len-1; } // 2-, 3- and 4-byte characters with only 1 byte left
+        if (buf[len-2] >= 0b11100000) { return len-2; } // 3- and 4-byte characters with only 2 bytes left
+        return len;
+      case 1:
+        if (buf[len-1] >= 0b11000000) { return len-1; } // 2-, 3- and 4-byte characters with only 1 byte left
+        return len;
+      case 0:
+        return len;
+    }
+  }
+  if (buf[len-1] >= 0b11000000) { return len-1; } // 2-, 3- and 4-byte characters with only 1 byte left
+  if (buf[len-2] >= 0b11100000) { return len-2; } // 3- and 4-byte characters with only 2 bytes left
+  if (buf[len-3] >= 0b11110000) { return len-3; } // 4-byte characters with only 3 bytes left
+  return len;
+}
diff --git a/src/generic/stage1/json_structural_indexer.h b/src/generic/stage1/json_structural_indexer.h
index 0cb2c9f3..6f80123e 100644
--- a/src/generic/stage1/json_structural_indexer.h
+++ b/src/generic/stage1/json_structural_indexer.h
@@ -73,8 +73,6 @@ private:
   really_inline void step(const uint8_t *block, buf_block_reader<STEP_SIZE> &reader) noexcept;
   really_inline void next(simd::simd8x64<uint8_t> in, json_block block, size_t idx);
   really_inline error_code finish(dom_parser_implementation &parser, size_t idx, size_t len, bool partial);
-  static really_inline uint32_t find_next_document_index(dom_parser_implementation &parser);
-  static really_inline size_t trim_partial_utf8(const uint8_t *buf, size_t len);
 
   json_scanner scanner{};
   utf8_checker checker{};
@@ -98,7 +96,7 @@ really_inline json_structural_indexer::json_structural_indexer(uint32_t *structu
 // 3. 
Step 1 doesn't use enough capacity, so we run some extra stuff while we're waiting for that // to finish: utf-8 checks and generating the output from the last iteration. // -// The reason we run 2 inputs at a time, is steps 2 and 3 are//still* not enough to soak up all +// The reason we run 2 inputs at a time, is steps 2 and 3 are *still* not enough to soak up all // available capacity with just one input. Running 2 at a time seems to give the CPU a good enough // workout. // @@ -162,13 +160,6 @@ really_inline error_code json_structural_indexer::finish(dom_parser_implementati } parser.n_structural_indexes = uint32_t(indexer.tail - parser.structural_indexes.get()); - // a valid JSON file cannot have zero structural indexes - we should have found something - if (unlikely(parser.n_structural_indexes == 0u)) { - return EMPTY; - } - if (unlikely(parser.structural_indexes[parser.n_structural_indexes - 1] > len)) { - return UNEXPECTED_ERROR; - } /*** * This is related to https://github.com/simdjson/simdjson/issues/906 * Basically, we want to make sure that if the parsing continues beyond the last (valid) @@ -186,6 +177,14 @@ really_inline error_code json_structural_indexer::finish(dom_parser_implementati parser.structural_indexes[parser.n_structural_indexes] = uint32_t(len); parser.structural_indexes[parser.n_structural_indexes + 1] = uint32_t(len); parser.structural_indexes[parser.n_structural_indexes + 2] = 0; + parser.next_structural_index = 0; + // a valid JSON file cannot have zero structural indexes - we should have found something + if (unlikely(parser.n_structural_indexes == 0u)) { + return EMPTY; + } + if (unlikely(parser.structural_indexes[parser.n_structural_indexes - 1] > len)) { + return UNEXPECTED_ERROR; + } if (partial) { auto new_structural_indexes = find_next_document_index(parser); if (new_structural_indexes == 0 && parser.n_structural_indexes > 0) { @@ -193,95 +192,7 @@ really_inline error_code json_structural_indexer::finish(dom_parser_implementati } 
parser.n_structural_indexes = new_structural_indexes; } - parser.next_structural_index = 0; return checker.errors(); } -/** - * This algorithm is used to quickly identify the last structural position that - * makes up a complete document. - * - * It does this by going backwards and finding the last *document boundary* (a - * place where one value follows another without a comma between them). If the - * last document (the characters after the boundary) has an equal number of - * start and end brackets, it is considered complete. - * - * Simply put, we iterate over the structural characters, starting from - * the end. We consider that we found the end of a JSON document when the - * first element of the pair is NOT one of these characters: '{' '[' ';' ',' - * and when the second element is NOT one of these characters: '}' '}' ';' ','. - * - * This simple comparison works most of the time, but it does not cover cases - * where the batch's structural indexes contain a perfect amount of documents. - * In such a case, we do not have access to the structural index which follows - * the last document, therefore, we do not have access to the second element in - * the pair, and means that we cannot identify the last document. To fix this - * issue, we keep a count of the open and closed curly/square braces we found - * while searching for the pair. 
When we find a pair AND the count of open and - * closed curly/square braces is the same, we know that we just passed a - * complete - * document, therefore the last json buffer location is the end of the batch - */ -really_inline uint32_t json_structural_indexer::find_next_document_index(dom_parser_implementation &parser) { - // TODO don't count separately, just figure out depth - auto arr_cnt = 0; - auto obj_cnt = 0; - for (auto i = parser.n_structural_indexes - 1; i > 0; i--) { - auto idxb = parser.structural_indexes[i]; - switch (parser.buf[idxb]) { - case ':': - case ',': - continue; - case '}': - obj_cnt--; - continue; - case ']': - arr_cnt--; - continue; - case '{': - obj_cnt++; - break; - case '[': - arr_cnt++; - break; - } - auto idxa = parser.structural_indexes[i - 1]; - switch (parser.buf[idxa]) { - case '{': - case '[': - case ':': - case ',': - continue; - } - // Last document is complete, so the next document will appear after! - if (!arr_cnt && !obj_cnt) { - return parser.n_structural_indexes; - } - // Last document is incomplete; mark the document at i + 1 as the next one - return i; - } - return 0; -} - -// Skip the last character if it is partial -really_inline size_t json_structural_indexer::trim_partial_utf8(const uint8_t *buf, size_t len) { - if (unlikely(len < 3)) { - switch (len) { - case 2: - if (buf[len-1] >= 0b11000000) { return len-1; } // 2-, 3- and 4-byte characters with only 1 byte left - if (buf[len-2] >= 0b11100000) { return len-2; } // 3- and 4-byte characters with only 2 bytes left - return len; - case 1: - if (buf[len-1] >= 0b11000000) { return len-1; } // 2-, 3- and 4-byte characters with only 1 byte left - return len; - case 0: - return len; - } - } - if (buf[len-1] >= 0b11000000) { return len-1; } // 2-, 3- and 4-byte characters with only 1 byte left - if (buf[len-2] >= 0b11100000) { return len-2; } // 3- and 4-byte characters with only 1 byte left - if (buf[len-3] >= 0b11110000) { return len-3; } // 4-byte characters with only 
3 bytes left - return len; -} - } // namespace stage1 diff --git a/src/haswell/dom_parser_implementation.cpp b/src/haswell/dom_parser_implementation.cpp index 5377f7eb..c5e6c291 100644 --- a/src/haswell/dom_parser_implementation.cpp +++ b/src/haswell/dom_parser_implementation.cpp @@ -70,6 +70,7 @@ WARN_UNUSED error_code implementation::minify(const uint8_t *buf, size_t len, ui return haswell::stage1::json_minifier::minify<128>(buf, len, dst, dst_len); } +#include "generic/stage1/find_next_document_index.h" #include "generic/stage1/utf8_lookup2_algorithm.h" #include "generic/stage1/json_structural_indexer.h" WARN_UNUSED error_code dom_parser_implementation::stage1(const uint8_t *_buf, size_t _len, bool streaming) noexcept { diff --git a/src/westmere/dom_parser_implementation.cpp b/src/westmere/dom_parser_implementation.cpp index 7c5a5c85..376fe0f7 100644 --- a/src/westmere/dom_parser_implementation.cpp +++ b/src/westmere/dom_parser_implementation.cpp @@ -71,6 +71,7 @@ WARN_UNUSED error_code implementation::minify(const uint8_t *buf, size_t len, ui return westmere::stage1::json_minifier::minify<64>(buf, len, dst, dst_len); } +#include "generic/stage1/find_next_document_index.h" #include "generic/stage1/utf8_lookup2_algorithm.h" #include "generic/stage1/json_structural_indexer.h" WARN_UNUSED error_code dom_parser_implementation::stage1(const uint8_t *_buf, size_t _len, bool streaming) noexcept {