Move document stream state to implementation

John Keiser 2020-06-02 20:21:46 -07:00
parent 8c16ba372e
commit ef63a84a3e
12 changed files with 331 additions and 336 deletions

View File

@ -43,7 +43,7 @@ public:
really_inline bool operator!=(const iterator &other) const noexcept;
private:
iterator(document_stream& stream, bool finished) noexcept;
really_inline iterator(document_stream &s, bool finished) noexcept;
/** The document_stream we're iterating through. */
document_stream& stream;
/** Whether we're finished or not. */
@ -66,7 +66,23 @@ private:
document_stream(document_stream &other) = delete; // Disallow copying
really_inline document_stream(dom::parser &parser, const uint8_t *buf, size_t len, size_t batch_size, error_code error = SUCCESS) noexcept;
/**
* Construct a document_stream. Does not allocate or parse anything until the iterator is
* used.
*/
really_inline document_stream(
dom::parser &parser,
const uint8_t *buf,
size_t len,
size_t batch_size,
error_code error = SUCCESS
) noexcept;
/**
* Parse the first document in the buffer. Used by begin() to handle allocation and
* initialization.
*/
inline void start() noexcept;
/**
* Parse the next document found in the buffer previously given to document_stream.
@ -79,10 +95,7 @@ private:
* pre-allocating a capacity defined by the batch_size defined when creating the
* document_stream object.
*
* The function returns simdjson::SUCCESS_AND_HAS_MORE (an integer = 1) in case
* of success and indicates that the buffer still contains more data to be parsed,
* meaning this function can be called again to return the next JSON document
* after this one.
* The function returns simdjson::EMPTY if there is no more data to be parsed.
*
* The function returns simdjson::SUCCESS (an integer = 0) in case of success
* and indicates that the buffer has successfully been parsed to the end.
@ -93,55 +106,52 @@ private:
* the simdjson::error_message function converts these error codes into a string).
*
* You can also check validity by calling parser.is_valid(). The same parser can
* and should be reused for the other documents in the buffer. */
inline error_code json_parse() noexcept;
* and should be reused for the other documents in the buffer.
*/
inline void next() noexcept;
/**
* Returns the location (index) of where the next document should be in the
* buffer. Can be used for debugging; it tells the user the position of the end
* of the last valid JSON document parsed.
* Pass the next batch through stage 1 and return when finished.
* When threads are enabled, this may wait for the stage 1 thread to finish.
*/
inline size_t get_current_buffer_loc() const { return current_buffer_loc; }
inline void load_batch() noexcept;
/**
* Returns the total number of complete documents parsed by the document_stream,
* in the current buffer, at the given time.
*/
inline size_t get_n_parsed_docs() const { return n_parsed_docs; }
/** Get the next document index. */
inline size_t next_batch_start() const noexcept;
/**
* Returns the total amount of data (in bytes) parsed by the document_stream,
* in the current buffer, at the given time.
*/
inline size_t get_n_bytes_parsed() const { return n_bytes_parsed; }
inline const uint8_t *buf() const { return _buf + buf_start; }
inline void advance(size_t offset) { buf_start += offset; }
inline size_t remaining() const { return _len - buf_start; }
/** Pass the next batch through stage 1 with the given parser. */
inline void run_stage1(dom::parser &p, size_t batch_start) noexcept;
dom::parser &parser;
const uint8_t *_buf;
const size_t _len;
size_t _batch_size; // this is actually variable!
size_t buf_start{0};
size_t next_json{0};
bool load_next_batch{true};
size_t current_buffer_loc{0};
const uint8_t *buf;
const size_t len;
const size_t batch_size;
size_t batch_start{0};
/** The error (or lack thereof) from the current document. */
error_code error;
#ifdef SIMDJSON_THREADS_ENABLED
size_t last_json_buffer_loc{0};
#endif
size_t n_parsed_docs{0};
size_t n_bytes_parsed{0};
error_code error{SUCCESS_AND_HAS_MORE};
#ifdef SIMDJSON_THREADS_ENABLED
error_code stage1_is_ok_thread{SUCCESS};
std::thread stage_1_thread{};
dom::parser parser_thread{};
#endif
/**
* Start a thread to run stage 1 on the next batch.
*/
inline void start_stage1_thread() noexcept;
/**
* Wait for the stage 1 thread to finish and capture the results.
*/
inline void finish_stage1_thread() noexcept;
/** The error returned from the stage 1 thread. */
error_code stage1_thread_error{UNINITIALIZED};
/** The thread used to run stage 1 against the next batch in the background. */
std::thread stage1_thread{};
/**
* The parser used to run stage 1 in the background. Will be swapped
* with the regular parser when finished.
*/
dom::parser stage1_thread_parser{};
#endif // SIMDJSON_THREADS_ENABLED
friend class dom::parser;
}; // class document_stream
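
For orientation, here is a minimal usage sketch of the iterator API declared above. It is not part of this diff; it assumes the public parser::parse_many() entry point and exceptions enabled, so the simdjson_result yielded by operator*() converts directly to an element.

#include <iostream>
#include "simdjson.h"
using namespace simdjson;

int main() {
  dom::parser parser;
  padded_string ndjson = R"( {"n":1} {"n":2} {"n":3} )"_padded;
  // parse_many() returns a document_stream; ++ on its iterator calls next() under the hood.
  for (dom::element doc : parser.parse_many(ndjson)) {
    std::cout << doc["n"] << std::endl;
  }
  return 0;
}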

View File

@ -6,125 +6,37 @@
#include <limits>
#include <stdexcept>
namespace simdjson {
namespace internal {
/**
* This algorithm is used to quickly identify the buffer position of
* the last JSON document inside the current batch.
*
* It does its work by finding the last pair of structural characters
* that represent the end followed by the start of a document.
*
* Simply put, we iterate over the structural characters, starting from
* the end. We consider that we found the end of a JSON document when the
* first element of the pair is NOT one of these characters: '{' '[' ':' ','
* and when the second element is NOT one of these characters: '}' ']' ':' ','.
*
* This simple comparison works most of the time, but it does not cover cases
* where the batch's structural indexes contain a whole number of complete documents.
* In such a case, we do not have access to the structural index which follows
* the last document, therefore, we do not have access to the second element in
* the pair, which means that we cannot identify the last document. To fix this
* issue, we keep a count of the open and closed curly/square braces we found
* while searching for the pair. When we find a pair AND the count of open and
* closed curly/square braces is the same, we know that we just passed a complete
* document, therefore the last json buffer location is the end of the batch.
*/
inline uint32_t find_last_json_buf_idx(const uint8_t *buf, size_t size, const dom::parser &parser) {
// this function can be generally useful
if (parser.implementation->n_structural_indexes == 0)
return 0;
auto last_i = parser.implementation->n_structural_indexes - 1;
if (parser.implementation->structural_indexes[last_i] == size) {
if (last_i == 0)
return 0;
last_i = parser.implementation->n_structural_indexes - 2;
}
auto arr_cnt = 0;
auto obj_cnt = 0;
for (auto i = last_i; i > 0; i--) {
auto idxb = parser.implementation->structural_indexes[i];
switch (buf[idxb]) {
case ':':
case ',':
continue;
case '}':
obj_cnt--;
continue;
case ']':
arr_cnt--;
continue;
case '{':
obj_cnt++;
break;
case '[':
arr_cnt++;
break;
}
auto idxa = parser.implementation->structural_indexes[i - 1];
switch (buf[idxa]) {
case '{':
case '[':
case ':':
case ',':
continue;
}
if (!arr_cnt && !obj_cnt) {
return last_i + 1;
}
return i;
}
return 0;
}
// returns true if the provided byte value is an ASCII character
static inline bool is_ascii(char c) {
return ((unsigned char)c) <= 127;
}
// if the string ends with UTF-8 values, backtrack
// up to the first ASCII character. May return 0.
static inline size_t trimmed_length_safe_utf8(const char * c, size_t len) {
while ((len > 0) and (not is_ascii(c[len - 1]))) {
len--;
}
return len;
}
} // namespace internal
} // namespace simdjson
namespace simdjson {
namespace dom {
really_inline document_stream::document_stream(
dom::parser &_parser,
const uint8_t *buf,
size_t len,
size_t batch_size,
const uint8_t *_buf,
size_t _len,
size_t _batch_size,
error_code _error
) noexcept
: parser{_parser},
_buf{buf},
_len{len},
_batch_size(batch_size),
error(_error)
buf{_buf},
len{_len},
batch_size{_batch_size},
error{_error}
{
if (!error) { error = json_parse(); }
}
inline document_stream::~document_stream() noexcept {
#ifdef SIMDJSON_THREADS_ENABLED
if (stage_1_thread.joinable()) {
stage_1_thread.join();
// TODO kill the thread, why should people have to wait for a non-side-effecting operation to complete
if (stage1_thread.joinable()) {
stage1_thread.join();
}
#endif
}
really_inline document_stream::iterator document_stream::begin() noexcept {
return iterator(*this, false);
start();
// If there are no documents, we're finished.
return iterator(*this, error == EMPTY);
}
really_inline document_stream::iterator document_stream::end() noexcept {
@ -136,17 +48,15 @@ really_inline document_stream::iterator::iterator(document_stream& _stream, bool
}
really_inline simdjson_result<element> document_stream::iterator::operator*() noexcept {
error_code err = stream.error == SUCCESS_AND_HAS_MORE ? SUCCESS : stream.error;
if (err) { return err; }
// Once we have yielded any errors, we're finished.
if (stream.error) { finished = true; return stream.error; }
return stream.parser.doc.root();
}
really_inline document_stream::iterator& document_stream::iterator::operator++() noexcept {
if (stream.error == SUCCESS_AND_HAS_MORE) {
stream.error = stream.json_parse();
} else {
finished = true;
}
stream.next();
// If that was the last document, we're finished.
if (stream.error == EMPTY) { finished = true; }
return *this;
}
@ -154,130 +64,99 @@ really_inline bool document_stream::iterator::operator!=(const document_stream::
return finished != other.finished;
}
inline void document_stream::start() noexcept {
if (error) { return; }
error = parser.ensure_capacity(batch_size);
if (error) { return; }
// Always run the first stage 1 parse immediately
batch_start = 0;
run_stage1(parser, batch_start);
if (error) { return; }
#ifdef SIMDJSON_THREADS_ENABLED
if (next_batch_start() < len) {
// Kick off the first thread if needed
error = stage1_thread_parser.ensure_capacity(batch_size);
if (error) { return; }
start_stage1_thread();
if (error) { return; }
}
#endif // SIMDJSON_THREADS_ENABLED
next();
}
inline void document_stream::next() noexcept {
if (error) { return; }
// Load the next document from the batch
error = parser.implementation->stage2_next(parser.doc);
// If that was the last document in the batch, load another batch (if available)
while (error == EMPTY) {
batch_start = next_batch_start();
if (batch_start >= len) { break; }
#ifdef SIMDJSON_THREADS_ENABLED
load_from_stage1_thread();
#else
run_stage1(parser, batch_start);
#endif
if (error) { continue; } // If the error was EMPTY, we may want to load another batch.
// Run stage 2 on the first document in the batch
error = parser.implementation->stage2_next(parser.doc);
}
}
inline size_t document_stream::next_batch_start() const noexcept {
return batch_start + parser.implementation->structural_indexes[parser.implementation->n_structural_indexes];
}
inline void document_stream::run_stage1(dom::parser &p, size_t _batch_start) noexcept {
// If this is the final batch, pass partial = false
size_t remaining = len - _batch_start;
if (remaining <= batch_size) {
error = p.implementation->stage1(&buf[_batch_start], remaining, false);
} else {
error = p.implementation->stage1(&buf[_batch_start], batch_size, true);
}
}
#ifdef SIMDJSON_THREADS_ENABLED
// threaded version of json_parse
// todo: simplify this code further
inline error_code document_stream::json_parse() noexcept {
error = parser.ensure_capacity(_batch_size);
if (error) { return error; }
error = parser_thread.ensure_capacity(_batch_size);
if (error) { return error; }
inline void document_stream::load_from_stage1_thread() noexcept {
stage1_thread.join();
if (unlikely(load_next_batch)) {
// First time loading
if (!stage_1_thread.joinable()) {
_batch_size = (std::min)(_batch_size, remaining());
_batch_size = internal::trimmed_length_safe_utf8((const char *)buf(), _batch_size);
if (_batch_size == 0) {
return simdjson::UTF8_ERROR;
}
auto stage1_is_ok = error_code(parser.implementation->stage1(buf(), _batch_size, true));
if (stage1_is_ok != simdjson::SUCCESS) {
return stage1_is_ok;
}
uint32_t last_index = internal::find_last_json_buf_idx(buf(), _batch_size, parser);
if (last_index == 0) {
if (parser.implementation->n_structural_indexes == 0) {
return simdjson::EMPTY;
}
} else {
parser.implementation->n_structural_indexes = last_index + 1;
}
}
// the second thread is running or done.
else {
stage_1_thread.join();
if (stage1_is_ok_thread != simdjson::SUCCESS) {
return stage1_is_ok_thread;
}
std::swap(parser.implementation->structural_indexes, parser_thread.implementation->structural_indexes);
parser.implementation->n_structural_indexes = parser_thread.implementation->n_structural_indexes;
advance(last_json_buffer_loc);
n_bytes_parsed += last_json_buffer_loc;
}
// let us decide whether we will start a new thread
if (remaining() - _batch_size > 0) {
last_json_buffer_loc =
parser.implementation->structural_indexes[internal::find_last_json_buf_idx(buf(), _batch_size, parser)];
_batch_size = (std::min)(_batch_size, remaining() - last_json_buffer_loc);
if (_batch_size > 0) {
_batch_size = internal::trimmed_length_safe_utf8(
(const char *)(buf() + last_json_buffer_loc), _batch_size);
if (_batch_size == 0) {
return simdjson::UTF8_ERROR;
}
// let us capture read-only variables
const uint8_t *const b = buf() + last_json_buffer_loc;
const size_t bs = _batch_size;
// we call the thread on a lambda that will update
// this->stage1_is_ok_thread
// there is only one thread that may write to this value
stage_1_thread = std::thread([this, b, bs] {
this->stage1_is_ok_thread = error_code(parser_thread.implementation->stage1(b, bs, true));
});
}
}
next_json = 0;
load_next_batch = false;
} // load_next_batch
error_code res = parser.implementation->stage2(buf(), remaining(), parser.doc, next_json);
if (res == simdjson::SUCCESS_AND_HAS_MORE) {
n_parsed_docs++;
current_buffer_loc = parser.implementation->structural_indexes[next_json];
load_next_batch = (current_buffer_loc == last_json_buffer_loc);
} else if (res == simdjson::SUCCESS) {
n_parsed_docs++;
if (remaining() > _batch_size) {
current_buffer_loc = parser.implementation->structural_indexes[next_json - 1];
load_next_batch = true;
res = simdjson::SUCCESS_AND_HAS_MORE;
}
// Swap to the parser that was loaded up in the thread. Make sure the parser has
// enough memory to swap to, as well.
error = parser.ensure_capacity(batch_size);
if (error) { return error; }
std::swap(parser, stage1_thread_parser);
if (stage1_thread_error) { return stage1_thread_error; }
// If there's anything left, start the stage 1 thread!
if (next_batch_start() < len) {
start_stage1_thread();
}
return res;
}
#else // SIMDJSON_THREADS_ENABLED
// single-threaded version of json_parse
inline error_code document_stream::json_parse() noexcept {
error = parser.ensure_capacity(_batch_size);
if (error) { return error; }
if (unlikely(load_next_batch)) {
advance(current_buffer_loc);
n_bytes_parsed += current_buffer_loc;
_batch_size = (std::min)(_batch_size, remaining());
_batch_size = internal::trimmed_length_safe_utf8((const char *)buf(), _batch_size);
auto stage1_is_ok = (error_code)parser.implementation->stage1(buf(), _batch_size, true);
if (stage1_is_ok != simdjson::SUCCESS) {
return stage1_is_ok;
inline void document_stream::start_stage1_thread() noexcept {
// we call the thread on a lambda that will update
// this->stage1_thread_error
// there is only one thread that may write to this value
// TODO this is NOT exception-safe.
this->stage1_thread_error = UNINITIALIZED; // In case something goes wrong, make sure it's an error
stage1_thread = std::thread([this] {
this->stage1_thread_error = this->stage1_thread_parser.ensure_capacity(this->batch_size);
if (!this->stage1_thread_error) {
this->stage1_thread_error = run_stage1(this->stage1_thread_parser, this->next_batch_start());
}
uint32_t last_index = internal::find_last_json_buf_idx(buf(), _batch_size, parser);
if (last_index == 0) {
if (parser.implementation->n_structural_indexes == 0) {
return EMPTY;
}
} else {
parser.implementation->n_structural_indexes = last_index + 1;
}
load_next_batch = false;
} // load_next_batch
error_code res = parser.implementation->stage2(buf(), remaining(), parser.doc, next_json);
if (likely(res == simdjson::SUCCESS_AND_HAS_MORE)) {
n_parsed_docs++;
current_buffer_loc = parser.implementation->structural_indexes[next_json];
} else if (res == simdjson::SUCCESS) {
n_parsed_docs++;
if (remaining() > _batch_size) {
current_buffer_loc = parser.implementation->structural_indexes[next_json - 1];
next_json = 1;
load_next_batch = true;
res = simdjson::SUCCESS_AND_HAS_MORE;
}
}
return res;
});
}
#endif // SIMDJSON_THREADS_ENABLED
} // namespace dom
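
The threaded path above pipelines the two stages: a background thread runs stage 1 on the next batch while the main thread runs stage 2 on the current one, and the two parsers are swapped once the batch is exhausted. A standalone sketch of that double-buffering pattern follows, with simplified stand-in types rather than simdjson's:

#include <cstdio>
#include <functional>
#include <thread>
#include <utility>
#include <vector>

struct index_buffer { std::vector<int> structurals; };

// Stand-in for running stage 1 over one batch.
static void index_batch(index_buffer &out, int batch) {
  out.structurals.assign(3, batch);
}

int main() {
  index_buffer front, back;
  index_batch(front, 0);                                       // first batch indexed synchronously, as in start()
  std::thread stage1_thread(index_batch, std::ref(back), 1);   // cf. start_stage1_thread()
  // ... stage 2 consumes `front` here while the worker indexes batch 1 ...
  stage1_thread.join();                                        // cf. load_from_stage1_thread()
  std::swap(front, back);                                      // adopt the worker's results
  std::printf("batch 1 has %zu structurals\n", front.structurals.size());
  return 0;
}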

View File

@ -72,16 +72,14 @@ public:
*
* Stage 2 of the document parser for parser::parse_many.
*
* Guaranteed only to be called after stage1(), with buf and len being a subset of the total stage1 buf/len.
* Guaranteed only to be called after stage1().
* Overridden by each implementation.
*
* @param buf The json document to parse.
* @param len The length of the json document.
* @param doc The document to output to.
* @param next_json The next structural index. Start this at 0 the first time, and it will be updated to the next value to pass each time.
* @return The error code, SUCCESS if there was no error, or SUCCESS_AND_HAS_MORE if there was no error and stage2 can be called again.
*/
WARN_UNUSED virtual error_code stage2(const uint8_t *buf, size_t len, dom::document &doc, size_t &next_json) noexcept = 0;
WARN_UNUSED virtual error_code stage2_next(dom::document &doc) noexcept = 0;
/**
* Change the capacity of this parser.
@ -117,6 +115,8 @@ public:
uint32_t n_structural_indexes{0};
/** Structural indices passed from stage 1 to stage 2 */
std::unique_ptr<uint32_t[]> structural_indexes{};
/** Next structural index to parse */
uint32_t next_structural_index{0};
/**
* The largest document this parser can support without reallocating.

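The new next_structural_index member is the point of this commit: the stream's cursor now lives in the implementation rather than in document_stream. Below is a toy model of the resulting contract, with stand-in types rather than simdjson's: stage 1 indexes a batch once, then stage2_next() is called repeatedly until it reports EMPTY.

#include <cstdio>
#include <vector>

enum error_code { SUCCESS = 0, EMPTY = 1 };

struct toy_implementation {
  std::vector<int> structural_indexes;   // filled by "stage 1"
  unsigned next_structural_index = 0;    // the cursor this commit moves into the implementation

  error_code stage1(std::vector<int> batch) {
    structural_indexes = std::move(batch);
    next_structural_index = 0;
    return SUCCESS;
  }
  error_code stage2_next(int &doc) {
    if (next_structural_index == structural_indexes.size()) { return EMPTY; }
    doc = structural_indexes[next_structural_index++];
    return SUCCESS;
  }
};

int main() {
  toy_implementation impl;
  impl.stage1({10, 20, 30});
  int doc;
  while (impl.stage2_next(doc) == SUCCESS) { std::printf("document %d\n", doc); }
  return 0;
}
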
View File

@ -28,11 +28,12 @@ public:
really_inline dom_parser_implementation();
dom_parser_implementation(const dom_parser_implementation &) = delete;
dom_parser_implementation & operator=(const dom_parser_implementation &) = delete;
WARN_UNUSED error_code parse(const uint8_t *buf, size_t len, dom::document &doc) noexcept final;
WARN_UNUSED error_code stage1(const uint8_t *buf, size_t len, bool streaming) noexcept final;
WARN_UNUSED error_code stage1(const uint8_t *buf, size_t len, bool partial) noexcept final;
WARN_UNUSED error_code check_for_unclosed_array() noexcept;
WARN_UNUSED error_code stage2(dom::document &doc) noexcept final;
WARN_UNUSED error_code stage2(const uint8_t *buf, size_t len, dom::document &doc, size_t &next_json) noexcept final;
WARN_UNUSED error_code stage2_next(dom::document &doc) noexcept final;
WARN_UNUSED error_code set_capacity(size_t capacity) noexcept final;
WARN_UNUSED error_code set_max_depth(size_t max_depth) noexcept final;
};

View File

@ -8,6 +8,8 @@ really_inline error_code set_capacity(internal::dom_parser_implementation &parse
size_t max_structures = ROUNDUP_N(capacity, 64) + 2 + 7;
parser.structural_indexes.reset( new (std::nothrow) uint32_t[max_structures] );
if (!parser.structural_indexes) { return MEMALLOC; }
parser.structural_indexes[0] = 0;
parser.n_structural_indexes = 0;
return SUCCESS;
}

View File

@ -24,6 +24,38 @@ private:
size_t idx;
};
constexpr const int TITLE_SIZE = 12;
// Routines to print masks and text for debugging bitmask operations
UNUSED static char * format_input_text_64(const uint8_t *text) {
static char *buf = (char*)malloc(sizeof(simd8x64<uint8_t>) + 1);
for (size_t i=0; i<sizeof(simd8x64<uint8_t>); i++) {
buf[i] = int8_t(text[i]) < ' ' ? '_' : int8_t(text[i]);
}
buf[sizeof(simd8x64<uint8_t>)] = '\0';
return buf;
}
// Routines to print masks and text for debugging bitmask operations
UNUSED static char * format_input_text(const simd8x64<uint8_t> in) {
static char *buf = (char*)malloc(sizeof(simd8x64<uint8_t>) + 1);
in.store((uint8_t*)buf);
for (size_t i=0; i<sizeof(simd8x64<uint8_t>); i++) {
if (buf[i] < ' ') { buf[i] = '_'; }
}
buf[sizeof(simd8x64<uint8_t>)] = '\0';
return buf;
}
UNUSED static char * format_mask(uint64_t mask) {
static char *buf = (char*)malloc(64 + 1);
for (size_t i=0; i<64; i++) {
buf[i] = (mask & (size_t(1) << i)) ? 'X' : ' ';
}
buf[64] = '\0';
return buf;
}
template<size_t STEP_SIZE>
really_inline buf_block_reader<STEP_SIZE>::buf_block_reader(const uint8_t *_buf, size_t _len) : buf{_buf}, len{_len}, lenminusstep{len < STEP_SIZE ? 0 : len - STEP_SIZE}, idx{0} {}
@ -51,23 +83,3 @@ template<size_t STEP_SIZE>
really_inline void buf_block_reader<STEP_SIZE>::advance() {
idx += STEP_SIZE;
}
// Routines to print masks and text for debugging bitmask operations
UNUSED static char * format_input_text(const simd8x64<uint8_t> in) {
static char *buf = (char*)malloc(sizeof(simd8x64<uint8_t>) + 1);
in.store((uint8_t*)buf);
for (size_t i=0; i<sizeof(simd8x64<uint8_t>); i++) {
if (buf[i] < ' ') { buf[i] = '_'; }
}
buf[sizeof(simd8x64<uint8_t>)] = '\0';
return buf;
}
UNUSED static char * format_mask(uint64_t mask) {
static char *buf = (char*)malloc(64 + 1);
for (size_t i=0; i<64; i++) {
buf[i] = (mask & (size_t(1) << i)) ? 'X' : ' ';
}
buf[64] = '\0';
return buf;
}

View File

@ -60,19 +60,21 @@ public:
/**
* Find the important bits of JSON in a 128-byte chunk, and add them to structural_indexes.
*
* @param streaming Setting the streaming parameter to true allows the find_structural_bits to
* @param partial Setting the partial parameter to true allows the find_structural_bits to
* tolerate unclosed strings. The caller should still ensure that the input is valid UTF-8. If
* you are processing substrings, you may want to call on a function like trimmed_length_safe_utf8.
*/
template<size_t STEP_SIZE>
static error_code index(const uint8_t *buf, size_t len, dom_parser_implementation &parser, bool streaming) noexcept;
static error_code index(const uint8_t *buf, size_t len, dom_parser_implementation &parser, bool partial) noexcept;
private:
really_inline json_structural_indexer(uint32_t *structural_indexes);
template<size_t STEP_SIZE>
really_inline void step(const uint8_t *block, buf_block_reader<STEP_SIZE> &reader) noexcept;
really_inline void next(simd::simd8x64<uint8_t> in, json_block block, size_t idx);
really_inline error_code finish(dom_parser_implementation &parser, size_t idx, size_t len, bool streaming);
really_inline error_code finish(dom_parser_implementation &parser, size_t idx, size_t len, bool partial);
static really_inline uint32_t find_next_document_index(dom_parser_implementation &parser);
static really_inline size_t trim_partial_utf8(const uint8_t *buf, size_t len);
json_scanner scanner{};
utf8_checker checker{};
@ -101,8 +103,9 @@ really_inline json_structural_indexer::json_structural_indexer(uint32_t *structu
// workout.
//
template<size_t STEP_SIZE>
error_code json_structural_indexer::index(const uint8_t *buf, size_t len, dom_parser_implementation &parser, bool streaming) noexcept {
error_code json_structural_indexer::index(const uint8_t *buf, size_t len, dom_parser_implementation &parser, bool partial) noexcept {
if (unlikely(len > parser.capacity())) { return CAPACITY; }
if (partial) { len = trim_partial_utf8(buf, len); }
buf_block_reader<STEP_SIZE> reader(buf, len);
json_structural_indexer indexer(parser.structural_indexes.get());
@ -117,7 +120,7 @@ error_code json_structural_indexer::index(const uint8_t *buf, size_t len, dom_pa
if (unlikely(reader.get_remainder(block) == 0)) { return EMPTY; }
indexer.step<STEP_SIZE>(block, reader);
return indexer.finish(parser, reader.block_index(), len, streaming);
return indexer.finish(parser, reader.block_index(), len, partial);
}
template<>
@ -147,11 +150,11 @@ really_inline void json_structural_indexer::next(simd::simd8x64<uint8_t> in, jso
unescaped_chars_error |= block.non_quote_inside_string(unescaped);
}
really_inline error_code json_structural_indexer::finish(dom_parser_implementation &parser, size_t idx, size_t len, bool streaming) {
really_inline error_code json_structural_indexer::finish(dom_parser_implementation &parser, size_t idx, size_t len, bool partial) {
// Write out the final iteration's structurals
indexer.write(uint32_t(idx-64), prev_structurals);
error_code error = scanner.finish(streaming);
error_code error = scanner.finish(partial);
if (unlikely(error != SUCCESS)) { return error; }
if (unescaped_chars_error) {
@ -159,22 +162,13 @@ really_inline error_code json_structural_indexer::finish(dom_parser_implementati
}
parser.n_structural_indexes = uint32_t(indexer.tail - parser.structural_indexes.get());
/* a valid JSON file cannot have zero structural indexes - we should have
* found something */
// a valid JSON file cannot have zero structural indexes - we should have found something
if (unlikely(parser.n_structural_indexes == 0u)) {
return EMPTY;
}
if (unlikely(parser.structural_indexes[parser.n_structural_indexes - 1] > len)) {
return UNEXPECTED_ERROR;
}
if (len != parser.structural_indexes[parser.n_structural_indexes - 1]) {
/* the string might not be NULL terminated, but we add a virtual NULL
* ending character. */
parser.structural_indexes[parser.n_structural_indexes++] = uint32_t(len);
}
/* make it safe to dereference one beyond this array */
parser.structural_indexes[parser.n_structural_indexes] = uint32_t(len);
parser.structural_indexes[parser.n_structural_indexes + 1] = 0;
/***
* This is related to https://github.com/simdjson/simdjson/issues/906
* Basically, we want to make sure that if the parsing continues beyond the last (valid)
@ -189,7 +183,105 @@ really_inline error_code json_structural_indexer::finish(dom_parser_implementati
* starts with [, it should end with ]. If we enforce that rule, then we would get
* ][[ which is invalid.
**/
parser.structural_indexes[parser.n_structural_indexes] = uint32_t(len);
parser.structural_indexes[parser.n_structural_indexes + 1] = uint32_t(len);
parser.structural_indexes[parser.n_structural_indexes + 2] = 0;
if (partial) {
auto new_structural_indexes = find_next_document_index(parser);
if (new_structural_indexes == 0 && parser.n_structural_indexes > 0) {
return CAPACITY; // If the buffer is partial but the document is incomplete, it's too big to parse.
}
parser.n_structural_indexes = new_structural_indexes;
}
parser.next_structural_index = 0;
return checker.errors();
}
/**
* This algorithm is used to quickly identify the last structural position that
* makes up a complete document.
*
* It does this by going backwards and finding the last *document boundary* (a
* place where one value follows another without a comma between them). If the
* last document (the characters after the boundary) has an equal number of
* start and end brackets, it is considered complete.
*
* Simply put, we iterate over the structural characters, starting from
* the end. We consider that we found the end of a JSON document when the
* first element of the pair is NOT one of these characters: '{' '[' ':' ','
* and when the second element is NOT one of these characters: '}' ']' ':' ','.
*
* This simple comparison works most of the time, but it does not cover cases
* where the batch's structural indexes contain a whole number of complete documents.
* In such a case, we do not have access to the structural index which follows
* the last document, therefore, we do not have access to the second element in
* the pair, which means that we cannot identify the last document. To fix this
* issue, we keep a count of the open and closed curly/square braces we found
* while searching for the pair. When we find a pair AND the count of open and
* closed curly/square braces is the same, we know that we just passed a complete
* document, therefore the last json buffer location is the end of the batch.
*/
really_inline uint32_t json_structural_indexer::find_next_document_index(dom_parser_implementation &parser) {
// TODO don't count separately, just figure out depth
auto arr_cnt = 0;
auto obj_cnt = 0;
for (auto i = parser.n_structural_indexes - 1; i > 0; i--) {
auto idxb = parser.structural_indexes[i];
switch (parser.buf[idxb]) {
case ':':
case ',':
continue;
case '}':
obj_cnt--;
continue;
case ']':
arr_cnt--;
continue;
case '{':
obj_cnt++;
break;
case '[':
arr_cnt++;
break;
}
auto idxa = parser.structural_indexes[i - 1];
switch (parser.buf[idxa]) {
case '{':
case '[':
case ':':
case ',':
continue;
}
// Last document is complete, so the next document will appear after!
if (!arr_cnt && !obj_cnt) {
return parser.n_structural_indexes;
}
// Last document is incomplete; mark the document at i + 1 as the next one
return i;
}
return 0;
}
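
To make the scan concrete, here is a standalone version of the same backward walk over a toy string of structural characters (illustrative only, not simdjson code). For two complete documents such as {"a":1} {"b":2} the structurals are {":1}{":2} and the function returns the full count (keep everything); appending a truncated {"c": yields {":1}{":2}{": and it returns the index of the '{' that starts the incomplete document.

#include <cstdio>
#include <string>

static size_t find_next_document_index(const std::string &s) {
  if (s.empty()) { return 0; }
  int arr_cnt = 0, obj_cnt = 0;
  for (size_t i = s.size() - 1; i > 0; i--) {
    switch (s[i]) {
      case ':': case ',': continue;
      case '}': obj_cnt--; continue;
      case ']': arr_cnt--; continue;
      case '{': obj_cnt++; break;
      case '[': arr_cnt++; break;
    }
    switch (s[i - 1]) {
      case '{': case '[': case ':': case ',': continue;
    }
    // A value ends at i-1 and a new one starts at i: this is a document boundary.
    if (!arr_cnt && !obj_cnt) { return s.size(); } // last document is complete, keep everything
    return i;                                      // last document starts at i and is incomplete
  }
  return 0;
}

int main() {
  std::printf("%zu\n", find_next_document_index("{\":1}{\":2}"));     // 10: batch ends on a complete document
  std::printf("%zu\n", find_next_document_index("{\":1}{\":2}{\":")); // 10: the '{' of the truncated third document
  return 0;
}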
// Trim off any trailing bytes that form an incomplete UTF-8 character
really_inline size_t json_structural_indexer::trim_partial_utf8(const uint8_t *buf, size_t len) {
if (unlikely(len < 3)) {
switch (len) {
case 2:
if (buf[len-1] >= 0b11000000) { return len-1; } // 2-, 3- and 4-byte characters with only 1 byte left
if (buf[len-2] >= 0b11100000) { return len-2; } // 3- and 4-byte characters with only 2 bytes left
return len;
case 1:
if (buf[len-1] >= 0b11000000) { return len-1; } // 2-, 3- and 4-byte characters with only 1 byte left
return len;
case 0:
return len;
}
}
if (buf[len-1] >= 0b11000000) { return len-1; } // 2-, 3- and 4-byte characters with only 1 byte left
if (buf[len-2] >= 0b11100000) { return len-2; } // 3- and 4-byte characters with only 2 bytes left
if (buf[len-3] >= 0b11110000) { return len-3; } // 4-byte characters with only 3 bytes left
return len;
}
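
A quick standalone check of the trimming rule (a condensed restatement with explicit bounds guards, illustrative only): a batch that ends in the middle of a multi-byte UTF-8 character is shortened to the last complete character, so stage 1 never sees a torn sequence.

#include <cstdio>
#include <cstdint>
#include <cstring>

static size_t trim_partial_utf8(const uint8_t *buf, size_t len) {
  if (len >= 1 && buf[len-1] >= 0b11000000) { return len-1; } // lead byte of a 2/3/4-byte character
  if (len >= 2 && buf[len-2] >= 0b11100000) { return len-2; } // lead byte of a 3/4-byte character
  if (len >= 3 && buf[len-3] >= 0b11110000) { return len-3; } // lead byte of a 4-byte character
  return len;
}

int main() {
  const char *json = "[\"caf\xC3";  // batch cut after the first byte of U+00E9 ('é' is 0xC3 0xA9)
  size_t len = std::strlen(json);   // 6 bytes
  std::printf("%zu -> %zu\n", len, trim_partial_utf8((const uint8_t *)json, len)); // prints "6 -> 5"
  return 0;
}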
} // namespace stage1

View File

@ -56,7 +56,7 @@ namespace logger {
}
printf("| %c ", printable_char(structurals.at_beginning() ? ' ' : structurals.current_char()));
printf("| %c ", printable_char(structurals.peek_char()));
printf("| %5zd ", structurals.next_structural);
printf("| %5u ", structurals.structural_indexes[structurals.next_structural]);
printf("| %-*s ", LOG_DETAIL_LEN, detail);
printf("| %*zu ", LOG_INDEX_LEN, structurals.idx);
printf("|\n");

View File

@ -1,12 +1,18 @@
namespace stage2 {
struct streaming_structural_parser: structural_parser {
really_inline streaming_structural_parser(dom_parser_implementation &_parser, uint32_t next_structural) : structural_parser(_parser, next_structural) {}
really_inline streaming_structural_parser(dom_parser_implementation &_parser) : structural_parser(_parser, _parser.next_structural_index) {}
// override to add streaming
WARN_UNUSED really_inline error_code start(ret_address_t finish_parser) {
// If there are no structurals left, return EMPTY
if (structurals.at_end(parser.n_structural_indexes)) {
return parser.error = EMPTY;
}
log_start();
init(); // sets is_valid to false
init();
// Capacity ain't no thang for streaming, so we don't check it.
// Advance to the first character as soon as possible
advance_char();
@ -24,6 +30,7 @@ struct streaming_structural_parser: structural_parser {
return parser.error = TAPE_ERROR;
}
end_document();
parser.next_structural_index = uint32_t(structurals.next_structural_index());
if (depth != 0) {
log_error("Unclosed objects or arrays!");
return parser.error = TAPE_ERROR;
@ -32,9 +39,7 @@ struct streaming_structural_parser: structural_parser {
log_error("IMPOSSIBLE: root scope tape index did not start at 0!");
return parser.error = TAPE_ERROR;
}
bool finished = structurals.at_end(parser.n_structural_indexes);
if (!finished) { log_value("(and has more)"); }
return finished ? SUCCESS : SUCCESS_AND_HAS_MORE;
return SUCCESS;
}
};
@ -44,12 +49,10 @@ struct streaming_structural_parser: structural_parser {
* The JSON is parsed to a tape, see the accompanying tape.md file
* for documentation.
***********/
WARN_UNUSED error_code dom_parser_implementation::stage2(const uint8_t *_buf, size_t _len, dom::document &_doc, size_t &next_json) noexcept {
this->buf = _buf;
this->len = _len;
WARN_UNUSED error_code dom_parser_implementation::stage2_next(dom::document &_doc) noexcept {
this->doc = &_doc;
static constexpr stage2::unified_machine_addresses addresses = INIT_ADDRESSES();
stage2::streaming_structural_parser parser(*this, uint32_t(next_json));
stage2::streaming_structural_parser parser(*this);
error_code result = parser.start(addresses.finish);
if (result) { return result; }
//
@ -158,7 +161,6 @@ array_continue:
}
finish:
next_json = parser.structurals.next_structural_index();
return parser.finish();
error:

View File

@ -52,10 +52,10 @@ public:
return result;
}
really_inline bool past_end(uint32_t n_structural_indexes) {
return next_structural+1 > n_structural_indexes;
return next_structural > n_structural_indexes;
}
really_inline bool at_end(uint32_t n_structural_indexes) {
return next_structural+1 == n_structural_indexes;
return next_structural == n_structural_indexes;
}
really_inline bool at_beginning() {
return next_structural == 0;

View File

@ -75,10 +75,7 @@ struct structural_parser {
uint8_t *current_string_buf_loc{};
uint32_t depth;
really_inline structural_parser(
dom_parser_implementation &_parser,
uint32_t next_structural = 0
) : structurals(_parser.buf, _parser.len, _parser.structural_indexes.get(), next_structural), parser{_parser}, depth{0} {}
really_inline structural_parser(dom_parser_implementation &_parser, uint32_t next_structural = 0) : structurals(_parser.buf, _parser.len, _parser.structural_indexes.get(), next_structural), parser{_parser}, depth{0} {}
WARN_UNUSED really_inline bool start_scope(ret_address_t continue_state) {
parser.containing_scope[depth].tape_index = parser.current_loc;
@ -333,7 +330,7 @@ struct structural_parser {
WARN_UNUSED really_inline error_code start(size_t len, ret_address_t finish_state) {
log_start();
init(); // sets is_valid to false
init();
if (len > parser.capacity()) {
return parser.error = CAPACITY;
}
@ -401,7 +398,7 @@ WARN_UNUSED error_code dom_parser_implementation::stage2(dom::document &_doc) no
FAIL_IF( parser.start_array(addresses.finish) );
// Make sure the outer array is closed before continuing; otherwise, there are ways we could get
// into memory corruption. See https://github.com/simdjson/simdjson/issues/906
if (buf[structural_indexes[n_structural_indexes - 2]] != ']') {
if (buf[structural_indexes[n_structural_indexes - 1]] != ']') {
goto error;
}
goto array_begin;

View File

@ -24,7 +24,7 @@ const char *TWITTER_JSON = SIMDJSON_BENCHMARK_DATA_DIR "twitter.json";
#define TEST_FAIL(MESSAGE) { cerr << "FAIL: " << (MESSAGE) << endl; return false; }
#define TEST_SUCCEED() { return true; }
namespace parser_load {
const char * NONEXISTENT_FILE = "this_file_does_not_exit.json";
const char * NONEXISTENT_FILE = "this_file_does_not_exist.json";
bool parser_load_capacity() {
TEST_START();
dom::parser parser(1); // 1 byte max capacity