Make threaded version work again

This commit is contained in:
John Keiser 2020-06-04 19:01:17 -07:00
parent d43a4e9df9
commit fe01da077e
3 changed files with 17 additions and 24 deletions

View File

@ -120,7 +120,7 @@ private:
inline size_t next_batch_start() const noexcept;
/** Pass the next batch through stage 1 with the given parser. */
inline void run_stage1(dom::parser &p, size_t batch_start) noexcept;
inline error_code run_stage1(dom::parser &p, size_t batch_start) noexcept;
dom::parser &parser;
const uint8_t *buf;
@ -131,20 +131,19 @@ private:
error_code error;
#ifdef SIMDJSON_THREADS_ENABLED
/**
* Start a thread to run stage 1 on the next batch.
*/
inline void load_from_stage1_thread() noexcept;
/** Start a thread to run stage 1 on the next batch. */
inline void start_stage1_thread() noexcept;
/**
* Wait for the stage 1 thread to finish and capture the results.
*/
/** Wait for the stage 1 thread to finish and capture the results. */
inline void finish_stage1_thread() noexcept;
/** The error returned from the stage 1 thread. */
error_code stage1_thread_error{UNINITIALIZED};
/** The thread used to run stage 1 against the next batch in the background. */
std::thread stage1_thread{};
/**
* The parser used to run stage 1 in the background. Will be swapped
* with the regular parser when finished.

View File

@ -72,13 +72,13 @@ inline void document_stream::start() noexcept {
// Always run the first stage 1 parse immediately
batch_start = 0;
run_stage1(parser, batch_start);
error = run_stage1(parser, batch_start);
if (error) { return; }
#ifdef SIMDJSON_THREADS_ENABLED
if (next_batch_start() < len) {
// Kick off the first thread if needed
error = thread_parser.ensure_capacity(batch_size);
error = stage1_thread_parser.ensure_capacity(batch_size);
if (error) { return; }
start_stage1_thread();
if (error) { return; }
@ -102,7 +102,7 @@ inline void document_stream::next() noexcept {
#ifdef SIMDJSON_THREADS_ENABLED
load_from_stage1_thread();
#else
run_stage1(parser, batch_start);
error = run_stage1(parser, batch_start);
#endif
if (error) { continue; } // If the error was EMPTY, we may want to load another batch.
@ -115,13 +115,13 @@ inline size_t document_stream::next_batch_start() const noexcept {
return batch_start + parser.implementation->structural_indexes[parser.implementation->n_structural_indexes];
}
inline void document_stream::run_stage1(dom::parser &p, size_t _batch_start) noexcept {
inline error_code document_stream::run_stage1(dom::parser &p, size_t _batch_start) noexcept {
// If this is the final batch, pass partial = false
size_t remaining = len - _batch_start;
if (remaining <= batch_size) {
error = p.implementation->stage1(&buf[_batch_start], remaining, false);
return p.implementation->stage1(&buf[_batch_start], remaining, false);
} else {
error = p.implementation->stage1(&buf[_batch_start], batch_size, true);
return p.implementation->stage1(&buf[_batch_start], batch_size, true);
}
}
@ -132,10 +132,9 @@ inline void document_stream::load_from_stage1_thread() noexcept {
// Swap to the parser that was loaded up in the thread. Make sure the parser has
// enough memory to swap to, as well.
error = parser.ensure_capacity(batch_size);
if (error) { return error; }
std::swap(parser, stage1_thread_parser);
if (stage1_thread_error) { return stage1_thread_error; }
error = stage1_thread_error;
if (error) { return; }
// If there's anything left, start the stage 1 thread!
if (next_batch_start() < len) {
@ -149,11 +148,9 @@ inline void document_stream::start_stage1_thread() noexcept {
// there is only one thread that may write to this value
// TODO this is NOT exception-safe.
this->stage1_thread_error = UNINITIALIZED; // In case something goes wrong, make sure it's an error
stage1_thread = std::thread([this] {
this->stage1_thread_error = this->thread_parser.ensure_capacity(this->batch_size);
if (!this->stage1_thread_error) {
this->stage1_thread_error = run_stage1(this->stage1_thread_parser, this->next_batch_start());
}
size_t _next_batch_start = this->next_batch_start();
stage1_thread = std::thread([this, _next_batch_start] {
this->stage1_thread_error = run_stage1(this->stage1_thread_parser, _next_batch_start);
});
}

View File

@ -561,9 +561,6 @@ namespace parse_api_tests {
memcpy(&empty_batches_ndjson[BATCH_SIZE*3+2], "1", 1);
memcpy(&empty_batches_ndjson[BATCH_SIZE*10+4], "2", 1);
memcpy(&empty_batches_ndjson[BATCH_SIZE*11+6], "3", 1);
for (int i=0; i<16; i++) {
printf("| %.*s |", BATCH_SIZE, &empty_batches_ndjson[BATCH_SIZE*i]);
}
for (auto [doc, error] : parser.parse_many(empty_batches_ndjson, BATCH_SIZE*16)) {
if (error) { cerr << "Error in parse_many: " << error << endl; return false; }
count++;