Merge pull request #913 from simdjson/jkeiser/internal-streaming
[1/4] Simplify parse_many() and fix bugs
This commit is contained in:
commit
383e8c7f68
|
@ -43,7 +43,7 @@ public:
|
||||||
really_inline bool operator!=(const iterator &other) const noexcept;
|
really_inline bool operator!=(const iterator &other) const noexcept;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
iterator(document_stream& stream, bool finished) noexcept;
|
really_inline iterator(document_stream &s, bool finished) noexcept;
|
||||||
/** The document_stream we're iterating through. */
|
/** The document_stream we're iterating through. */
|
||||||
document_stream& stream;
|
document_stream& stream;
|
||||||
/** Whether we're finished or not. */
|
/** Whether we're finished or not. */
|
||||||
|
@ -66,7 +66,23 @@ private:
|
||||||
|
|
||||||
document_stream(document_stream &other) = delete; // Disallow copying
|
document_stream(document_stream &other) = delete; // Disallow copying
|
||||||
|
|
||||||
really_inline document_stream(dom::parser &parser, const uint8_t *buf, size_t len, size_t batch_size, error_code error = SUCCESS) noexcept;
|
/**
|
||||||
|
* Construct a document_stream. Does not allocate or parse anything until the iterator is
|
||||||
|
* used.
|
||||||
|
*/
|
||||||
|
really_inline document_stream(
|
||||||
|
dom::parser &parser,
|
||||||
|
const uint8_t *buf,
|
||||||
|
size_t len,
|
||||||
|
size_t batch_size,
|
||||||
|
error_code error = SUCCESS
|
||||||
|
) noexcept;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Parse the first document in the buffer. Used by begin(), to handle allocation and
|
||||||
|
* initialization.
|
||||||
|
*/
|
||||||
|
inline void start() noexcept;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Parse the next document found in the buffer previously given to document_stream.
|
* Parse the next document found in the buffer previously given to document_stream.
|
||||||
|
@ -79,10 +95,7 @@ private:
|
||||||
* pre-allocating a capacity defined by the batch_size defined when creating the
|
* pre-allocating a capacity defined by the batch_size defined when creating the
|
||||||
* document_stream object.
|
* document_stream object.
|
||||||
*
|
*
|
||||||
* The function returns simdjson::SUCCESS_AND_HAS_MORE (an integer = 1) in case
|
* The function returns simdjson::EMPTY if there is no more data to be parsed.
|
||||||
* of success and indicates that the buffer still contains more data to be parsed,
|
|
||||||
* meaning this function can be called again to return the next JSON document
|
|
||||||
* after this one.
|
|
||||||
*
|
*
|
||||||
* The function returns simdjson::SUCCESS (as integer = 0) in case of success
|
* The function returns simdjson::SUCCESS (as integer = 0) in case of success
|
||||||
* and indicates that the buffer has successfully been parsed to the end.
|
* and indicates that the buffer has successfully been parsed to the end.
|
||||||
|
@ -93,55 +106,51 @@ private:
|
||||||
* the simdjson::error_message function converts these error codes into a string).
|
* the simdjson::error_message function converts these error codes into a string).
|
||||||
*
|
*
|
||||||
* You can also check validity by calling parser.is_valid(). The same parser can
|
* You can also check validity by calling parser.is_valid(). The same parser can
|
||||||
* and should be reused for the other documents in the buffer. */
|
* and should be reused for the other documents in the buffer.
|
||||||
inline error_code json_parse() noexcept;
|
*/
|
||||||
|
inline void next() noexcept;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the location (index) of where the next document should be in the
|
* Pass the next batch through stage 1 and return when finished.
|
||||||
* buffer.
|
* When threads are enabled, this may wait for the stage 1 thread to finish.
|
||||||
* Can be used for debugging, it tells the user the position of the end of the
|
|
||||||
* last
|
|
||||||
* valid JSON document parsed
|
|
||||||
*/
|
*/
|
||||||
inline size_t get_current_buffer_loc() const { return current_buffer_loc; }
|
inline void load_batch() noexcept;
|
||||||
|
|
||||||
/**
|
/** Get the offset in the buffer at which the next batch starts. */
|
||||||
* Returns the total amount of complete documents parsed by the document_stream,
|
inline size_t next_batch_start() const noexcept;
|
||||||
* in the current buffer, at the given time.
|
|
||||||
*/
|
|
||||||
inline size_t get_n_parsed_docs() const { return n_parsed_docs; }
|
|
||||||
|
|
||||||
/**
|
/** Pass the next batch through stage 1 with the given parser. */
|
||||||
* Returns the total amount of data (in bytes) parsed by the document_stream,
|
inline error_code run_stage1(dom::parser &p, size_t batch_start) noexcept;
|
||||||
* in the current buffer, at the given time.
|
|
||||||
*/
|
|
||||||
inline size_t get_n_bytes_parsed() const { return n_bytes_parsed; }
|
|
||||||
|
|
||||||
inline const uint8_t *buf() const { return _buf + buf_start; }
|
|
||||||
|
|
||||||
inline void advance(size_t offset) { buf_start += offset; }
|
|
||||||
|
|
||||||
inline size_t remaining() const { return _len - buf_start; }
|
|
||||||
|
|
||||||
dom::parser &parser;
|
dom::parser &parser;
|
||||||
const uint8_t *_buf;
|
const uint8_t *buf;
|
||||||
const size_t _len;
|
const size_t len;
|
||||||
size_t _batch_size; // this is actually variable!
|
const size_t batch_size;
|
||||||
size_t buf_start{0};
|
size_t batch_start{0};
|
||||||
size_t next_json{0};
|
/** The error (or lack thereof) from the current document. */
|
||||||
bool load_next_batch{true};
|
error_code error;
|
||||||
size_t current_buffer_loc{0};
|
|
||||||
#ifdef SIMDJSON_THREADS_ENABLED
|
#ifdef SIMDJSON_THREADS_ENABLED
|
||||||
size_t last_json_buffer_loc{0};
|
inline void load_from_stage1_thread() noexcept;
|
||||||
#endif
|
|
||||||
size_t n_parsed_docs{0};
|
/** Start a thread to run stage 1 on the next batch. */
|
||||||
size_t n_bytes_parsed{0};
|
inline void start_stage1_thread() noexcept;
|
||||||
error_code error{SUCCESS_AND_HAS_MORE};
|
|
||||||
#ifdef SIMDJSON_THREADS_ENABLED
|
/** Wait for the stage 1 thread to finish and capture the results. */
|
||||||
error_code stage1_is_ok_thread{SUCCESS};
|
inline void finish_stage1_thread() noexcept;
|
||||||
std::thread stage_1_thread{};
|
|
||||||
dom::parser parser_thread{};
|
/** The error returned from the stage 1 thread. */
|
||||||
#endif
|
error_code stage1_thread_error{UNINITIALIZED};
|
||||||
|
/** The thread used to run stage 1 against the next batch in the background. */
|
||||||
|
std::thread stage1_thread{};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The parser used to run stage 1 in the background. Will be swapped
|
||||||
|
* with the regular parser when finished.
|
||||||
|
*/
|
||||||
|
dom::parser stage1_thread_parser{};
|
||||||
|
#endif // SIMDJSON_THREADS_ENABLED
|
||||||
|
|
||||||
friend class dom::parser;
|
friend class dom::parser;
|
||||||
}; // class document_stream
|
}; // class document_stream
|
||||||
|
|
||||||
|
|
|
@ -11,7 +11,6 @@ namespace simdjson {
|
||||||
*/
|
*/
|
||||||
enum error_code {
|
enum error_code {
|
||||||
SUCCESS = 0, ///< No error
|
SUCCESS = 0, ///< No error
|
||||||
SUCCESS_AND_HAS_MORE, ///< @private No error and buffer still has more data
|
|
||||||
CAPACITY, ///< This parser can't support a document that big
|
CAPACITY, ///< This parser can't support a document that big
|
||||||
MEMALLOC, ///< Error allocating memory, most likely out of memory
|
MEMALLOC, ///< Error allocating memory, most likely out of memory
|
||||||
TAPE_ERROR, ///< Something went wrong while writing to the tape (stage 2), this is a generic error
|
TAPE_ERROR, ///< Something went wrong while writing to the tape (stage 2), this is a generic error
|
||||||
|
|
|
@ -6,125 +6,37 @@
|
||||||
#include <limits>
|
#include <limits>
|
||||||
#include <stdexcept>
|
#include <stdexcept>
|
||||||
|
|
||||||
namespace simdjson {
|
|
||||||
namespace internal {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This algorithm is used to quickly identify the buffer position of
|
|
||||||
* the last JSON document inside the current batch.
|
|
||||||
*
|
|
||||||
* It does its work by finding the last pair of structural characters
|
|
||||||
* that represent the end followed by the start of a document.
|
|
||||||
*
|
|
||||||
* Simply put, we iterate over the structural characters, starting from
|
|
||||||
* the end. We consider that we found the end of a JSON document when the
|
|
||||||
* first element of the pair is NOT one of these characters: '{' '[' ':' ','
|
|
||||||
* and when the second element is NOT one of these characters: '}' ']' ':' ','.
|
|
||||||
*
|
|
||||||
* This simple comparison works most of the time, but it does not cover cases
|
|
||||||
* where the batch's structural indexes contain a perfect amount of documents.
|
|
||||||
* In such a case, we do not have access to the structural index which follows
|
|
||||||
* the last document, therefore, we do not have access to the second element in
|
|
||||||
* the pair, and means that we cannot identify the last document. To fix this
|
|
||||||
* issue, we keep a count of the open and closed curly/square braces we found
|
|
||||||
* while searching for the pair. When we find a pair AND the count of open and
|
|
||||||
* closed curly/square braces is the same, we know that we just passed a
|
|
||||||
* complete
|
|
||||||
* document, therefore the last json buffer location is the end of the batch
|
|
||||||
* */
|
|
||||||
inline uint32_t find_last_json_buf_idx(const uint8_t *buf, size_t size, const dom::parser &parser) {
|
|
||||||
// this function can be generally useful
|
|
||||||
if (parser.implementation->n_structural_indexes == 0)
|
|
||||||
return 0;
|
|
||||||
auto last_i = parser.implementation->n_structural_indexes - 1;
|
|
||||||
if (parser.implementation->structural_indexes[last_i] == size) {
|
|
||||||
if (last_i == 0)
|
|
||||||
return 0;
|
|
||||||
last_i = parser.implementation->n_structural_indexes - 2;
|
|
||||||
}
|
|
||||||
auto arr_cnt = 0;
|
|
||||||
auto obj_cnt = 0;
|
|
||||||
for (auto i = last_i; i > 0; i--) {
|
|
||||||
auto idxb = parser.implementation->structural_indexes[i];
|
|
||||||
switch (buf[idxb]) {
|
|
||||||
case ':':
|
|
||||||
case ',':
|
|
||||||
continue;
|
|
||||||
case '}':
|
|
||||||
obj_cnt--;
|
|
||||||
continue;
|
|
||||||
case ']':
|
|
||||||
arr_cnt--;
|
|
||||||
continue;
|
|
||||||
case '{':
|
|
||||||
obj_cnt++;
|
|
||||||
break;
|
|
||||||
case '[':
|
|
||||||
arr_cnt++;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
auto idxa = parser.implementation->structural_indexes[i - 1];
|
|
||||||
switch (buf[idxa]) {
|
|
||||||
case '{':
|
|
||||||
case '[':
|
|
||||||
case ':':
|
|
||||||
case ',':
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (!arr_cnt && !obj_cnt) {
|
|
||||||
return last_i + 1;
|
|
||||||
}
|
|
||||||
return i;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
// returns true if the provided byte value is an ASCII character
|
|
||||||
static inline bool is_ascii(char c) {
|
|
||||||
return ((unsigned char)c) <= 127;
|
|
||||||
}
|
|
||||||
|
|
||||||
// if the string ends with UTF-8 values, backtrack
|
|
||||||
// up to the first ASCII character. May return 0.
|
|
||||||
static inline size_t trimmed_length_safe_utf8(const char * c, size_t len) {
|
|
||||||
while ((len > 0) and (not is_ascii(c[len - 1]))) {
|
|
||||||
len--;
|
|
||||||
}
|
|
||||||
return len;
|
|
||||||
}
|
|
||||||
|
|
||||||
} // namespace internal
|
|
||||||
|
|
||||||
} // namespace simdjson
|
|
||||||
|
|
||||||
namespace simdjson {
|
namespace simdjson {
|
||||||
namespace dom {
|
namespace dom {
|
||||||
|
|
||||||
really_inline document_stream::document_stream(
|
really_inline document_stream::document_stream(
|
||||||
dom::parser &_parser,
|
dom::parser &_parser,
|
||||||
const uint8_t *buf,
|
const uint8_t *_buf,
|
||||||
size_t len,
|
size_t _len,
|
||||||
size_t batch_size,
|
size_t _batch_size,
|
||||||
error_code _error
|
error_code _error
|
||||||
) noexcept
|
) noexcept
|
||||||
: parser{_parser},
|
: parser{_parser},
|
||||||
_buf{buf},
|
buf{_buf},
|
||||||
_len{len},
|
len{_len},
|
||||||
_batch_size(batch_size),
|
batch_size{_batch_size},
|
||||||
error(_error)
|
error{_error}
|
||||||
{
|
{
|
||||||
if (!error) { error = json_parse(); }
|
|
||||||
}
|
}
|
||||||
|
|
||||||
inline document_stream::~document_stream() noexcept {
|
inline document_stream::~document_stream() noexcept {
|
||||||
#ifdef SIMDJSON_THREADS_ENABLED
|
#ifdef SIMDJSON_THREADS_ENABLED
|
||||||
if (stage_1_thread.joinable()) {
|
// TODO: kill the thread. Why should people have to wait for a non-side-effecting operation to complete?
|
||||||
stage_1_thread.join();
|
if (stage1_thread.joinable()) {
|
||||||
|
stage1_thread.join();
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
really_inline document_stream::iterator document_stream::begin() noexcept {
|
really_inline document_stream::iterator document_stream::begin() noexcept {
|
||||||
return iterator(*this, false);
|
start();
|
||||||
|
// If there are no documents, we're finished.
|
||||||
|
return iterator(*this, error == EMPTY);
|
||||||
}
|
}
|
||||||
|
|
||||||
really_inline document_stream::iterator document_stream::end() noexcept {
|
really_inline document_stream::iterator document_stream::end() noexcept {
|
||||||
|
@ -136,17 +48,15 @@ really_inline document_stream::iterator::iterator(document_stream& _stream, bool
|
||||||
}
|
}
|
||||||
|
|
||||||
really_inline simdjson_result<element> document_stream::iterator::operator*() noexcept {
|
really_inline simdjson_result<element> document_stream::iterator::operator*() noexcept {
|
||||||
error_code err = stream.error == SUCCESS_AND_HAS_MORE ? SUCCESS : stream.error;
|
// Once we have yielded any errors, we're finished.
|
||||||
if (err) { return err; }
|
if (stream.error) { finished = true; return stream.error; }
|
||||||
return stream.parser.doc.root();
|
return stream.parser.doc.root();
|
||||||
}
|
}
|
||||||
|
|
||||||
really_inline document_stream::iterator& document_stream::iterator::operator++() noexcept {
|
really_inline document_stream::iterator& document_stream::iterator::operator++() noexcept {
|
||||||
if (stream.error == SUCCESS_AND_HAS_MORE) {
|
stream.next();
|
||||||
stream.error = stream.json_parse();
|
// If that was the last document, we're finished.
|
||||||
} else {
|
if (stream.error == EMPTY) { finished = true; }
|
||||||
finished = true;
|
|
||||||
}
|
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -154,130 +64,96 @@ really_inline bool document_stream::iterator::operator!=(const document_stream::
|
||||||
return finished != other.finished;
|
return finished != other.finished;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inline void document_stream::start() noexcept {
|
||||||
|
if (error) { return; }
|
||||||
|
|
||||||
|
error = parser.ensure_capacity(batch_size);
|
||||||
|
if (error) { return; }
|
||||||
|
|
||||||
|
// Always run the first stage 1 parse immediately
|
||||||
|
batch_start = 0;
|
||||||
|
error = run_stage1(parser, batch_start);
|
||||||
|
if (error) { return; }
|
||||||
|
|
||||||
|
#ifdef SIMDJSON_THREADS_ENABLED
|
||||||
|
if (next_batch_start() < len) {
|
||||||
|
// Kick off the first thread if needed
|
||||||
|
error = stage1_thread_parser.ensure_capacity(batch_size);
|
||||||
|
if (error) { return; }
|
||||||
|
start_stage1_thread();
|
||||||
|
if (error) { return; }
|
||||||
|
}
|
||||||
|
#endif // SIMDJSON_THREADS_ENABLED
|
||||||
|
|
||||||
|
next();
|
||||||
|
}
|
||||||
|
|
||||||
|
inline void document_stream::next() noexcept {
|
||||||
|
if (error) { return; }
|
||||||
|
|
||||||
|
// Load the next document from the batch
|
||||||
|
error = parser.implementation->stage2_next(parser.doc);
|
||||||
|
|
||||||
|
// If that was the last document in the batch, load another batch (if available)
|
||||||
|
while (error == EMPTY) {
|
||||||
|
batch_start = next_batch_start();
|
||||||
|
if (batch_start >= len) { break; }
|
||||||
|
|
||||||
|
#ifdef SIMDJSON_THREADS_ENABLED
|
||||||
|
load_from_stage1_thread();
|
||||||
|
#else
|
||||||
|
error = run_stage1(parser, batch_start);
|
||||||
|
#endif
|
||||||
|
if (error) { continue; } // If the error was EMPTY, we may want to load another batch.
|
||||||
|
|
||||||
|
// Run stage 2 on the first document in the batch
|
||||||
|
error = parser.implementation->stage2_next(parser.doc);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
inline size_t document_stream::next_batch_start() const noexcept {
|
||||||
|
return batch_start + parser.implementation->structural_indexes[parser.implementation->n_structural_indexes];
|
||||||
|
}
|
||||||
|
|
||||||
|
inline error_code document_stream::run_stage1(dom::parser &p, size_t _batch_start) noexcept {
|
||||||
|
// If this is the final batch, pass partial = false
|
||||||
|
size_t remaining = len - _batch_start;
|
||||||
|
if (remaining <= batch_size) {
|
||||||
|
return p.implementation->stage1(&buf[_batch_start], remaining, false);
|
||||||
|
} else {
|
||||||
|
return p.implementation->stage1(&buf[_batch_start], batch_size, true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#ifdef SIMDJSON_THREADS_ENABLED
|
#ifdef SIMDJSON_THREADS_ENABLED
|
||||||
|
|
||||||
// threaded version of json_parse
|
inline void document_stream::load_from_stage1_thread() noexcept {
|
||||||
// todo: simplify this code further
|
stage1_thread.join();
|
||||||
inline error_code document_stream::json_parse() noexcept {
|
|
||||||
error = parser.ensure_capacity(_batch_size);
|
|
||||||
if (error) { return error; }
|
|
||||||
error = parser_thread.ensure_capacity(_batch_size);
|
|
||||||
if (error) { return error; }
|
|
||||||
|
|
||||||
if (unlikely(load_next_batch)) {
|
// Swap to the parser that was loaded up in the thread. Make sure the parser has
|
||||||
// First time loading
|
// enough memory to swap to, as well.
|
||||||
if (!stage_1_thread.joinable()) {
|
std::swap(parser, stage1_thread_parser);
|
||||||
_batch_size = (std::min)(_batch_size, remaining());
|
error = stage1_thread_error;
|
||||||
_batch_size = internal::trimmed_length_safe_utf8((const char *)buf(), _batch_size);
|
if (error) { return; }
|
||||||
if (_batch_size == 0) {
|
|
||||||
return simdjson::UTF8_ERROR;
|
// If there's anything left, start the stage 1 thread!
|
||||||
}
|
if (next_batch_start() < len) {
|
||||||
auto stage1_is_ok = error_code(parser.implementation->stage1(buf(), _batch_size, true));
|
start_stage1_thread();
|
||||||
if (stage1_is_ok != simdjson::SUCCESS) {
|
|
||||||
return stage1_is_ok;
|
|
||||||
}
|
|
||||||
uint32_t last_index = internal::find_last_json_buf_idx(buf(), _batch_size, parser);
|
|
||||||
if (last_index == 0) {
|
|
||||||
if (parser.implementation->n_structural_indexes == 0) {
|
|
||||||
return simdjson::EMPTY;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
parser.implementation->n_structural_indexes = last_index + 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// the second thread is running or done.
|
|
||||||
else {
|
|
||||||
stage_1_thread.join();
|
|
||||||
if (stage1_is_ok_thread != simdjson::SUCCESS) {
|
|
||||||
return stage1_is_ok_thread;
|
|
||||||
}
|
|
||||||
std::swap(parser.implementation->structural_indexes, parser_thread.implementation->structural_indexes);
|
|
||||||
parser.implementation->n_structural_indexes = parser_thread.implementation->n_structural_indexes;
|
|
||||||
advance(last_json_buffer_loc);
|
|
||||||
n_bytes_parsed += last_json_buffer_loc;
|
|
||||||
}
|
|
||||||
// let us decide whether we will start a new thread
|
|
||||||
if (remaining() - _batch_size > 0) {
|
|
||||||
last_json_buffer_loc =
|
|
||||||
parser.implementation->structural_indexes[internal::find_last_json_buf_idx(buf(), _batch_size, parser)];
|
|
||||||
_batch_size = (std::min)(_batch_size, remaining() - last_json_buffer_loc);
|
|
||||||
if (_batch_size > 0) {
|
|
||||||
_batch_size = internal::trimmed_length_safe_utf8(
|
|
||||||
(const char *)(buf() + last_json_buffer_loc), _batch_size);
|
|
||||||
if (_batch_size == 0) {
|
|
||||||
return simdjson::UTF8_ERROR;
|
|
||||||
}
|
|
||||||
// let us capture read-only variables
|
|
||||||
const uint8_t *const b = buf() + last_json_buffer_loc;
|
|
||||||
const size_t bs = _batch_size;
|
|
||||||
// we call the thread on a lambda that will update
|
|
||||||
// this->stage1_is_ok_thread
|
|
||||||
// there is only one thread that may write to this value
|
|
||||||
stage_1_thread = std::thread([this, b, bs] {
|
|
||||||
this->stage1_is_ok_thread = error_code(parser_thread.implementation->stage1(b, bs, true));
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
next_json = 0;
|
|
||||||
load_next_batch = false;
|
|
||||||
} // load_next_batch
|
|
||||||
error_code res = parser.implementation->stage2(buf(), remaining(), parser.doc, next_json);
|
|
||||||
if (res == simdjson::SUCCESS_AND_HAS_MORE) {
|
|
||||||
n_parsed_docs++;
|
|
||||||
current_buffer_loc = parser.implementation->structural_indexes[next_json];
|
|
||||||
load_next_batch = (current_buffer_loc == last_json_buffer_loc);
|
|
||||||
} else if (res == simdjson::SUCCESS) {
|
|
||||||
n_parsed_docs++;
|
|
||||||
if (remaining() > _batch_size) {
|
|
||||||
current_buffer_loc = parser.implementation->structural_indexes[next_json - 1];
|
|
||||||
load_next_batch = true;
|
|
||||||
res = simdjson::SUCCESS_AND_HAS_MORE;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return res;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#else // SIMDJSON_THREADS_ENABLED
|
inline void document_stream::start_stage1_thread() noexcept {
|
||||||
|
// we call the thread on a lambda that will update
|
||||||
// single-threaded version of json_parse
|
// this->stage1_thread_error
|
||||||
inline error_code document_stream::json_parse() noexcept {
|
// there is only one thread that may write to this value
|
||||||
error = parser.ensure_capacity(_batch_size);
|
// TODO this is NOT exception-safe.
|
||||||
if (error) { return error; }
|
this->stage1_thread_error = UNINITIALIZED; // In case something goes wrong, make sure it's an error
|
||||||
|
size_t _next_batch_start = this->next_batch_start();
|
||||||
if (unlikely(load_next_batch)) {
|
stage1_thread = std::thread([this, _next_batch_start] {
|
||||||
advance(current_buffer_loc);
|
this->stage1_thread_error = run_stage1(this->stage1_thread_parser, _next_batch_start);
|
||||||
n_bytes_parsed += current_buffer_loc;
|
});
|
||||||
_batch_size = (std::min)(_batch_size, remaining());
|
|
||||||
_batch_size = internal::trimmed_length_safe_utf8((const char *)buf(), _batch_size);
|
|
||||||
auto stage1_is_ok = (error_code)parser.implementation->stage1(buf(), _batch_size, true);
|
|
||||||
if (stage1_is_ok != simdjson::SUCCESS) {
|
|
||||||
return stage1_is_ok;
|
|
||||||
}
|
|
||||||
uint32_t last_index = internal::find_last_json_buf_idx(buf(), _batch_size, parser);
|
|
||||||
if (last_index == 0) {
|
|
||||||
if (parser.implementation->n_structural_indexes == 0) {
|
|
||||||
return EMPTY;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
parser.implementation->n_structural_indexes = last_index + 1;
|
|
||||||
}
|
|
||||||
load_next_batch = false;
|
|
||||||
} // load_next_batch
|
|
||||||
error_code res = parser.implementation->stage2(buf(), remaining(), parser.doc, next_json);
|
|
||||||
if (likely(res == simdjson::SUCCESS_AND_HAS_MORE)) {
|
|
||||||
n_parsed_docs++;
|
|
||||||
current_buffer_loc = parser.implementation->structural_indexes[next_json];
|
|
||||||
} else if (res == simdjson::SUCCESS) {
|
|
||||||
n_parsed_docs++;
|
|
||||||
if (remaining() > _batch_size) {
|
|
||||||
current_buffer_loc = parser.implementation->structural_indexes[next_json - 1];
|
|
||||||
next_json = 1;
|
|
||||||
load_next_batch = true;
|
|
||||||
res = simdjson::SUCCESS_AND_HAS_MORE;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return res;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#endif // SIMDJSON_THREADS_ENABLED
|
#endif // SIMDJSON_THREADS_ENABLED
|
||||||
|
|
||||||
} // namespace dom
|
} // namespace dom
|
||||||
|
|
|
@ -72,16 +72,13 @@ public:
|
||||||
*
|
*
|
||||||
* Stage 2 of the document parser for parser::parse_many.
|
* Stage 2 of the document parser for parser::parse_many.
|
||||||
*
|
*
|
||||||
* Guaranteed only to be called after stage1(), with buf and len being a subset of the total stage1 buf/len.
|
* Guaranteed only to be called after stage1().
|
||||||
* Overridden by each implementation.
|
* Overridden by each implementation.
|
||||||
*
|
*
|
||||||
* @param buf The json document to parse.
|
|
||||||
* @param len The length of the json document.
|
|
||||||
* @param doc The document to output to.
|
* @param doc The document to output to.
|
||||||
* @param next_json The next structural index. Start this at 0 the first time, and it will be updated to the next value to pass each time.
|
* @return The error code, SUCCESS if there was no error, or EMPTY if all documents have been parsed.
|
||||||
* @return The error code, SUCCESS if there was no error, or SUCCESS_AND_HAS_MORE if there was no error and stage2 can be called again.
|
|
||||||
*/
|
*/
|
||||||
WARN_UNUSED virtual error_code stage2(const uint8_t *buf, size_t len, dom::document &doc, size_t &next_json) noexcept = 0;
|
WARN_UNUSED virtual error_code stage2_next(dom::document &doc) noexcept = 0;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Change the capacity of this parser.
|
* Change the capacity of this parser.
|
||||||
|
@ -117,6 +114,8 @@ public:
|
||||||
uint32_t n_structural_indexes{0};
|
uint32_t n_structural_indexes{0};
|
||||||
/** Structural indices passed from stage 1 to stage 2 */
|
/** Structural indices passed from stage 1 to stage 2 */
|
||||||
std::unique_ptr<uint32_t[]> structural_indexes{};
|
std::unique_ptr<uint32_t[]> structural_indexes{};
|
||||||
|
/** Next structural index to parse */
|
||||||
|
uint32_t next_structural_index{0};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The largest document this parser can support without reallocating.
|
* The largest document this parser can support without reallocating.
|
||||||
|
|
|
@ -82,6 +82,7 @@ WARN_UNUSED error_code implementation::minify(const uint8_t *buf, size_t len, ui
|
||||||
return arm64::stage1::json_minifier::minify<64>(buf, len, dst, dst_len);
|
return arm64::stage1::json_minifier::minify<64>(buf, len, dst, dst_len);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#include "generic/stage1/find_next_document_index.h"
|
||||||
#include "generic/stage1/utf8_lookup2_algorithm.h"
|
#include "generic/stage1/utf8_lookup2_algorithm.h"
|
||||||
#include "generic/stage1/json_structural_indexer.h"
|
#include "generic/stage1/json_structural_indexer.h"
|
||||||
WARN_UNUSED error_code dom_parser_implementation::stage1(const uint8_t *_buf, size_t _len, bool streaming) noexcept {
|
WARN_UNUSED error_code dom_parser_implementation::stage1(const uint8_t *_buf, size_t _len, bool streaming) noexcept {
|
||||||
|
|
|
@ -5,7 +5,6 @@ namespace internal {
|
||||||
|
|
||||||
SIMDJSON_DLLIMPORTEXPORT const error_code_info error_codes[] {
|
SIMDJSON_DLLIMPORTEXPORT const error_code_info error_codes[] {
|
||||||
{ SUCCESS, "No error" },
|
{ SUCCESS, "No error" },
|
||||||
{ SUCCESS_AND_HAS_MORE, "No error and buffer still has more data" },
|
|
||||||
{ CAPACITY, "This parser can't support a document that big" },
|
{ CAPACITY, "This parser can't support a document that big" },
|
||||||
{ MEMALLOC, "Error allocating memory, we're most likely out of memory" },
|
{ MEMALLOC, "Error allocating memory, we're most likely out of memory" },
|
||||||
{ TAPE_ERROR, "The JSON document has an improper structure: missing or superfluous commas, braces, missing keys, etc." },
|
{ TAPE_ERROR, "The JSON document has an improper structure: missing or superfluous commas, braces, missing keys, etc." },
|
||||||
|
|
|
@ -9,15 +9,17 @@ namespace simdjson {
|
||||||
namespace fallback {
|
namespace fallback {
|
||||||
namespace stage1 {
|
namespace stage1 {
|
||||||
|
|
||||||
|
#include "generic/stage1/find_next_document_index.h"
|
||||||
|
|
||||||
class structural_scanner {
|
class structural_scanner {
|
||||||
public:
|
public:
|
||||||
|
|
||||||
really_inline structural_scanner(dom_parser_implementation &_parser, bool _streaming)
|
really_inline structural_scanner(dom_parser_implementation &_parser, bool _partial)
|
||||||
: buf{_parser.buf},
|
: buf{_parser.buf},
|
||||||
next_structural_index{_parser.structural_indexes.get()},
|
next_structural_index{_parser.structural_indexes.get()},
|
||||||
parser{_parser},
|
parser{_parser},
|
||||||
len{static_cast<uint32_t>(_parser.len)},
|
len{static_cast<uint32_t>(_parser.len)},
|
||||||
streaming{_streaming} {
|
partial{_partial} {
|
||||||
}
|
}
|
||||||
|
|
||||||
really_inline void add_structural() {
|
really_inline void add_structural() {
|
||||||
|
@ -41,7 +43,12 @@ really_inline void validate_utf8_character() {
|
||||||
// 2-byte
|
// 2-byte
|
||||||
if ((buf[idx] & 0b00100000) == 0) {
|
if ((buf[idx] & 0b00100000) == 0) {
|
||||||
// missing continuation
|
// missing continuation
|
||||||
if (unlikely(idx+1 > len || !is_continuation(buf[idx+1]))) { error = UTF8_ERROR; idx++; return; }
|
if (unlikely(idx+1 > len || !is_continuation(buf[idx+1]))) {
|
||||||
|
if (idx+1 > len && partial) { idx = len; return; }
|
||||||
|
error = UTF8_ERROR;
|
||||||
|
idx++;
|
||||||
|
return;
|
||||||
|
}
|
||||||
// overlong: 1100000_ 10______
|
// overlong: 1100000_ 10______
|
||||||
if (buf[idx] <= 0b11000001) { error = UTF8_ERROR; }
|
if (buf[idx] <= 0b11000001) { error = UTF8_ERROR; }
|
||||||
idx += 2;
|
idx += 2;
|
||||||
|
@ -51,7 +58,12 @@ really_inline void validate_utf8_character() {
|
||||||
// 3-byte
|
// 3-byte
|
||||||
if ((buf[idx] & 0b00010000) == 0) {
|
if ((buf[idx] & 0b00010000) == 0) {
|
||||||
// missing continuation
|
// missing continuation
|
||||||
if (unlikely(idx+2 > len || !is_continuation(buf[idx+1]) || !is_continuation(buf[idx+2]))) { error = UTF8_ERROR; idx++; return; }
|
if (unlikely(idx+2 > len || !is_continuation(buf[idx+1]) || !is_continuation(buf[idx+2]))) {
|
||||||
|
if (idx+2 > len && partial) { idx = len; return; }
|
||||||
|
error = UTF8_ERROR;
|
||||||
|
idx++;
|
||||||
|
return;
|
||||||
|
}
|
||||||
// overlong: 11100000 100_____ ________
|
// overlong: 11100000 100_____ ________
|
||||||
if (buf[idx] == 0b11100000 && buf[idx+1] <= 0b10011111) { error = UTF8_ERROR; }
|
if (buf[idx] == 0b11100000 && buf[idx+1] <= 0b10011111) { error = UTF8_ERROR; }
|
||||||
// surrogates: U+D800-U+DFFF 11101101 101_____
|
// surrogates: U+D800-U+DFFF 11101101 101_____
|
||||||
|
@ -62,7 +74,12 @@ really_inline void validate_utf8_character() {
|
||||||
|
|
||||||
// 4-byte
|
// 4-byte
|
||||||
// missing continuation
|
// missing continuation
|
||||||
if (unlikely(idx+3 > len || !is_continuation(buf[idx+1]) || !is_continuation(buf[idx+2]) || !is_continuation(buf[idx+3]))) { error = UTF8_ERROR; idx++; return; }
|
if (unlikely(idx+3 > len || !is_continuation(buf[idx+1]) || !is_continuation(buf[idx+2]) || !is_continuation(buf[idx+3]))) {
|
||||||
|
if (idx+2 > len && partial) { idx = len; return; }
|
||||||
|
error = UTF8_ERROR;
|
||||||
|
idx++;
|
||||||
|
return;
|
||||||
|
}
|
||||||
// overlong: 11110000 1000____ ________ ________
|
// overlong: 11110000 1000____ ________ ________
|
||||||
if (buf[idx] == 0b11110000 && buf[idx+1] <= 0b10001111) { error = UTF8_ERROR; }
|
if (buf[idx] == 0b11110000 && buf[idx+1] <= 0b10001111) { error = UTF8_ERROR; }
|
||||||
// too large: > U+10FFFF:
|
// too large: > U+10FFFF:
|
||||||
|
@ -87,7 +104,7 @@ really_inline void validate_string() {
|
||||||
idx++;
|
idx++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (idx >= len && !streaming) { error = UNCLOSED_STRING; }
|
if (idx >= len && !partial) { error = UNCLOSED_STRING; }
|
||||||
}
|
}
|
||||||
|
|
||||||
really_inline bool is_whitespace_or_operator(uint8_t c) {
|
really_inline bool is_whitespace_or_operator(uint8_t c) {
|
||||||
|
@ -128,16 +145,26 @@ really_inline error_code scan() {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (unlikely(next_structural_index == parser.structural_indexes.get())) {
|
|
||||||
return EMPTY;
|
|
||||||
}
|
|
||||||
*next_structural_index = len;
|
*next_structural_index = len;
|
||||||
next_structural_index++;
|
|
||||||
// We pad beyond.
|
// We pad beyond.
|
||||||
// https://github.com/simdjson/simdjson/issues/906
|
// https://github.com/simdjson/simdjson/issues/906
|
||||||
next_structural_index[0] = len;
|
next_structural_index[1] = len;
|
||||||
next_structural_index[1] = 0;
|
next_structural_index[2] = 0;
|
||||||
parser.n_structural_indexes = uint32_t(next_structural_index - parser.structural_indexes.get());
|
parser.n_structural_indexes = uint32_t(next_structural_index - parser.structural_indexes.get());
|
||||||
|
parser.next_structural_index = 0;
|
||||||
|
|
||||||
|
if (unlikely(parser.n_structural_indexes == 0)) {
|
||||||
|
return EMPTY;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (partial) {
|
||||||
|
auto new_structural_indexes = find_next_document_index(parser);
|
||||||
|
if (new_structural_indexes == 0 && parser.n_structural_indexes > 0) {
|
||||||
|
return CAPACITY; // If the buffer is partial but the document is incomplete, it's too big to parse.
|
||||||
|
}
|
||||||
|
parser.n_structural_indexes = new_structural_indexes;
|
||||||
|
}
|
||||||
|
|
||||||
return error;
|
return error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -148,16 +175,16 @@ private:
|
||||||
uint32_t len;
|
uint32_t len;
|
||||||
uint32_t idx{0};
|
uint32_t idx{0};
|
||||||
error_code error{SUCCESS};
|
error_code error{SUCCESS};
|
||||||
bool streaming;
|
bool partial;
|
||||||
}; // structural_scanner
|
}; // structural_scanner
|
||||||
|
|
||||||
} // namespace stage1
|
} // namespace stage1
|
||||||
|
|
||||||
|
|
||||||
WARN_UNUSED error_code dom_parser_implementation::stage1(const uint8_t *_buf, size_t _len, bool streaming) noexcept {
|
WARN_UNUSED error_code dom_parser_implementation::stage1(const uint8_t *_buf, size_t _len, bool partial) noexcept {
|
||||||
this->buf = _buf;
|
this->buf = _buf;
|
||||||
this->len = _len;
|
this->len = _len;
|
||||||
stage1::structural_scanner scanner(*this, streaming);
|
stage1::structural_scanner scanner(*this, partial);
|
||||||
return scanner.scan();
|
return scanner.scan();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -28,11 +28,12 @@ public:
|
||||||
really_inline dom_parser_implementation();
|
really_inline dom_parser_implementation();
|
||||||
dom_parser_implementation(const dom_parser_implementation &) = delete;
|
dom_parser_implementation(const dom_parser_implementation &) = delete;
|
||||||
dom_parser_implementation & operator=(const dom_parser_implementation &) = delete;
|
dom_parser_implementation & operator=(const dom_parser_implementation &) = delete;
|
||||||
|
|
||||||
WARN_UNUSED error_code parse(const uint8_t *buf, size_t len, dom::document &doc) noexcept final;
|
WARN_UNUSED error_code parse(const uint8_t *buf, size_t len, dom::document &doc) noexcept final;
|
||||||
WARN_UNUSED error_code stage1(const uint8_t *buf, size_t len, bool streaming) noexcept final;
|
WARN_UNUSED error_code stage1(const uint8_t *buf, size_t len, bool partial) noexcept final;
|
||||||
|
WARN_UNUSED error_code check_for_unclosed_array() noexcept;
|
||||||
WARN_UNUSED error_code stage2(dom::document &doc) noexcept final;
|
WARN_UNUSED error_code stage2(dom::document &doc) noexcept final;
|
||||||
WARN_UNUSED error_code stage2(const uint8_t *buf, size_t len, dom::document &doc, size_t &next_json) noexcept final;
|
WARN_UNUSED error_code stage2_next(dom::document &doc) noexcept final;
|
||||||
WARN_UNUSED error_code set_capacity(size_t capacity) noexcept final;
|
WARN_UNUSED error_code set_capacity(size_t capacity) noexcept final;
|
||||||
WARN_UNUSED error_code set_max_depth(size_t max_depth) noexcept final;
|
WARN_UNUSED error_code set_max_depth(size_t max_depth) noexcept final;
|
||||||
};
|
};
|
||||||
|
|
|
@ -8,6 +8,8 @@ really_inline error_code set_capacity(internal::dom_parser_implementation &parse
|
||||||
size_t max_structures = ROUNDUP_N(capacity, 64) + 2 + 7;
|
size_t max_structures = ROUNDUP_N(capacity, 64) + 2 + 7;
|
||||||
parser.structural_indexes.reset( new (std::nothrow) uint32_t[max_structures] );
|
parser.structural_indexes.reset( new (std::nothrow) uint32_t[max_structures] );
|
||||||
if (!parser.structural_indexes) { return MEMALLOC; }
|
if (!parser.structural_indexes) { return MEMALLOC; }
|
||||||
|
parser.structural_indexes[0] = 0;
|
||||||
|
parser.n_structural_indexes = 0;
|
||||||
return SUCCESS;
|
return SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,24 +2,21 @@
|
||||||
template<size_t STEP_SIZE>
|
template<size_t STEP_SIZE>
|
||||||
struct buf_block_reader {
|
struct buf_block_reader {
|
||||||
public:
|
public:
|
||||||
really_inline buf_block_reader(const uint8_t *_buf, size_t _len) : buf{_buf}, len{_len}, lenminusstep{len < STEP_SIZE ? 0 : len - STEP_SIZE}, idx{0} {}
|
really_inline buf_block_reader(const uint8_t *_buf, size_t _len);
|
||||||
really_inline size_t block_index() { return idx; }
|
really_inline size_t block_index();
|
||||||
really_inline bool has_full_block() const {
|
really_inline bool has_full_block() const;
|
||||||
return idx < lenminusstep;
|
really_inline const uint8_t *full_block() const;
|
||||||
}
|
/**
|
||||||
really_inline const uint8_t *full_block() const {
|
* Get the last block, padded with spaces.
|
||||||
return &buf[idx];
|
*
|
||||||
}
|
* There will always be a last block, with at least 1 byte, unless len == 0 (in which case this
|
||||||
really_inline bool has_remainder() const {
|
* function fills the buffer with spaces and returns 0). In particular, if len == STEP_SIZE there
|
||||||
return idx < len;
|
* will be 0 full_blocks and 1 remainder block with STEP_SIZE bytes and no spaces for padding.
|
||||||
}
|
*
|
||||||
really_inline void get_remainder(uint8_t *tmp_buf) const {
|
* @return the number of effective characters in the last block.
|
||||||
memset(tmp_buf, 0x20, STEP_SIZE);
|
*/
|
||||||
memcpy(tmp_buf, buf + idx, len - idx);
|
really_inline size_t get_remainder(uint8_t *dst) const;
|
||||||
}
|
really_inline void advance();
|
||||||
really_inline void advance() {
|
|
||||||
idx += STEP_SIZE;
|
|
||||||
}
|
|
||||||
private:
|
private:
|
||||||
const uint8_t *buf;
|
const uint8_t *buf;
|
||||||
const size_t len;
|
const size_t len;
|
||||||
|
@ -27,6 +24,18 @@ private:
|
||||||
size_t idx;
|
size_t idx;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
constexpr const int TITLE_SIZE = 12;
|
||||||
|
|
||||||
|
// Routines to print masks and text for debugging bitmask operations
|
||||||
|
/**
 * Debug helper: render sizeof(simd8x64<uint8_t>) bytes of input as a printable C string.
 *
 * Every byte that compares below ' ' after conversion to int8_t (control characters, and
 * bytes >= 0x80 which become negative) is shown as '_'; all other bytes are copied as-is.
 *
 * @param text Input to render; must have at least sizeof(simd8x64<uint8_t>) readable bytes.
 * @return Pointer to a NUL-terminated static buffer, overwritten by the next call.
 */
UNUSED static char * format_input_text_64(const uint8_t *text) {
  // Fixed-size static storage: unlike an unchecked, never-freed malloc(), this cannot be
  // null and needs no allocation at all.
  static char buf[sizeof(simd8x64<uint8_t>) + 1];
  for (size_t i=0; i<sizeof(simd8x64<uint8_t>); i++) {
    buf[i] = int8_t(text[i]) < ' ' ? '_' : int8_t(text[i]);
  }
  buf[sizeof(simd8x64<uint8_t>)] = '\0';
  return buf;
}
|
||||||
|
|
||||||
// Routines to print masks and text for debugging bitmask operations
|
// Routines to print masks and text for debugging bitmask operations
|
||||||
UNUSED static char * format_input_text(const simd8x64<uint8_t> in) {
|
UNUSED static char * format_input_text(const simd8x64<uint8_t> in) {
|
||||||
static char *buf = (char*)malloc(sizeof(simd8x64<uint8_t>) + 1);
|
static char *buf = (char*)malloc(sizeof(simd8x64<uint8_t>) + 1);
|
||||||
|
@ -46,3 +55,31 @@ UNUSED static char * format_mask(uint64_t mask) {
|
||||||
buf[64] = '\0';
|
buf[64] = '\0';
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template<size_t STEP_SIZE>
|
||||||
|
really_inline buf_block_reader<STEP_SIZE>::buf_block_reader(const uint8_t *_buf, size_t _len) : buf{_buf}, len{_len}, lenminusstep{len < STEP_SIZE ? 0 : len - STEP_SIZE}, idx{0} {}
|
||||||
|
|
||||||
|
template<size_t STEP_SIZE>
|
||||||
|
really_inline size_t buf_block_reader<STEP_SIZE>::block_index() { return idx; }
|
||||||
|
|
||||||
|
template<size_t STEP_SIZE>
|
||||||
|
really_inline bool buf_block_reader<STEP_SIZE>::has_full_block() const {
|
||||||
|
return idx < lenminusstep;
|
||||||
|
}
|
||||||
|
|
||||||
|
template<size_t STEP_SIZE>
|
||||||
|
really_inline const uint8_t *buf_block_reader<STEP_SIZE>::full_block() const {
|
||||||
|
return &buf[idx];
|
||||||
|
}
|
||||||
|
|
||||||
|
template<size_t STEP_SIZE>
|
||||||
|
really_inline size_t buf_block_reader<STEP_SIZE>::get_remainder(uint8_t *dst) const {
|
||||||
|
memset(dst, 0x20, STEP_SIZE); // memset STEP_SIZE because it's more efficient to write out 8 or 16 bytes at once.
|
||||||
|
memcpy(dst, buf + idx, len - idx);
|
||||||
|
return len - idx;
|
||||||
|
}
|
||||||
|
|
||||||
|
template<size_t STEP_SIZE>
|
||||||
|
really_inline void buf_block_reader<STEP_SIZE>::advance() {
|
||||||
|
idx += STEP_SIZE;
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,86 @@
|
||||||
|
/**
|
||||||
|
* This algorithm is used to quickly identify the last structural position that
|
||||||
|
* makes up a complete document.
|
||||||
|
*
|
||||||
|
* It does this by going backwards and finding the last *document boundary* (a
|
||||||
|
* place where one value follows another without a comma between them). If the
|
||||||
|
* last document (the characters after the boundary) has an equal number of
|
||||||
|
* start and end brackets, it is considered complete.
|
||||||
|
*
|
||||||
|
* Simply put, we iterate over the structural characters, starting from
|
||||||
|
* the end. We consider that we found the end of a JSON document when the
|
||||||
|
* first element of the pair is NOT one of these characters: '{' '[' ':' ','
|
||||||
|
* and when the second element is NOT one of these characters: '}' ']' ':' ','.
|
||||||
|
*
|
||||||
|
* This simple comparison works most of the time, but it does not cover cases
|
||||||
|
* where the batch's structural indexes contain a perfect amount of documents.
|
||||||
|
* In such a case, we do not have access to the structural index which follows
|
||||||
|
* the last document, therefore, we do not have access to the second element in
|
||||||
|
* the pair, and that means we cannot identify the last document. To fix this
|
||||||
|
* issue, we keep a count of the open and closed curly/square braces we found
|
||||||
|
* while searching for the pair. When we find a pair AND the count of open and
|
||||||
|
* closed curly/square braces is the same, we know that we just passed a
|
||||||
|
* complete document, therefore the last json buffer location is the end of the
|
||||||
|
* batch.
|
||||||
|
*/
|
||||||
|
// Returns how many of parser.structural_indexes should be kept for this batch:
//  - parser.n_structural_indexes when the last document's brackets are balanced,
//  - the index of the first structural of an unbalanced trailing document otherwise,
//  - 0 when no document boundary exists in the batch (or no structurals were indexed).
really_inline static uint32_t find_next_document_index(dom_parser_implementation &parser) {
  // Without this guard, n_structural_indexes - 1 would wrap around on an unsigned count
  // and the loop below would read far outside structural_indexes.
  if (parser.n_structural_indexes == 0) { return 0; }

  // TODO don't count separately, just figure out depth
  auto arr_cnt = 0;
  auto obj_cnt = 0;
  // Walk the (previous, current) structural pairs from the end of the batch backwards.
  for (auto i = parser.n_structural_indexes - 1; i > 0; i--) {
    // Second element of the pair: a value can only start a new document if it is not a
    // separator or a closing bracket.
    auto idxb = parser.structural_indexes[i];
    switch (parser.buf[idxb]) {
    case ':':
    case ',':
      continue;
    case '}':
      obj_cnt--;
      continue;
    case ']':
      arr_cnt--;
      continue;
    case '{':
      obj_cnt++;
      break;
    case '[':
      arr_cnt++;
      break;
    }
    // First element of the pair: if it opens a scope or is a separator, the current
    // structural still belongs to the same document.
    auto idxa = parser.structural_indexes[i - 1];
    switch (parser.buf[idxa]) {
    case '{':
    case '[':
    case ':':
    case ',':
      continue;
    }
    // Last document is complete, so the next document will appear after!
    if (!arr_cnt && !obj_cnt) {
      return parser.n_structural_indexes;
    }
    // Last document is incomplete; it starts at structural i, so keep only the first i
    // structurals and let the next batch begin with that document.
    return i;
  }
  return 0;
}
|
||||||
|
|
||||||
|
// Skip the last character if it is partial
|
||||||
|
really_inline static size_t trim_partial_utf8(const uint8_t *buf, size_t len) {
|
||||||
|
if (unlikely(len < 3)) {
|
||||||
|
switch (len) {
|
||||||
|
case 2:
|
||||||
|
if (buf[len-1] >= 0b11000000) { return len-1; } // 2-, 3- and 4-byte characters with only 1 byte left
|
||||||
|
if (buf[len-2] >= 0b11100000) { return len-2; } // 3- and 4-byte characters with only 2 bytes left
|
||||||
|
return len;
|
||||||
|
case 1:
|
||||||
|
if (buf[len-1] >= 0b11000000) { return len-1; } // 2-, 3- and 4-byte characters with only 1 byte left
|
||||||
|
return len;
|
||||||
|
case 0:
|
||||||
|
return len;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (buf[len-1] >= 0b11000000) { return len-1; } // 2-, 3- and 4-byte characters with only 1 byte left
|
||||||
|
if (buf[len-2] >= 0b11100000) { return len-2; } // 3- and 4-byte characters with only 1 byte left
|
||||||
|
if (buf[len-3] >= 0b11110000) { return len-3; } // 4-byte characters with only 3 bytes left
|
||||||
|
return len;
|
||||||
|
}
|
|
@ -59,13 +59,15 @@ template<size_t STEP_SIZE>
|
||||||
error_code json_minifier::minify(const uint8_t *buf, size_t len, uint8_t *dst, size_t &dst_len) noexcept {
|
error_code json_minifier::minify(const uint8_t *buf, size_t len, uint8_t *dst, size_t &dst_len) noexcept {
|
||||||
buf_block_reader<STEP_SIZE> reader(buf, len);
|
buf_block_reader<STEP_SIZE> reader(buf, len);
|
||||||
json_minifier minifier(dst);
|
json_minifier minifier(dst);
|
||||||
|
|
||||||
|
// Index the first n-1 blocks
|
||||||
while (reader.has_full_block()) {
|
while (reader.has_full_block()) {
|
||||||
minifier.step<STEP_SIZE>(reader.full_block(), reader);
|
minifier.step<STEP_SIZE>(reader.full_block(), reader);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (likely(reader.has_remainder())) {
|
// Index the last (remainder) block, padded with spaces
|
||||||
uint8_t block[STEP_SIZE];
|
uint8_t block[STEP_SIZE];
|
||||||
reader.get_remainder(block);
|
if (likely(reader.get_remainder(block)) > 0) {
|
||||||
minifier.step<STEP_SIZE>(block, reader);
|
minifier.step<STEP_SIZE>(block, reader);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -57,16 +57,22 @@ public:
|
||||||
|
|
||||||
class json_structural_indexer {
|
class json_structural_indexer {
|
||||||
public:
|
public:
|
||||||
|
/**
|
||||||
|
* Find the important bits of JSON in a 128-byte chunk, and add them to structural_indexes.
|
||||||
|
*
|
||||||
|
* @param partial Setting the partial parameter to true allows the find_structural_bits to
|
||||||
|
* tolerate unclosed strings. The caller should still ensure that the input is valid UTF-8. If
|
||||||
|
* you are processing substrings, you may want to call on a function like trimmed_length_safe_utf8.
|
||||||
|
*/
|
||||||
template<size_t STEP_SIZE>
|
template<size_t STEP_SIZE>
|
||||||
static error_code index(const uint8_t *buf, size_t len, dom_parser_implementation &parser, bool streaming) noexcept;
|
static error_code index(const uint8_t *buf, size_t len, dom_parser_implementation &parser, bool partial) noexcept;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
really_inline json_structural_indexer(uint32_t *structural_indexes)
|
really_inline json_structural_indexer(uint32_t *structural_indexes);
|
||||||
: indexer{structural_indexes} {}
|
|
||||||
template<size_t STEP_SIZE>
|
template<size_t STEP_SIZE>
|
||||||
really_inline void step(const uint8_t *block, buf_block_reader<STEP_SIZE> &reader) noexcept;
|
really_inline void step(const uint8_t *block, buf_block_reader<STEP_SIZE> &reader) noexcept;
|
||||||
really_inline void next(simd::simd8x64<uint8_t> in, json_block block, size_t idx);
|
really_inline void next(simd::simd8x64<uint8_t> in, json_block block, size_t idx);
|
||||||
really_inline error_code finish(dom_parser_implementation &parser, size_t idx, size_t len, bool streaming);
|
really_inline error_code finish(dom_parser_implementation &parser, size_t idx, size_t len, bool partial);
|
||||||
|
|
||||||
json_scanner scanner{};
|
json_scanner scanner{};
|
||||||
utf8_checker checker{};
|
utf8_checker checker{};
|
||||||
|
@ -75,57 +81,44 @@ private:
|
||||||
uint64_t unescaped_chars_error = 0;
|
uint64_t unescaped_chars_error = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
really_inline void json_structural_indexer::next(simd::simd8x64<uint8_t> in, json_block block, size_t idx) {
|
really_inline json_structural_indexer::json_structural_indexer(uint32_t *structural_indexes) : indexer{structural_indexes} {}
|
||||||
uint64_t unescaped = in.lteq(0x1F);
|
|
||||||
checker.check_next_input(in);
|
|
||||||
indexer.write(uint32_t(idx-64), prev_structurals); // Output *last* iteration's structurals to the parser
|
|
||||||
prev_structurals = block.structural_start();
|
|
||||||
unescaped_chars_error |= block.non_quote_inside_string(unescaped);
|
|
||||||
}
|
|
||||||
|
|
||||||
really_inline error_code json_structural_indexer::finish(dom_parser_implementation &parser, size_t idx, size_t len, bool streaming) {
|
//
|
||||||
// Write out the final iteration's structurals
|
// PERF NOTES:
|
||||||
indexer.write(uint32_t(idx-64), prev_structurals);
|
// We pipe 2 inputs through these stages:
|
||||||
|
// 1. Load JSON into registers. This takes a long time and is highly parallelizable, so we load
|
||||||
|
// 2 inputs' worth at once so that by the time step 2 is looking for the input, it's available.
|
||||||
|
// 2. Scan the JSON for critical data: strings, scalars and operators. This is the critical path.
|
||||||
|
// The output of step 1 depends entirely on this information. These functions don't quite use
|
||||||
|
// up enough CPU: the second half of the functions is highly serial, only using 1 execution core
|
||||||
|
// at a time. The second input's scans has some dependency on the first ones finishing it, but
|
||||||
|
// they can make a lot of progress before they need that information.
|
||||||
|
// 3. Step 1 doesn't use enough capacity, so we run some extra stuff while we're waiting for that
|
||||||
|
// to finish: utf-8 checks and generating the output from the last iteration.
|
||||||
|
//
|
||||||
|
// The reason we run 2 inputs at a time, is steps 2 and 3 are *still* not enough to soak up all
|
||||||
|
// available capacity with just one input. Running 2 at a time seems to give the CPU a good enough
|
||||||
|
// workout.
|
||||||
|
//
|
||||||
|
template<size_t STEP_SIZE>
|
||||||
|
error_code json_structural_indexer::index(const uint8_t *buf, size_t len, dom_parser_implementation &parser, bool partial) noexcept {
|
||||||
|
if (unlikely(len > parser.capacity())) { return CAPACITY; }
|
||||||
|
if (partial) { len = trim_partial_utf8(buf, len); }
|
||||||
|
|
||||||
error_code error = scanner.finish(streaming);
|
buf_block_reader<STEP_SIZE> reader(buf, len);
|
||||||
if (unlikely(error != SUCCESS)) { return error; }
|
json_structural_indexer indexer(parser.structural_indexes.get());
|
||||||
|
|
||||||
if (unescaped_chars_error) {
|
// Read all but the last block
|
||||||
return UNESCAPED_CHARS;
|
while (reader.has_full_block()) {
|
||||||
|
indexer.step<STEP_SIZE>(reader.full_block(), reader);
|
||||||
}
|
}
|
||||||
|
|
||||||
parser.n_structural_indexes = uint32_t(indexer.tail - parser.structural_indexes.get());
|
// Take care of the last block (will always be there unless file is empty)
|
||||||
/* a valid JSON file cannot have zero structural indexes - we should have
|
uint8_t block[STEP_SIZE];
|
||||||
* found something */
|
if (unlikely(reader.get_remainder(block) == 0)) { return EMPTY; }
|
||||||
if (unlikely(parser.n_structural_indexes == 0u)) {
|
indexer.step<STEP_SIZE>(block, reader);
|
||||||
return EMPTY;
|
|
||||||
}
|
return indexer.finish(parser, reader.block_index(), len, partial);
|
||||||
if (unlikely(parser.structural_indexes[parser.n_structural_indexes - 1] > len)) {
|
|
||||||
return UNEXPECTED_ERROR;
|
|
||||||
}
|
|
||||||
if (len != parser.structural_indexes[parser.n_structural_indexes - 1]) {
|
|
||||||
/* the string might not be NULL terminated, but we add a virtual NULL
|
|
||||||
* ending character. */
|
|
||||||
parser.structural_indexes[parser.n_structural_indexes++] = uint32_t(len);
|
|
||||||
}
|
|
||||||
/* make it safe to dereference one beyond this array */
|
|
||||||
parser.structural_indexes[parser.n_structural_indexes] = uint32_t(len);
|
|
||||||
parser.structural_indexes[parser.n_structural_indexes + 1] = 0;
|
|
||||||
/***
|
|
||||||
* This is related to https://github.com/simdjson/simdjson/issues/906
|
|
||||||
* Basically, we want to make sure that if the parsing continues beyond the last (valid)
|
|
||||||
* structural character, it quickly stops.
|
|
||||||
* Only three structural characters can be repeated without triggering an error in JSON: [,] and }.
|
|
||||||
* We repeat the padding character (at 'len'). We don't know what it is, but if the parsing
|
|
||||||
* continues, then it must be [,] or }.
|
|
||||||
* Suppose it is ] or }. We backtrack to the first character, what could it be that would
|
|
||||||
* not trigger an error? It could be ] or } but no, because you can't start a document that way.
|
|
||||||
* It can't be a comma, a colon or any simple value. So the only way we could continue is
|
|
||||||
* if the repeated character is [. But if so, the document must start with [. But if the document
|
|
||||||
* starts with [, it should end with ]. If we enforce that rule, then we would get
|
|
||||||
* ][[ which is invalid.
|
|
||||||
**/
|
|
||||||
return checker.errors();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
template<>
|
template<>
|
||||||
|
@ -147,45 +140,59 @@ really_inline void json_structural_indexer::step<64>(const uint8_t *block, buf_b
|
||||||
reader.advance();
|
reader.advance();
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
really_inline void json_structural_indexer::next(simd::simd8x64<uint8_t> in, json_block block, size_t idx) {
|
||||||
// Find the important bits of JSON in a 128-byte chunk, and add them to structural_indexes.
|
uint64_t unescaped = in.lteq(0x1F);
|
||||||
//
|
checker.check_next_input(in);
|
||||||
// PERF NOTES:
|
indexer.write(uint32_t(idx-64), prev_structurals); // Output *last* iteration's structurals to the parser
|
||||||
// We pipe 2 inputs through these stages:
|
prev_structurals = block.structural_start();
|
||||||
// 1. Load JSON into registers. This takes a long time and is highly parallelizable, so we load
|
unescaped_chars_error |= block.non_quote_inside_string(unescaped);
|
||||||
// 2 inputs' worth at once so that by the time step 2 is looking for them input, it's available.
|
}
|
||||||
// 2. Scan the JSON for critical data: strings, scalars and operators. This is the critical path.
|
|
||||||
// The output of step 1 depends entirely on this information. These functions don't quite use
|
|
||||||
// up enough CPU: the second half of the functions is highly serial, only using 1 execution core
|
|
||||||
// at a time. The second input's scans has some dependency on the first ones finishing it, but
|
|
||||||
// they can make a lot of progress before they need that information.
|
|
||||||
// 3. Step 1 doesn't use enough capacity, so we run some extra stuff while we're waiting for that
|
|
||||||
// to finish: utf-8 checks and generating the output from the last iteration.
|
|
||||||
//
|
|
||||||
// The reason we run 2 inputs at a time, is steps 2 and 3 are *still* not enough to soak up all
|
|
||||||
// available capacity with just one input. Running 2 at a time seems to give the CPU a good enough
|
|
||||||
// workout.
|
|
||||||
//
|
|
||||||
// Setting the streaming parameter to true allows the find_structural_bits to tolerate unclosed strings.
|
|
||||||
// The caller should still ensure that the input is valid UTF-8. If you are processing substrings,
|
|
||||||
// you may want to call on a function like trimmed_length_safe_utf8.
|
|
||||||
template<size_t STEP_SIZE>
|
|
||||||
error_code json_structural_indexer::index(const uint8_t *buf, size_t len, dom_parser_implementation &parser, bool streaming) noexcept {
|
|
||||||
if (unlikely(len > parser.capacity())) { return CAPACITY; }
|
|
||||||
|
|
||||||
buf_block_reader<STEP_SIZE> reader(buf, len);
|
really_inline error_code json_structural_indexer::finish(dom_parser_implementation &parser, size_t idx, size_t len, bool partial) {
|
||||||
json_structural_indexer indexer(parser.structural_indexes.get());
|
// Write out the final iteration's structurals
|
||||||
while (reader.has_full_block()) {
|
indexer.write(uint32_t(idx-64), prev_structurals);
|
||||||
indexer.step<STEP_SIZE>(reader.full_block(), reader);
|
|
||||||
|
error_code error = scanner.finish(partial);
|
||||||
|
if (unlikely(error != SUCCESS)) { return error; }
|
||||||
|
|
||||||
|
if (unescaped_chars_error) {
|
||||||
|
return UNESCAPED_CHARS;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (likely(reader.has_remainder())) {
|
parser.n_structural_indexes = uint32_t(indexer.tail - parser.structural_indexes.get());
|
||||||
uint8_t block[STEP_SIZE];
|
/***
|
||||||
reader.get_remainder(block);
|
* This is related to https://github.com/simdjson/simdjson/issues/906
|
||||||
indexer.step<STEP_SIZE>(block, reader);
|
* Basically, we want to make sure that if the parsing continues beyond the last (valid)
|
||||||
|
* structural character, it quickly stops.
|
||||||
|
* Only three structural characters can be repeated without triggering an error in JSON: [,] and }.
|
||||||
|
* We repeat the padding character (at 'len'). We don't know what it is, but if the parsing
|
||||||
|
* continues, then it must be [,] or }.
|
||||||
|
* Suppose it is ] or }. We backtrack to the first character, what could it be that would
|
||||||
|
* not trigger an error? It could be ] or } but no, because you can't start a document that way.
|
||||||
|
* It can't be a comma, a colon or any simple value. So the only way we could continue is
|
||||||
|
* if the repeated character is [. But if so, the document must start with [. But if the document
|
||||||
|
* starts with [, it should end with ]. If we enforce that rule, then we would get
|
||||||
|
* ][[ which is invalid.
|
||||||
|
**/
|
||||||
|
parser.structural_indexes[parser.n_structural_indexes] = uint32_t(len);
|
||||||
|
parser.structural_indexes[parser.n_structural_indexes + 1] = uint32_t(len);
|
||||||
|
parser.structural_indexes[parser.n_structural_indexes + 2] = 0;
|
||||||
|
parser.next_structural_index = 0;
|
||||||
|
// a valid JSON file cannot have zero structural indexes - we should have found something
|
||||||
|
if (unlikely(parser.n_structural_indexes == 0u)) {
|
||||||
|
return EMPTY;
|
||||||
}
|
}
|
||||||
|
if (unlikely(parser.structural_indexes[parser.n_structural_indexes - 1] > len)) {
|
||||||
return indexer.finish(parser, reader.block_index(), len, streaming);
|
return UNEXPECTED_ERROR;
|
||||||
|
}
|
||||||
|
if (partial) {
|
||||||
|
auto new_structural_indexes = find_next_document_index(parser);
|
||||||
|
if (new_structural_indexes == 0 && parser.n_structural_indexes > 0) {
|
||||||
|
return CAPACITY; // If the buffer is partial but the document is incomplete, it's too big to parse.
|
||||||
|
}
|
||||||
|
parser.n_structural_indexes = new_structural_indexes;
|
||||||
|
}
|
||||||
|
return checker.errors();
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace stage1
|
} // namespace stage1
|
||||||
|
|
|
@ -56,7 +56,7 @@ namespace logger {
|
||||||
}
|
}
|
||||||
printf("| %c ", printable_char(structurals.at_beginning() ? ' ' : structurals.current_char()));
|
printf("| %c ", printable_char(structurals.at_beginning() ? ' ' : structurals.current_char()));
|
||||||
printf("| %c ", printable_char(structurals.peek_char()));
|
printf("| %c ", printable_char(structurals.peek_char()));
|
||||||
printf("| %5zd ", structurals.next_structural);
|
printf("| %5u ", structurals.structural_indexes[structurals.next_structural]);
|
||||||
printf("| %-*s ", LOG_DETAIL_LEN, detail);
|
printf("| %-*s ", LOG_DETAIL_LEN, detail);
|
||||||
printf("| %*zu ", LOG_INDEX_LEN, structurals.idx);
|
printf("| %*zu ", LOG_INDEX_LEN, structurals.idx);
|
||||||
printf("|\n");
|
printf("|\n");
|
||||||
|
|
|
@ -1,12 +1,18 @@
|
||||||
namespace stage2 {
|
namespace stage2 {
|
||||||
|
|
||||||
struct streaming_structural_parser: structural_parser {
|
struct streaming_structural_parser: structural_parser {
|
||||||
really_inline streaming_structural_parser(dom_parser_implementation &_parser, uint32_t next_structural) : structural_parser(_parser, next_structural) {}
|
really_inline streaming_structural_parser(dom_parser_implementation &_parser) : structural_parser(_parser, _parser.next_structural_index) {}
|
||||||
|
|
||||||
// override to add streaming
|
// override to add streaming
|
||||||
WARN_UNUSED really_inline error_code start(ret_address_t finish_parser) {
|
WARN_UNUSED really_inline error_code start(ret_address_t finish_parser) {
|
||||||
|
// If there are no structurals left, return EMPTY
|
||||||
|
if (structurals.at_end(parser.n_structural_indexes)) {
|
||||||
|
return parser.error = EMPTY;
|
||||||
|
}
|
||||||
|
|
||||||
log_start();
|
log_start();
|
||||||
init(); // sets is_valid to false
|
init();
|
||||||
|
|
||||||
// Capacity ain't no thang for streaming, so we don't check it.
|
// Capacity ain't no thang for streaming, so we don't check it.
|
||||||
// Advance to the first character as soon as possible
|
// Advance to the first character as soon as possible
|
||||||
advance_char();
|
advance_char();
|
||||||
|
@ -24,6 +30,7 @@ struct streaming_structural_parser: structural_parser {
|
||||||
return parser.error = TAPE_ERROR;
|
return parser.error = TAPE_ERROR;
|
||||||
}
|
}
|
||||||
end_document();
|
end_document();
|
||||||
|
parser.next_structural_index = uint32_t(structurals.next_structural_index());
|
||||||
if (depth != 0) {
|
if (depth != 0) {
|
||||||
log_error("Unclosed objects or arrays!");
|
log_error("Unclosed objects or arrays!");
|
||||||
return parser.error = TAPE_ERROR;
|
return parser.error = TAPE_ERROR;
|
||||||
|
@ -32,9 +39,7 @@ struct streaming_structural_parser: structural_parser {
|
||||||
log_error("IMPOSSIBLE: root scope tape index did not start at 0!");
|
log_error("IMPOSSIBLE: root scope tape index did not start at 0!");
|
||||||
return parser.error = TAPE_ERROR;
|
return parser.error = TAPE_ERROR;
|
||||||
}
|
}
|
||||||
bool finished = structurals.at_end(parser.n_structural_indexes);
|
return SUCCESS;
|
||||||
if (!finished) { log_value("(and has more)"); }
|
|
||||||
return finished ? SUCCESS : SUCCESS_AND_HAS_MORE;
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -44,12 +49,10 @@ struct streaming_structural_parser: structural_parser {
|
||||||
* The JSON is parsed to a tape, see the accompanying tape.md file
|
* The JSON is parsed to a tape, see the accompanying tape.md file
|
||||||
* for documentation.
|
* for documentation.
|
||||||
***********/
|
***********/
|
||||||
WARN_UNUSED error_code dom_parser_implementation::stage2(const uint8_t *_buf, size_t _len, dom::document &_doc, size_t &next_json) noexcept {
|
WARN_UNUSED error_code dom_parser_implementation::stage2_next(dom::document &_doc) noexcept {
|
||||||
this->buf = _buf;
|
|
||||||
this->len = _len;
|
|
||||||
this->doc = &_doc;
|
this->doc = &_doc;
|
||||||
static constexpr stage2::unified_machine_addresses addresses = INIT_ADDRESSES();
|
static constexpr stage2::unified_machine_addresses addresses = INIT_ADDRESSES();
|
||||||
stage2::streaming_structural_parser parser(*this, uint32_t(next_json));
|
stage2::streaming_structural_parser parser(*this);
|
||||||
error_code result = parser.start(addresses.finish);
|
error_code result = parser.start(addresses.finish);
|
||||||
if (result) { return result; }
|
if (result) { return result; }
|
||||||
//
|
//
|
||||||
|
@ -158,7 +161,6 @@ array_continue:
|
||||||
}
|
}
|
||||||
|
|
||||||
finish:
|
finish:
|
||||||
next_json = parser.structurals.next_structural_index();
|
|
||||||
return parser.finish();
|
return parser.finish();
|
||||||
|
|
||||||
error:
|
error:
|
||||||
|
|
|
@ -52,10 +52,10 @@ public:
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
really_inline bool past_end(uint32_t n_structural_indexes) {
|
really_inline bool past_end(uint32_t n_structural_indexes) {
|
||||||
return next_structural+1 > n_structural_indexes;
|
return next_structural > n_structural_indexes;
|
||||||
}
|
}
|
||||||
really_inline bool at_end(uint32_t n_structural_indexes) {
|
really_inline bool at_end(uint32_t n_structural_indexes) {
|
||||||
return next_structural+1 == n_structural_indexes;
|
return next_structural == n_structural_indexes;
|
||||||
}
|
}
|
||||||
really_inline bool at_beginning() {
|
really_inline bool at_beginning() {
|
||||||
return next_structural == 0;
|
return next_structural == 0;
|
||||||
|
|
|
@ -75,10 +75,7 @@ struct structural_parser {
|
||||||
uint8_t *current_string_buf_loc{};
|
uint8_t *current_string_buf_loc{};
|
||||||
uint32_t depth;
|
uint32_t depth;
|
||||||
|
|
||||||
really_inline structural_parser(
|
really_inline structural_parser(dom_parser_implementation &_parser, uint32_t next_structural = 0) : structurals(_parser.buf, _parser.len, _parser.structural_indexes.get(), next_structural), parser{_parser}, depth{0} {}
|
||||||
dom_parser_implementation &_parser,
|
|
||||||
uint32_t next_structural = 0
|
|
||||||
) : structurals(_parser.buf, _parser.len, _parser.structural_indexes.get(), next_structural), parser{_parser}, depth{0} {}
|
|
||||||
|
|
||||||
WARN_UNUSED really_inline bool start_scope(ret_address_t continue_state) {
|
WARN_UNUSED really_inline bool start_scope(ret_address_t continue_state) {
|
||||||
parser.containing_scope[depth].tape_index = parser.current_loc;
|
parser.containing_scope[depth].tape_index = parser.current_loc;
|
||||||
|
@ -333,7 +330,7 @@ struct structural_parser {
|
||||||
|
|
||||||
WARN_UNUSED really_inline error_code start(size_t len, ret_address_t finish_state) {
|
WARN_UNUSED really_inline error_code start(size_t len, ret_address_t finish_state) {
|
||||||
log_start();
|
log_start();
|
||||||
init(); // sets is_valid to false
|
init();
|
||||||
if (len > parser.capacity()) {
|
if (len > parser.capacity()) {
|
||||||
return parser.error = CAPACITY;
|
return parser.error = CAPACITY;
|
||||||
}
|
}
|
||||||
|
@ -401,7 +398,7 @@ WARN_UNUSED error_code dom_parser_implementation::stage2(dom::document &_doc) no
|
||||||
FAIL_IF( parser.start_array(addresses.finish) );
|
FAIL_IF( parser.start_array(addresses.finish) );
|
||||||
// Make sure the outer array is closed before continuing; otherwise, there are ways we could get
|
// Make sure the outer array is closed before continuing; otherwise, there are ways we could get
|
||||||
// into memory corruption. See https://github.com/simdjson/simdjson/issues/906
|
// into memory corruption. See https://github.com/simdjson/simdjson/issues/906
|
||||||
if (buf[structural_indexes[n_structural_indexes - 2]] != ']') {
|
if (buf[structural_indexes[n_structural_indexes - 1]] != ']') {
|
||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
goto array_begin;
|
goto array_begin;
|
||||||
|
|
|
@ -70,6 +70,7 @@ WARN_UNUSED error_code implementation::minify(const uint8_t *buf, size_t len, ui
|
||||||
return haswell::stage1::json_minifier::minify<128>(buf, len, dst, dst_len);
|
return haswell::stage1::json_minifier::minify<128>(buf, len, dst, dst_len);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#include "generic/stage1/find_next_document_index.h"
|
||||||
#include "generic/stage1/utf8_lookup2_algorithm.h"
|
#include "generic/stage1/utf8_lookup2_algorithm.h"
|
||||||
#include "generic/stage1/json_structural_indexer.h"
|
#include "generic/stage1/json_structural_indexer.h"
|
||||||
WARN_UNUSED error_code dom_parser_implementation::stage1(const uint8_t *_buf, size_t _len, bool streaming) noexcept {
|
WARN_UNUSED error_code dom_parser_implementation::stage1(const uint8_t *_buf, size_t _len, bool streaming) noexcept {
|
||||||
|
|
|
@ -71,6 +71,7 @@ WARN_UNUSED error_code implementation::minify(const uint8_t *buf, size_t len, ui
|
||||||
return westmere::stage1::json_minifier::minify<64>(buf, len, dst, dst_len);
|
return westmere::stage1::json_minifier::minify<64>(buf, len, dst, dst_len);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#include "generic/stage1/find_next_document_index.h"
|
||||||
#include "generic/stage1/utf8_lookup2_algorithm.h"
|
#include "generic/stage1/utf8_lookup2_algorithm.h"
|
||||||
#include "generic/stage1/json_structural_indexer.h"
|
#include "generic/stage1/json_structural_indexer.h"
|
||||||
WARN_UNUSED error_code dom_parser_implementation::stage1(const uint8_t *_buf, size_t _len, bool streaming) noexcept {
|
WARN_UNUSED error_code dom_parser_implementation::stage1(const uint8_t *_buf, size_t _len, bool streaming) noexcept {
|
||||||
|
|
|
@ -515,9 +515,9 @@ namespace parse_api_tests {
|
||||||
using namespace simdjson;
|
using namespace simdjson;
|
||||||
using namespace simdjson::dom;
|
using namespace simdjson::dom;
|
||||||
|
|
||||||
const padded_string BASIC_JSON = string("[1,2,3]");
|
const padded_string BASIC_JSON = "[1,2,3]"_padded;
|
||||||
const padded_string BASIC_NDJSON = string("[1,2,3]\n[4,5,6]");
|
const padded_string BASIC_NDJSON = "[1,2,3]\n[4,5,6]"_padded;
|
||||||
// const padded_string EMPTY_NDJSON = string("");
|
const padded_string EMPTY_NDJSON = ""_padded;
|
||||||
|
|
||||||
bool parser_parse() {
|
bool parser_parse() {
|
||||||
std::cout << "Running " << __func__ << std::endl;
|
std::cout << "Running " << __func__ << std::endl;
|
||||||
|
@ -532,24 +532,45 @@ namespace parse_api_tests {
|
||||||
dom::parser parser;
|
dom::parser parser;
|
||||||
int count = 0;
|
int count = 0;
|
||||||
for (auto [doc, error] : parser.parse_many(BASIC_NDJSON)) {
|
for (auto [doc, error] : parser.parse_many(BASIC_NDJSON)) {
|
||||||
if (error) { cerr << error << endl; return false; }
|
if (error) { cerr << "Error in parse_many: " << endl; return false; }
|
||||||
if (!doc.is<dom::array>()) { cerr << "Document did not parse as an array" << endl; return false; }
|
if (!doc.is<dom::array>()) { cerr << "Document did not parse as an array" << endl; return false; }
|
||||||
count++;
|
count++;
|
||||||
}
|
}
|
||||||
if (count != 2) { cerr << "parse_many returned " << count << " documents, expected 2" << endl; return false; }
|
if (count != 2) { cerr << "parse_many returned " << count << " documents, expected 2" << endl; return false; }
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
// bool parser_parse_many_empty() {
|
bool parser_parse_many_empty() {
|
||||||
// std::cout << "Running " << __func__ << std::endl;
|
std::cout << "Running " << __func__ << std::endl;
|
||||||
// dom::parser parser;
|
dom::parser parser;
|
||||||
// int count = 0;
|
int count = 0;
|
||||||
// for (auto [doc, error] : parser.parse_many(EMPTY_NDJSON)) {
|
for (auto doc : parser.parse_many(EMPTY_NDJSON)) {
|
||||||
// if (error) { cerr << error << endl; return false; }
|
if (doc.error()) { cerr << "Error in parse_many: " << doc.error() << endl; return false; }
|
||||||
// count++;
|
count++;
|
||||||
// }
|
}
|
||||||
// if (count != 0) { cerr << "parse_many returned " << count << " documents, expected 0" << endl; return false; }
|
if (count != 0) { cerr << "parse_many returned " << count << " documents, expected 0" << endl; return false; }
|
||||||
// return true;
|
return true;
|
||||||
// }
|
}
|
||||||
|
|
||||||
|
bool parser_parse_many_empty_batches() {
|
||||||
|
std::cout << "Running " << __func__ << std::endl;
|
||||||
|
dom::parser parser;
|
||||||
|
uint64_t count = 0;
|
||||||
|
constexpr const int BATCH_SIZE = 128;
|
||||||
|
uint8_t empty_batches_ndjson[BATCH_SIZE*16+SIMDJSON_PADDING];
|
||||||
|
memset(&empty_batches_ndjson[0], ' ', BATCH_SIZE*16+SIMDJSON_PADDING);
|
||||||
|
memcpy(&empty_batches_ndjson[BATCH_SIZE*3+2], "1", 1);
|
||||||
|
memcpy(&empty_batches_ndjson[BATCH_SIZE*10+4], "2", 1);
|
||||||
|
memcpy(&empty_batches_ndjson[BATCH_SIZE*11+6], "3", 1);
|
||||||
|
for (auto [doc, error] : parser.parse_many(empty_batches_ndjson, BATCH_SIZE*16)) {
|
||||||
|
if (error) { cerr << "Error in parse_many: " << error << endl; return false; }
|
||||||
|
count++;
|
||||||
|
auto [val, val_error] = doc.get<uint64_t>();
|
||||||
|
if (val_error) { cerr << "Document is not an unsigned int: " << val_error << endl; return false; }
|
||||||
|
if (val != count) { cerr << "Expected document #" << count << " to equal " << count << ", but got " << val << " instead!" << endl; return false; }
|
||||||
|
}
|
||||||
|
if (count != 3) { cerr << "parse_many returned " << count << " documents, expected 0" << endl; return false; }
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
bool parser_load() {
|
bool parser_load() {
|
||||||
std::cout << "Running " << __func__ << " on " << TWITTER_JSON << std::endl;
|
std::cout << "Running " << __func__ << " on " << TWITTER_JSON << std::endl;
|
||||||
|
@ -633,7 +654,8 @@ namespace parse_api_tests {
|
||||||
bool run() {
|
bool run() {
|
||||||
return parser_parse() &&
|
return parser_parse() &&
|
||||||
parser_parse_many() &&
|
parser_parse_many() &&
|
||||||
// parser_parse_many_empty() &&
|
parser_parse_many_empty() &&
|
||||||
|
parser_parse_many_empty_batches() &&
|
||||||
parser_load() &&
|
parser_load() &&
|
||||||
parser_load_many() &&
|
parser_load_many() &&
|
||||||
#if SIMDJSON_EXCEPTIONS
|
#if SIMDJSON_EXCEPTIONS
|
||||||
|
|
|
@ -24,7 +24,7 @@ const char *TWITTER_JSON = SIMDJSON_BENCHMARK_DATA_DIR "twitter.json";
|
||||||
#define TEST_FAIL(MESSAGE) { cerr << "FAIL: " << (MESSAGE) << endl; return false; }
|
#define TEST_FAIL(MESSAGE) { cerr << "FAIL: " << (MESSAGE) << endl; return false; }
|
||||||
#define TEST_SUCCEED() { return true; }
|
#define TEST_SUCCEED() { return true; }
|
||||||
namespace parser_load {
|
namespace parser_load {
|
||||||
const char * NONEXISTENT_FILE = "this_file_does_not_exit.json";
|
const char * NONEXISTENT_FILE = "this_file_does_not_exist.json";
|
||||||
bool parser_load_capacity() {
|
bool parser_load_capacity() {
|
||||||
TEST_START();
|
TEST_START();
|
||||||
dom::parser parser(1); // 1 byte max capacity
|
dom::parser parser(1); // 1 byte max capacity
|
||||||
|
@ -42,6 +42,57 @@ namespace parser_load {
|
||||||
TEST_FAIL("No documents returned");
|
TEST_FAIL("No documents returned");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool parser_parse_many_documents_error_in_the_middle() {
|
||||||
|
TEST_START();
|
||||||
|
const padded_string DOC = "1 2 [} 3"_padded;
|
||||||
|
size_t count = 0;
|
||||||
|
dom::parser parser;
|
||||||
|
for (auto doc : parser.parse_many(DOC)) {
|
||||||
|
count++;
|
||||||
|
auto [val, error] = doc.get<uint64_t>();
|
||||||
|
if (count == 3) {
|
||||||
|
ASSERT_ERROR(error, TAPE_ERROR);
|
||||||
|
} else {
|
||||||
|
if (error) { TEST_FAIL(error); }
|
||||||
|
if (val != count) { cerr << "FAIL: expected " << count << ", got " << val << endl; return false; }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (count != 3) { cerr << "FAIL: expected 2 documents and 1 error, got " << count << " total things" << endl; return false; }
|
||||||
|
TEST_SUCCEED();
|
||||||
|
}
|
||||||
|
|
||||||
|
bool parser_parse_many_documents_partial() {
|
||||||
|
TEST_START();
|
||||||
|
const padded_string DOC = "["_padded;
|
||||||
|
size_t count = 0;
|
||||||
|
dom::parser parser;
|
||||||
|
for (auto doc : parser.parse_many(DOC)) {
|
||||||
|
count++;
|
||||||
|
ASSERT_ERROR(doc.error(), TAPE_ERROR);
|
||||||
|
}
|
||||||
|
if (count != 1) { cerr << "FAIL: expected no documents and 1 error, got " << count << " total things" << endl; return false; }
|
||||||
|
TEST_SUCCEED();
|
||||||
|
}
|
||||||
|
|
||||||
|
bool parser_parse_many_documents_partial_at_the_end() {
|
||||||
|
TEST_START();
|
||||||
|
const padded_string DOC = "1 2 ["_padded;
|
||||||
|
size_t count = 0;
|
||||||
|
dom::parser parser;
|
||||||
|
for (auto doc : parser.parse_many(DOC)) {
|
||||||
|
count++;
|
||||||
|
auto [val, error] = doc.get<uint64_t>();
|
||||||
|
if (count == 3) {
|
||||||
|
ASSERT_ERROR(error, TAPE_ERROR);
|
||||||
|
} else {
|
||||||
|
if (error) { TEST_FAIL(error); }
|
||||||
|
if (val != count) { cerr << "FAIL: expected " << count << ", got " << val << endl; return false; }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (count != 3) { cerr << "FAIL: expected 2 documents and 1 error, got " << count << " total things" << endl; return false; }
|
||||||
|
TEST_SUCCEED();
|
||||||
|
}
|
||||||
|
|
||||||
bool parser_load_nonexistent() {
|
bool parser_load_nonexistent() {
|
||||||
TEST_START();
|
TEST_START();
|
||||||
dom::parser parser;
|
dom::parser parser;
|
||||||
|
@ -83,9 +134,18 @@ namespace parser_load {
|
||||||
TEST_FAIL("No documents returned");
|
TEST_FAIL("No documents returned");
|
||||||
}
|
}
|
||||||
bool run() {
|
bool run() {
|
||||||
return parser_load_capacity() && parser_load_many_capacity()
|
return true
|
||||||
&& parser_load_nonexistent() && parser_load_many_nonexistent() && padded_string_load_nonexistent()
|
&& parser_load_capacity()
|
||||||
&& parser_load_chain() && parser_load_many_chain();
|
&& parser_load_many_capacity()
|
||||||
|
&& parser_load_nonexistent()
|
||||||
|
&& parser_load_many_nonexistent()
|
||||||
|
&& padded_string_load_nonexistent()
|
||||||
|
&& parser_load_chain()
|
||||||
|
&& parser_load_many_chain()
|
||||||
|
&& parser_parse_many_documents_error_in_the_middle()
|
||||||
|
&& parser_parse_many_documents_partial()
|
||||||
|
&& parser_parse_many_documents_partial_at_the_end()
|
||||||
|
;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -83,17 +83,21 @@ bool validate(const char *dirname) {
|
||||||
if (contains("EXCLUDE", name)) {
|
if (contains("EXCLUDE", name)) {
|
||||||
// skipping
|
// skipping
|
||||||
how_many--;
|
how_many--;
|
||||||
} else if (starts_with("pass", name) and (has_extension(extension1, name) or has_extension(extension2, name)) and error) {
|
} else if (starts_with("pass", name) or starts_with("fail10.json", name) or starts_with("fail70.json", name)) {
|
||||||
is_file_as_expected[i] = false;
|
if (error) {
|
||||||
printf("warning: file %s should pass but it fails. Error is: %s\n",
|
is_file_as_expected[i] = false;
|
||||||
name, error_message(error));
|
printf("warning: file %s should pass but it fails. Error is: %s\n",
|
||||||
printf("size of file in bytes: %zu \n", json.size());
|
name, error_message(error));
|
||||||
everything_fine = false;
|
printf("size of file in bytes: %zu \n", json.size());
|
||||||
} else if ( starts_with("fail", name) and (not starts_with("fail10.json", name)) and !error) {
|
everything_fine = false;
|
||||||
is_file_as_expected[i] = false;
|
}
|
||||||
printf("warning: file %s should fail but it passes.\n", name);
|
} else if ( starts_with("fail", name) ) {
|
||||||
printf("size of file in bytes: %zu \n", json.size());
|
if (!error) {
|
||||||
everything_fine = false;
|
is_file_as_expected[i] = false;
|
||||||
|
printf("warning: file %s should fail but it passes.\n", name);
|
||||||
|
printf("size of file in bytes: %zu \n", json.size());
|
||||||
|
everything_fine = false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
free(fullpath);
|
free(fullpath);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue