Move JsonStream inline implementation to inline .h
This commit is contained in:
parent
5525c6f729
commit
a55f41a24a
2
Makefile
2
Makefile
|
@ -65,7 +65,7 @@ SRCHEADERS_WESTMERE=src/westmere/bitmanipulation.h src/westmere/bitmask.h src/we
|
|||
SRCHEADERS_SRC=src/isadetection.h src/jsoncharutils.h src/simdprune_tables.h src/error.cpp src/jsonioutil.cpp src/implementation.cpp src/stage1_find_marks.cpp src/stage2_build_tape.cpp src/inline/document_parser_callbacks.h
|
||||
SRCHEADERS=$(SRCHEADERS_SRC) $(SRCHEADERS_GENERIC) $(SRCHEADERS_ARM64) $(SRCHEADERS_HASWELL) $(SRCHEADERS_WESTMERE)
|
||||
|
||||
INCLUDEHEADERS=include/simdjson.h include/simdjson/common_defs.h include/simdjson/internal/jsonformatutils.h include/simdjson/jsonioutil.h include/simdjson/jsonminifier.h include/simdjson/jsonparser.h include/simdjson/padded_string.h include/simdjson/document.h include/simdjson/inline/document.h include/simdjson/document_iterator.h include/simdjson/inline/document_iterator.h include/simdjson/implementation.h include/simdjson/parsedjson.h include/simdjson/jsonstream.h include/simdjson/portability.h include/simdjson/error.h include/simdjson/simdjson.h include/simdjson/simdjson_version.h
|
||||
INCLUDEHEADERS=include/simdjson.h include/simdjson/common_defs.h include/simdjson/internal/jsonformatutils.h include/simdjson/jsonioutil.h include/simdjson/jsonminifier.h include/simdjson/jsonparser.h include/simdjson/padded_string.h include/simdjson/document.h include/simdjson/inline/document.h include/simdjson/document_iterator.h include/simdjson/inline/document_iterator.h include/simdjson/implementation.h include/simdjson/parsedjson.h include/simdjson/jsonstream.h include/simdjson/inline/jsonstream.h include/simdjson/portability.h include/simdjson/error.h include/simdjson/simdjson.h include/simdjson/simdjson_version.h
|
||||
|
||||
ifeq ($(SIMDJSON_TEST_AMALGAMATED_HEADERS),1)
|
||||
HEADERS=singleheader/simdjson.h
|
||||
|
|
|
@ -1,20 +1,23 @@
|
|||
set(SIMDJSON_INCLUDE_DIR ${PROJECT_SOURCE_DIR}/include)
|
||||
set(SIMDJSON_INCLUDE
|
||||
${SIMDJSON_INCLUDE_DIR}/simdjson.h
|
||||
${SIMDJSON_INCLUDE_DIR}/simdjson/compiler_check.h
|
||||
${SIMDJSON_INCLUDE_DIR}/simdjson/common_defs.h
|
||||
${SIMDJSON_INCLUDE_DIR}/simdjson/document.h
|
||||
${SIMDJSON_INCLUDE_DIR}/simdjson/compiler_check.h
|
||||
${SIMDJSON_INCLUDE_DIR}/simdjson/document_iterator.h
|
||||
${SIMDJSON_INCLUDE_DIR}/simdjson/document.h
|
||||
${SIMDJSON_INCLUDE_DIR}/simdjson/error.h
|
||||
${SIMDJSON_INCLUDE_DIR}/simdjson/implementation.h
|
||||
${SIMDJSON_INCLUDE_DIR}/simdjson/inline/document.h
|
||||
${SIMDJSON_INCLUDE_DIR}/simdjson/inline/document_iterator.h
|
||||
${SIMDJSON_INCLUDE_DIR}/simdjson/inline/jsonstream.h
|
||||
${SIMDJSON_INCLUDE_DIR}/simdjson/internal/jsonformatutils.h
|
||||
${SIMDJSON_INCLUDE_DIR}/simdjson/jsonioutil.h
|
||||
${SIMDJSON_INCLUDE_DIR}/simdjson/jsonminifier.h
|
||||
${SIMDJSON_INCLUDE_DIR}/simdjson/jsonparser.h
|
||||
${SIMDJSON_INCLUDE_DIR}/simdjson/jsonstream.h
|
||||
${SIMDJSON_INCLUDE_DIR}/simdjson/padded_string.h
|
||||
${SIMDJSON_INCLUDE_DIR}/simdjson/parsedjson.h
|
||||
${SIMDJSON_INCLUDE_DIR}/simdjson/portability.h
|
||||
${SIMDJSON_INCLUDE_DIR}/simdjson/simdjson.h
|
||||
${SIMDJSON_INCLUDE_DIR}/simdjson/simdjson_version.h
|
||||
${SIMDJSON_INCLUDE_DIR}/simdjson/simdjson.h
|
||||
)
|
|
@ -21,5 +21,6 @@
|
|||
// Inline functions
|
||||
#include "simdjson/inline/document.h"
|
||||
#include "simdjson/inline/document_iterator.h"
|
||||
#include "simdjson/inline/jsonstream.h"
|
||||
|
||||
#endif // SIMDJSON_H
|
||||
|
|
|
@ -0,0 +1,258 @@
|
|||
#ifndef SIMDJSON_INLINE_JSONSTREAM_H
|
||||
#define SIMDJSON_INLINE_JSONSTREAM_H
|
||||
|
||||
#include "simdjson/jsonstream.h"
|
||||
#include <algorithm>
|
||||
#include <limits>
|
||||
#include <stdexcept>
|
||||
#include <thread>
|
||||
|
||||
namespace simdjson::internal {
|
||||
|
||||
/* This algorithm is used to quickly identify the buffer position of
|
||||
* the last JSON document inside the current batch.
|
||||
*
|
||||
* It does its work by finding the last pair of structural characters
|
||||
* that represent the end followed by the start of a document.
|
||||
*
|
||||
* Simply put, we iterate over the structural characters, starting from
|
||||
* the end. We consider that we found the end of a JSON document when the
|
||||
* first element of the pair is NOT one of these characters: '{' '[' ';' ','
|
||||
* and when the second element is NOT one of these characters: '}' '}' ';' ','.
|
||||
*
|
||||
* This simple comparison works most of the time, but it does not cover cases
|
||||
* where the batch's structural indexes contain a perfect amount of documents.
|
||||
* In such a case, we do not have access to the structural index which follows
|
||||
* the last document, therefore, we do not have access to the second element in
|
||||
* the pair, and means that we cannot identify the last document. To fix this
|
||||
* issue, we keep a count of the open and closed curly/square braces we found
|
||||
* while searching for the pair. When we find a pair AND the count of open and
|
||||
* closed curly/square braces is the same, we know that we just passed a
|
||||
* complete
|
||||
* document, therefore the last json buffer location is the end of the batch
|
||||
* */
|
||||
inline size_t find_last_json_buf_idx(const uint8_t *buf, size_t size, const document::parser &parser) {
|
||||
// this function can be generally useful
|
||||
if (parser.n_structural_indexes == 0)
|
||||
return 0;
|
||||
auto last_i = parser.n_structural_indexes - 1;
|
||||
if (parser.structural_indexes[last_i] == size) {
|
||||
if (last_i == 0)
|
||||
return 0;
|
||||
last_i = parser.n_structural_indexes - 2;
|
||||
}
|
||||
auto arr_cnt = 0;
|
||||
auto obj_cnt = 0;
|
||||
for (auto i = last_i; i > 0; i--) {
|
||||
auto idxb = parser.structural_indexes[i];
|
||||
switch (buf[idxb]) {
|
||||
case ':':
|
||||
case ',':
|
||||
continue;
|
||||
case '}':
|
||||
obj_cnt--;
|
||||
continue;
|
||||
case ']':
|
||||
arr_cnt--;
|
||||
continue;
|
||||
case '{':
|
||||
obj_cnt++;
|
||||
break;
|
||||
case '[':
|
||||
arr_cnt++;
|
||||
break;
|
||||
}
|
||||
auto idxa = parser.structural_indexes[i - 1];
|
||||
switch (buf[idxa]) {
|
||||
case '{':
|
||||
case '[':
|
||||
case ':':
|
||||
case ',':
|
||||
continue;
|
||||
}
|
||||
if (!arr_cnt && !obj_cnt) {
|
||||
return last_i + 1;
|
||||
}
|
||||
return i;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
// returns true if the provided byte value is an ASCII character
|
||||
static inline bool is_ascii(char c) {
|
||||
return ((unsigned char)c) <= 127;
|
||||
}
|
||||
|
||||
// if the string ends with UTF-8 values, backtrack
|
||||
// up to the first ASCII character. May return 0.
|
||||
static inline size_t trimmed_length_safe_utf8(const char * c, size_t len) {
|
||||
while ((len > 0) and (not is_ascii(c[len - 1]))) {
|
||||
len--;
|
||||
}
|
||||
return len;
|
||||
}
|
||||
|
||||
} // namespace simdjson::internal
|
||||
|
||||
namespace simdjson {
|
||||
|
||||
template <class string_container>
|
||||
JsonStream<string_container>::JsonStream(const string_container &s,
|
||||
size_t batchSize)
|
||||
: str(s), _batch_size(batchSize) {
|
||||
}
|
||||
|
||||
template <class string_container> JsonStream<string_container>::~JsonStream() {
|
||||
#ifdef SIMDJSON_THREADS_ENABLED
|
||||
if (stage_1_thread.joinable()) {
|
||||
stage_1_thread.join();
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
#ifdef SIMDJSON_THREADS_ENABLED
|
||||
|
||||
// threaded version of json_parse
|
||||
// todo: simplify this code further
|
||||
template <class string_container>
|
||||
int JsonStream<string_container>::json_parse(document::parser &parser) {
|
||||
if (unlikely(parser.capacity() == 0)) {
|
||||
const bool allocok = parser.allocate_capacity(_batch_size);
|
||||
if (!allocok) {
|
||||
return parser.error = simdjson::MEMALLOC;
|
||||
}
|
||||
} else if (unlikely(parser.capacity() < _batch_size)) {
|
||||
return parser.error = simdjson::CAPACITY;
|
||||
}
|
||||
if (unlikely(parser_thread.capacity() < _batch_size)) {
|
||||
const bool allocok_thread = parser_thread.allocate_capacity(_batch_size);
|
||||
if (!allocok_thread) {
|
||||
return parser.error = simdjson::MEMALLOC;
|
||||
}
|
||||
}
|
||||
if (unlikely(load_next_batch)) {
|
||||
// First time loading
|
||||
if (!stage_1_thread.joinable()) {
|
||||
_batch_size = (std::min)(_batch_size, remaining());
|
||||
_batch_size = internal::trimmed_length_safe_utf8((const char *)buf(), _batch_size);
|
||||
if (_batch_size == 0) {
|
||||
return parser.error = simdjson::UTF8_ERROR;
|
||||
}
|
||||
auto stage1_is_ok = error_code(simdjson::active_implementation->stage1(buf(), _batch_size, parser, true));
|
||||
if (stage1_is_ok != simdjson::SUCCESS) {
|
||||
return parser.error = stage1_is_ok;
|
||||
}
|
||||
size_t last_index = internal::find_last_json_buf_idx(buf(), _batch_size, parser);
|
||||
if (last_index == 0) {
|
||||
if (parser.n_structural_indexes == 0) {
|
||||
return parser.error = simdjson::EMPTY;
|
||||
}
|
||||
} else {
|
||||
parser.n_structural_indexes = last_index + 1;
|
||||
}
|
||||
}
|
||||
// the second thread is running or done.
|
||||
else {
|
||||
stage_1_thread.join();
|
||||
if (stage1_is_ok_thread != simdjson::SUCCESS) {
|
||||
return parser.error = stage1_is_ok_thread;
|
||||
}
|
||||
std::swap(parser.structural_indexes, parser_thread.structural_indexes);
|
||||
parser.n_structural_indexes = parser_thread.n_structural_indexes;
|
||||
advance(last_json_buffer_loc);
|
||||
n_bytes_parsed += last_json_buffer_loc;
|
||||
}
|
||||
// let us decide whether we will start a new thread
|
||||
if (remaining() - _batch_size > 0) {
|
||||
last_json_buffer_loc =
|
||||
parser.structural_indexes[internal::find_last_json_buf_idx(buf(), _batch_size, parser)];
|
||||
_batch_size = (std::min)(_batch_size, remaining() - last_json_buffer_loc);
|
||||
if (_batch_size > 0) {
|
||||
_batch_size = internal::trimmed_length_safe_utf8(
|
||||
(const char *)(buf() + last_json_buffer_loc), _batch_size);
|
||||
if (_batch_size == 0) {
|
||||
return parser.error = simdjson::UTF8_ERROR;
|
||||
}
|
||||
// let us capture read-only variables
|
||||
const uint8_t *const b = buf() + last_json_buffer_loc;
|
||||
const size_t bs = _batch_size;
|
||||
// we call the thread on a lambda that will update
|
||||
// this->stage1_is_ok_thread
|
||||
// there is only one thread that may write to this value
|
||||
stage_1_thread = std::thread([this, b, bs] {
|
||||
this->stage1_is_ok_thread = error_code(simdjson::active_implementation->stage1(b, bs, this->parser_thread, true));
|
||||
});
|
||||
}
|
||||
}
|
||||
next_json = 0;
|
||||
load_next_batch = false;
|
||||
} // load_next_batch
|
||||
int res = simdjson::active_implementation->stage2(buf(), remaining(), parser, next_json);
|
||||
if (res == simdjson::SUCCESS_AND_HAS_MORE) {
|
||||
n_parsed_docs++;
|
||||
current_buffer_loc = parser.structural_indexes[next_json];
|
||||
load_next_batch = (current_buffer_loc == last_json_buffer_loc);
|
||||
} else if (res == simdjson::SUCCESS) {
|
||||
n_parsed_docs++;
|
||||
if (remaining() > _batch_size) {
|
||||
current_buffer_loc = parser.structural_indexes[next_json - 1];
|
||||
load_next_batch = true;
|
||||
res = simdjson::SUCCESS_AND_HAS_MORE;
|
||||
}
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
#else // SIMDJSON_THREADS_ENABLED
|
||||
|
||||
// single-threaded version of json_parse
|
||||
template <class string_container>
|
||||
int JsonStream<string_container>::json_parse(document::parser &parser) {
|
||||
if (unlikely(parser.capacity() == 0)) {
|
||||
const bool allocok = parser.allocate_capacity(_batch_size);
|
||||
if (!allocok) {
|
||||
return parser.on_error(MEMALLOC);
|
||||
}
|
||||
} else if (unlikely(parser.capacity() < _batch_size)) {
|
||||
return parser.on_error(CAPACITY);
|
||||
}
|
||||
if (unlikely(load_next_batch)) {
|
||||
advance(current_buffer_loc);
|
||||
n_bytes_parsed += current_buffer_loc;
|
||||
_batch_size = (std::min)(_batch_size, remaining());
|
||||
_batch_size = internal::trimmed_length_safe_utf8((const char *)buf(), _batch_size);
|
||||
auto stage1_is_ok = (error_code)simdjson::active_implementation->stage1(buf(), _batch_size, parser, true);
|
||||
if (stage1_is_ok != simdjson::SUCCESS) {
|
||||
return parser.on_error(stage1_is_ok);
|
||||
}
|
||||
size_t last_index = internal::find_last_json_buf_idx(buf(), _batch_size, parser);
|
||||
if (last_index == 0) {
|
||||
if (parser.n_structural_indexes == 0) {
|
||||
return parser.on_error(EMPTY);
|
||||
}
|
||||
} else {
|
||||
parser.n_structural_indexes = last_index + 1;
|
||||
}
|
||||
load_next_batch = false;
|
||||
} // load_next_batch
|
||||
int res = simdjson::active_implementation->stage2(buf(), remaining(), parser, next_json);
|
||||
if (likely(res == simdjson::SUCCESS_AND_HAS_MORE)) {
|
||||
n_parsed_docs++;
|
||||
current_buffer_loc = parser.structural_indexes[next_json];
|
||||
} else if (res == simdjson::SUCCESS) {
|
||||
n_parsed_docs++;
|
||||
if (remaining() > _batch_size) {
|
||||
current_buffer_loc = parser.structural_indexes[next_json - 1];
|
||||
next_json = 1;
|
||||
load_next_batch = true;
|
||||
res = simdjson::SUCCESS_AND_HAS_MORE;
|
||||
}
|
||||
} else {
|
||||
printf("E\n");
|
||||
}
|
||||
return res;
|
||||
}
|
||||
#endif // SIMDJSON_THREADS_ENABLED
|
||||
|
||||
} // end of namespace simdjson
|
||||
#endif // SIMDJSON_INLINE_JSONSTREAM_H
|
|
@ -1,15 +1,12 @@
|
|||
#ifndef SIMDJSON_JSONSTREAM_H
|
||||
#define SIMDJSON_JSONSTREAM_H
|
||||
|
||||
#include <algorithm>
|
||||
#include <limits>
|
||||
#include <stdexcept>
|
||||
#include <thread>
|
||||
#include "simdjson/padded_string.h"
|
||||
#include "simdjson/simdjson.h"
|
||||
|
||||
|
||||
namespace simdjson {
|
||||
|
||||
/*************************************************************************************
|
||||
* The main motivation for this piece of software is to achieve maximum speed
|
||||
*and offer
|
||||
|
@ -148,249 +145,5 @@ private:
|
|||
#endif
|
||||
}; // end of class JsonStream
|
||||
|
||||
/* This algorithm is used to quickly identify the buffer position of
|
||||
* the last JSON document inside the current batch.
|
||||
*
|
||||
* It does its work by finding the last pair of structural characters
|
||||
* that represent the end followed by the start of a document.
|
||||
*
|
||||
* Simply put, we iterate over the structural characters, starting from
|
||||
* the end. We consider that we found the end of a JSON document when the
|
||||
* first element of the pair is NOT one of these characters: '{' '[' ';' ','
|
||||
* and when the second element is NOT one of these characters: '}' '}' ';' ','.
|
||||
*
|
||||
* This simple comparison works most of the time, but it does not cover cases
|
||||
* where the batch's structural indexes contain a perfect amount of documents.
|
||||
* In such a case, we do not have access to the structural index which follows
|
||||
* the last document, therefore, we do not have access to the second element in
|
||||
* the pair, and means that we cannot identify the last document. To fix this
|
||||
* issue, we keep a count of the open and closed curly/square braces we found
|
||||
* while searching for the pair. When we find a pair AND the count of open and
|
||||
* closed curly/square braces is the same, we know that we just passed a
|
||||
* complete
|
||||
* document, therefore the last json buffer location is the end of the batch
|
||||
* */
|
||||
inline size_t find_last_json_buf_idx(const uint8_t *buf, size_t size,
|
||||
const document::parser &parser) {
|
||||
// this function can be generally useful
|
||||
if (parser.n_structural_indexes == 0)
|
||||
return 0;
|
||||
auto last_i = parser.n_structural_indexes - 1;
|
||||
if (parser.structural_indexes[last_i] == size) {
|
||||
if (last_i == 0)
|
||||
return 0;
|
||||
last_i = parser.n_structural_indexes - 2;
|
||||
}
|
||||
auto arr_cnt = 0;
|
||||
auto obj_cnt = 0;
|
||||
for (auto i = last_i; i > 0; i--) {
|
||||
auto idxb = parser.structural_indexes[i];
|
||||
switch (buf[idxb]) {
|
||||
case ':':
|
||||
case ',':
|
||||
continue;
|
||||
case '}':
|
||||
obj_cnt--;
|
||||
continue;
|
||||
case ']':
|
||||
arr_cnt--;
|
||||
continue;
|
||||
case '{':
|
||||
obj_cnt++;
|
||||
break;
|
||||
case '[':
|
||||
arr_cnt++;
|
||||
break;
|
||||
}
|
||||
auto idxa = parser.structural_indexes[i - 1];
|
||||
switch (buf[idxa]) {
|
||||
case '{':
|
||||
case '[':
|
||||
case ':':
|
||||
case ',':
|
||||
continue;
|
||||
}
|
||||
if (!arr_cnt && !obj_cnt) {
|
||||
return last_i + 1;
|
||||
}
|
||||
return i;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
template <class string_container>
|
||||
JsonStream<string_container>::JsonStream(const string_container &s,
|
||||
size_t batchSize)
|
||||
: str(s), _batch_size(batchSize) {
|
||||
}
|
||||
|
||||
template <class string_container> JsonStream<string_container>::~JsonStream() {
|
||||
#ifdef SIMDJSON_THREADS_ENABLED
|
||||
if (stage_1_thread.joinable()) {
|
||||
stage_1_thread.join();
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
namespace internal {
|
||||
// returns true if the provided byte value is an ASCII character
|
||||
static inline bool is_ascii(char c) {
|
||||
return ((unsigned char)c) <= 127;
|
||||
}
|
||||
|
||||
// if the string ends with UTF-8 values, backtrack
|
||||
// up to the first ASCII character. May return 0.
|
||||
static inline size_t trimmed_length_safe_utf8(const char * c, size_t len) {
|
||||
while ((len > 0) and (not is_ascii(c[len - 1]))) {
|
||||
len--;
|
||||
}
|
||||
return len;
|
||||
}
|
||||
}
|
||||
|
||||
#ifdef SIMDJSON_THREADS_ENABLED
|
||||
|
||||
// threaded version of json_parse
|
||||
// todo: simplify this code further
|
||||
template <class string_container>
|
||||
int JsonStream<string_container>::json_parse(document::parser &parser) {
|
||||
if (unlikely(parser.capacity() == 0)) {
|
||||
const bool allocok = parser.allocate_capacity(_batch_size);
|
||||
if (!allocok) {
|
||||
return parser.error = simdjson::MEMALLOC;
|
||||
}
|
||||
} else if (unlikely(parser.capacity() < _batch_size)) {
|
||||
return parser.error = simdjson::CAPACITY;
|
||||
}
|
||||
if (unlikely(parser_thread.capacity() < _batch_size)) {
|
||||
const bool allocok_thread = parser_thread.allocate_capacity(_batch_size);
|
||||
if (!allocok_thread) {
|
||||
return parser.error = simdjson::MEMALLOC;
|
||||
}
|
||||
}
|
||||
if (unlikely(load_next_batch)) {
|
||||
// First time loading
|
||||
if (!stage_1_thread.joinable()) {
|
||||
_batch_size = (std::min)(_batch_size, remaining());
|
||||
_batch_size = internal::trimmed_length_safe_utf8((const char *)buf(), _batch_size);
|
||||
if (_batch_size == 0) {
|
||||
return parser.error = simdjson::UTF8_ERROR;
|
||||
}
|
||||
auto stage1_is_ok = error_code(simdjson::active_implementation->stage1(buf(), _batch_size, parser, true));
|
||||
if (stage1_is_ok != simdjson::SUCCESS) {
|
||||
return parser.error = stage1_is_ok;
|
||||
}
|
||||
size_t last_index = find_last_json_buf_idx(buf(), _batch_size, parser);
|
||||
if (last_index == 0) {
|
||||
if (parser.n_structural_indexes == 0) {
|
||||
return parser.error = simdjson::EMPTY;
|
||||
}
|
||||
} else {
|
||||
parser.n_structural_indexes = last_index + 1;
|
||||
}
|
||||
}
|
||||
// the second thread is running or done.
|
||||
else {
|
||||
stage_1_thread.join();
|
||||
if (stage1_is_ok_thread != simdjson::SUCCESS) {
|
||||
return parser.error = stage1_is_ok_thread;
|
||||
}
|
||||
std::swap(parser.structural_indexes, parser_thread.structural_indexes);
|
||||
parser.n_structural_indexes = parser_thread.n_structural_indexes;
|
||||
advance(last_json_buffer_loc);
|
||||
n_bytes_parsed += last_json_buffer_loc;
|
||||
}
|
||||
// let us decide whether we will start a new thread
|
||||
if (remaining() - _batch_size > 0) {
|
||||
last_json_buffer_loc =
|
||||
parser.structural_indexes[find_last_json_buf_idx(buf(), _batch_size, parser)];
|
||||
_batch_size = (std::min)(_batch_size, remaining() - last_json_buffer_loc);
|
||||
if (_batch_size > 0) {
|
||||
_batch_size = internal::trimmed_length_safe_utf8(
|
||||
(const char *)(buf() + last_json_buffer_loc), _batch_size);
|
||||
if (_batch_size == 0) {
|
||||
return parser.error = simdjson::UTF8_ERROR;
|
||||
}
|
||||
// let us capture read-only variables
|
||||
const uint8_t *const b = buf() + last_json_buffer_loc;
|
||||
const size_t bs = _batch_size;
|
||||
// we call the thread on a lambda that will update
|
||||
// this->stage1_is_ok_thread
|
||||
// there is only one thread that may write to this value
|
||||
stage_1_thread = std::thread([this, b, bs] {
|
||||
this->stage1_is_ok_thread = error_code(simdjson::active_implementation->stage1(b, bs, this->parser_thread, true));
|
||||
});
|
||||
}
|
||||
}
|
||||
next_json = 0;
|
||||
load_next_batch = false;
|
||||
} // load_next_batch
|
||||
int res = simdjson::active_implementation->stage2(buf(), remaining(), parser, next_json);
|
||||
if (res == simdjson::SUCCESS_AND_HAS_MORE) {
|
||||
n_parsed_docs++;
|
||||
current_buffer_loc = parser.structural_indexes[next_json];
|
||||
load_next_batch = (current_buffer_loc == last_json_buffer_loc);
|
||||
} else if (res == simdjson::SUCCESS) {
|
||||
n_parsed_docs++;
|
||||
if (remaining() > _batch_size) {
|
||||
current_buffer_loc = parser.structural_indexes[next_json - 1];
|
||||
load_next_batch = true;
|
||||
res = simdjson::SUCCESS_AND_HAS_MORE;
|
||||
}
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
#else // SIMDJSON_THREADS_ENABLED
|
||||
|
||||
// single-threaded version of json_parse
|
||||
template <class string_container>
|
||||
int JsonStream<string_container>::json_parse(document::parser &parser) {
|
||||
if (unlikely(parser.capacity() == 0)) {
|
||||
const bool allocok = parser.allocate_capacity(_batch_size);
|
||||
if (!allocok) {
|
||||
return parser.on_error(MEMALLOC);
|
||||
}
|
||||
} else if (unlikely(parser.capacity() < _batch_size)) {
|
||||
return parser.on_error(CAPACITY);
|
||||
}
|
||||
if (unlikely(load_next_batch)) {
|
||||
advance(current_buffer_loc);
|
||||
n_bytes_parsed += current_buffer_loc;
|
||||
_batch_size = (std::min)(_batch_size, remaining());
|
||||
_batch_size = internal::trimmed_length_safe_utf8((const char *)buf(), _batch_size);
|
||||
auto stage1_is_ok = (error_code)simdjson::active_implementation->stage1(buf(), _batch_size, parser, true);
|
||||
if (stage1_is_ok != simdjson::SUCCESS) {
|
||||
return parser.on_error(stage1_is_ok);
|
||||
}
|
||||
size_t last_index = find_last_json_buf_idx(buf(), _batch_size, parser);
|
||||
if (last_index == 0) {
|
||||
if (parser.n_structural_indexes == 0) {
|
||||
return parser.on_error(EMPTY);
|
||||
}
|
||||
} else {
|
||||
parser.n_structural_indexes = last_index + 1;
|
||||
}
|
||||
load_next_batch = false;
|
||||
} // load_next_batch
|
||||
int res = simdjson::active_implementation->stage2(buf(), remaining(), parser, next_json);
|
||||
if (likely(res == simdjson::SUCCESS_AND_HAS_MORE)) {
|
||||
n_parsed_docs++;
|
||||
current_buffer_loc = parser.structural_indexes[next_json];
|
||||
} else if (res == simdjson::SUCCESS) {
|
||||
n_parsed_docs++;
|
||||
if (remaining() > _batch_size) {
|
||||
current_buffer_loc = parser.structural_indexes[next_json - 1];
|
||||
next_json = 1;
|
||||
load_next_batch = true;
|
||||
res = simdjson::SUCCESS_AND_HAS_MORE;
|
||||
}
|
||||
} else {
|
||||
printf("E\n");
|
||||
}
|
||||
return res;
|
||||
}
|
||||
#endif // SIMDJSON_THREADS_ENABLED
|
||||
|
||||
} // end of namespace simdjson
|
||||
#endif // SIMDJSON_JSONSTREAM_H
|
||||
|
|
Loading…
Reference in New Issue