diff --git a/benchmark/parse_stream.cpp b/benchmark/parse_stream.cpp old mode 100755 new mode 100644 index 5f769797..5735d417 --- a/benchmark/parse_stream.cpp +++ b/benchmark/parse_stream.cpp @@ -1,147 +1,181 @@ -#include #include #include -#include +#include #include +#include #include "simdjson.h" -#define NB_ITERATION 5 -#define MIN_BATCH_SIZE 200000 +#define NB_ITERATION 20 +#define MIN_BATCH_SIZE 10000 #define MAX_BATCH_SIZE 10000000 bool test_baseline = false; bool test_per_batch = true; -bool test_best_batch = true; +bool test_best_batch = false; -bool compare(std::pair i, std::pair j){ - return i.second > j.second; +bool compare(std::pair i, std::pair j) { + return i.second > j.second; } -int main (int argc, char *argv[]){ +int main(int argc, char *argv[]) { - if (argc <= 1) { - std::cerr << "Usage: " << argv[0] << " " << std::endl; - exit(1); - } - const char *filename = argv[1]; - auto [p, err] = simdjson::padded_string::load(filename); - if (err) { - std::cerr << "Could not load the file " << filename << std::endl; + if (argc <= 1) { + std::cerr << "Usage: " << argv[0] << " " << std::endl; + exit(1); + } + const char *filename = argv[1]; + auto[p, err] = simdjson::padded_string::load(filename); + if (err) { + std::cerr << "Could not load the file " << filename << std::endl; + return EXIT_FAILURE; + } + if (test_baseline) { + std::wclog << "Baseline: Getline + normal parse... " << std::endl; + std::cout << "Gigabytes/second\t" + << "Nb of documents parsed" << std::endl; + for (auto i = 0; i < 3; i++) { + // Actual test + simdjson::dom::parser parser; + simdjson::error_code alloc_error = parser.allocate(p.size()); + if (alloc_error) { + std::cerr << alloc_error << std::endl; return EXIT_FAILURE; + } + std::istringstream ss(std::string(p.data(), p.size())); + + auto start = std::chrono::steady_clock::now(); + int count = 0; + std::string line; + int parse_res = simdjson::SUCCESS; + while (getline(ss, line)) { + // TODO we're likely triggering simdjson's padding reallocation here. Is + // that intentional? + parser.parse(line); + count++; + } + + auto end = std::chrono::steady_clock::now(); + + std::chrono::duration secs = end - start; + double speedinGBs = static_cast(p.size()) / + (static_cast(secs.count()) * 1000000000.0); + std::cout << speedinGBs << "\t\t\t\t" << count << std::endl; + + if (parse_res != simdjson::SUCCESS) { + std::cerr << "Parsing failed" << std::endl; + exit(1); + } } - if (test_baseline) { - std::wclog << "Baseline: Getline + normal parse... " << std::endl; - std::cout << "Gigabytes/second\t" << "Nb of documents parsed" << std::endl; - for (auto i = 0; i < 3; i++) { - //Actual test - simdjson::dom::parser parser; - simdjson::error_code alloc_error = parser.allocate(p.size()); - if (alloc_error) { - std::cerr << alloc_error << std::endl; - return EXIT_FAILURE; - } - std::istringstream ss(std::string(p.data(), p.size())); + } - auto start = std::chrono::steady_clock::now(); - int count = 0; - std::string line; - int parse_res = simdjson::SUCCESS; - while (getline(ss, line)) { - // TODO we're likely triggering simdjson's padding reallocation here. Is that intentional? - parser.parse(line); - count++; - } + std::map batch_size_res; + if (test_per_batch) { + std::wclog << "parse_many: Speed per batch_size... from " << MIN_BATCH_SIZE + << " bytes to " << MAX_BATCH_SIZE << " bytes..." << std::endl; + std::cout << "Batch Size\t" + << "Gigabytes/second\t" + << "Nb of documents parsed" << std::endl; + for (size_t i = MIN_BATCH_SIZE; i <= MAX_BATCH_SIZE; + i += (MAX_BATCH_SIZE - MIN_BATCH_SIZE) / 100) { + batch_size_res.insert(std::pair(i, 0)); + int count; + for (size_t j = 0; j < 5; j++) { + // Actual test + simdjson::dom::parser parser; + simdjson::error_code error; - auto end = std::chrono::steady_clock::now(); - - std::chrono::duration secs = end - start; - double speedinGBs = static_cast(p.size()) / (static_cast(secs.count()) * 1000000000.0); - std::cout << speedinGBs << "\t\t\t\t" << count << std::endl; - - if (parse_res != simdjson::SUCCESS) { - std::cerr << "Parsing failed" << std::endl; - exit(1); - } + auto start = std::chrono::steady_clock::now(); + count = 0; + for (auto result : parser.parse_many(p, i)) { + error = result.error(); + if (error != simdjson::SUCCESS) { + std::wcerr << "Parsing failed with: " << error_message(error) << std::endl; + exit(1); + } + count++; } + auto end = std::chrono::steady_clock::now(); + + std::chrono::duration secs = end - start; + double speedinGBs = static_cast(p.size()) / + (static_cast(secs.count()) * 1000000000.0); + if (speedinGBs > batch_size_res.at(i)) + batch_size_res[i] = speedinGBs; + } + std::cout << i << "\t\t" << std::fixed << std::setprecision(3) + << batch_size_res.at(i) << "\t\t\t\t" << count << std::endl; + } + } + size_t optimal_batch_size{}; + double best_speed{}; + if (test_per_batch) { + std::pair best_results; + best_results = + (*min_element(batch_size_res.begin(), batch_size_res.end(), compare)); + optimal_batch_size = best_results.first; + best_speed = best_results.second; + } else { + optimal_batch_size = MIN_BATCH_SIZE; + } + std::wclog << "Seemingly optimal batch_size: " << optimal_batch_size << "..." + << std::endl; + std::wclog << "Best speed: " << best_speed << "..." << std::endl; + + if (test_best_batch) { + std::wclog << "Starting speed test... Best of " << NB_ITERATION + << " iterations..." << std::endl; + std::vector res; + for (int i = 0; i < NB_ITERATION; i++) { + + // Actual test + simdjson::dom::parser parser; + simdjson::error_code error; + + auto start = std::chrono::steady_clock::now(); + // This includes allocation of the parser + for (auto result : parser.parse_many(p, optimal_batch_size)) { + error = result.error(); + if (error != simdjson::SUCCESS) { + std::wcerr << "Parsing failed with: " << error_message(error) << std::endl; + exit(1); + } + } + auto end = std::chrono::steady_clock::now(); + + std::chrono::duration secs = end - start; + res.push_back(secs.count()); } - std::map batch_size_res; - if(test_per_batch) { - std::wclog << "parse_many: Speed per batch_size... from " << MIN_BATCH_SIZE - << " bytes to " << MAX_BATCH_SIZE << " bytes..." << std::endl; - std::cout << "Batch Size\t" << "Gigabytes/second\t" << "Nb of documents parsed" << std::endl; - for (size_t i = MIN_BATCH_SIZE; i <= MAX_BATCH_SIZE; i += (MAX_BATCH_SIZE - MIN_BATCH_SIZE) / 50) { - batch_size_res.insert(std::pair(i, 0)); - int count; - for (size_t j = 0; j < 5; j++) { - //Actual test - simdjson::dom::parser parser; - simdjson::error_code error; + double min_result = *min_element(res.begin(), res.end()); + double speedinGBs = + static_cast(p.size()) / (min_result * 1000000000.0); - auto start = std::chrono::steady_clock::now(); - count = 0; - for (auto result : parser.parse_many(p, 4000000)) { - error = result.error(); - count++; - } - auto end = std::chrono::steady_clock::now(); + std::cout << "Min: " << min_result << " bytes read: " << p.size() + << " Gigabytes/second: " << speedinGBs << std::endl; + } +#ifdef SIMDJSON_THREADS_ENABLED + // Multithreading probably does not help matters for small files (less than 10 + // MB). + if (p.size() < 10000000) { + std::cout << std::endl; - std::chrono::duration secs = end - start; - double speedinGBs = static_cast(p.size()) / (static_cast(secs.count()) * 1000000000.0); - if (speedinGBs > batch_size_res.at(i)) - batch_size_res[i] = speedinGBs; + std::cout << "Warning: your file is small and the performance results are " + "probably meaningless" + << std::endl; + std::cout << "as far as multithreaded performance goes." << std::endl; - if (error != simdjson::SUCCESS) { - std::wcerr << "Parsing failed with: " << error << std::endl; - exit(1); - } - } - std::cout << i << "\t\t" << std::fixed << std::setprecision(3) << batch_size_res.at(i) << "\t\t\t\t" << count << std::endl; + std::cout << std::endl; - } - } + std::cout + << "Try to concatenate the file with itself to generate a large one." + << std::endl; + std::cout << "In bash: " << std::endl; + std::cout << "for i in {1..1000}; do cat '" << filename + << "' >> bar.ndjson; done" << std::endl; + std::cout << argv[0] << " bar.ndjson" << std::endl; + } +#endif - if (test_best_batch) { - size_t optimal_batch_size; - if (test_per_batch) { - optimal_batch_size = (*min_element(batch_size_res.begin(), batch_size_res.end(), compare)).first; - } else { - optimal_batch_size = MIN_BATCH_SIZE; - } - std::wclog << "Starting speed test... Best of " << NB_ITERATION << " iterations..." << std::endl; - std::wclog << "Seemingly optimal batch_size: " << optimal_batch_size << "..." << std::endl; - std::vector res; - for (int i = 0; i < NB_ITERATION; i++) { - - // Actual test - simdjson::dom::parser parser; - simdjson::error_code error; - - auto start = std::chrono::steady_clock::now(); - // TODO this includes allocation of the parser; is that intentional? - for (auto result : parser.parse_many(p, 4000000)) { - error = result.error(); - } - auto end = std::chrono::steady_clock::now(); - - std::chrono::duration secs = end - start; - res.push_back(secs.count()); - - if (error != simdjson::SUCCESS) { - std::wcerr << "Parsing failed with: " << error << std::endl; - exit(1); - } - - } - - double min_result = *min_element(res.begin(), res.end()); - double speedinGBs = static_cast(p.size()) / (min_result * 1000000000.0); - - - std::cout << "Min: " << min_result << " bytes read: " << p.size() - << " Gigabytes/second: " << speedinGBs << std::endl; - } - - return 0; + return 0; } diff --git a/doc/basics.md b/doc/basics.md index 33bf377b..dcf72c78 100644 --- a/doc/basics.md +++ b/doc/basics.md @@ -452,7 +452,7 @@ The simdjson library also support multithreaded JSON streaming through a large f smaller JSON documents in either [ndjson](http://ndjson.org) or [JSON lines](http://jsonlines.org) format. If your JSON documents all contain arrays or objects, we even support direct file concatenation without whitespace. The concatenated file has no size restrictions (including larger -than 4GB), though each individual document must be less than 4GB. +than 4GB), though each individual document must be no larger than 4 GB. Here is a simple example, given "x.json" with this content: @@ -472,6 +472,8 @@ for (dom::element doc : parser.load_many(filename)) { In-memory ndjson strings can be parsed as well, with `parser.parse_many(string)`. +Both `load_many` and `parse_many` take an optional parameter `size_t batch_size` which defines the window processing size. It is set by default to a large value (`1000000` corresponding to 1 MB). None of your JSON documents should exceed this window size, or else you will get the error `simdjson::CAPACITY`. You cannot set this window size larger than 4 GB: you will get the error `simdjson::CAPACITY`. The smaller the window size is, the less memory the function will use. Setting the window size too small (e.g., less than 100 kB) may also impact performance negatively. Leaving it to 1 MB is expected to be a good choice, unless you have some larger documents. + See [parse_many.md](parse_many.md) for detailed information and design. Thread Safety diff --git a/include/simdjson/dom/document_stream.h b/include/simdjson/dom/document_stream.h index 1d7ba43a..fee73ac8 100644 --- a/include/simdjson/dom/document_stream.h +++ b/include/simdjson/dom/document_stream.h @@ -6,11 +6,63 @@ #include "simdjson/error.h" #ifdef SIMDJSON_THREADS_ENABLED #include +#include +#include #endif namespace simdjson { namespace dom { + +#ifdef SIMDJSON_THREADS_ENABLED +struct stage1_worker { + stage1_worker() noexcept = default; + stage1_worker(const stage1_worker&) = delete; + stage1_worker(stage1_worker&&) = delete; + stage1_worker operator=(const stage1_worker&) = delete; + ~stage1_worker(); + /** + * We only start the thread when it is needed, not at object construction, this may throw. + * You should only call this once. + **/ + void start_thread(); + /** + * Start a stage 1 job. You should first call 'run', then 'finish'. + * You must call start_thread once before. + */ + void run(document_stream * ds, dom::parser * stage1, size_t next_batch_start); + /** Wait for the run to finish (blocking). You should first call 'run', then 'finish'. **/ + void finish(); + +private: + + /** + * Normally, we would never stop the thread. But we do in the destructor. + * This function is only safe assuming that you are not waiting for results. You + * should have called run, then finish, and be done. + **/ + void stop_thread(); + + std::thread thread{}; + /** These three variables define the work done by the thread. **/ + dom::parser * stage1_thread_parser{}; + size_t _next_batch_start{}; + document_stream * owner{}; + /** + * We have two state variables. This could be streamlined to one variable in the future but + * we use two for clarity. + */ + bool has_work{false}; + bool can_work{true}; + + /** + * We lock using a mutex. + */ + std::mutex locking_mutex{}; + std::condition_variable cond_var{}; +}; +#endif + /** * A forward-only stream of documents. * @@ -142,8 +194,8 @@ private: /** The error returned from the stage 1 thread. */ error_code stage1_thread_error{UNINITIALIZED}; /** The thread used to run stage 1 against the next batch in the background. */ - std::thread stage1_thread{}; - + friend struct stage1_worker; + std::unique_ptr worker{new(std::nothrow) stage1_worker()}; /** * The parser used to run stage 1 in the background. Will be swapped * with the regular parser when finished. diff --git a/include/simdjson/dom/parser.h b/include/simdjson/dom/parser.h index f3030904..49ca234d 100644 --- a/include/simdjson/dom/parser.h +++ b/include/simdjson/dom/parser.h @@ -159,6 +159,10 @@ public: * documents that consist of an object or array may omit the whitespace between them, concatenating * with no separator. documents that consist of a single primitive (i.e. documents that are not * arrays or objects) MUST be separated with whitespace. + * + * The documents must not exceed batch_size bytes (by default 1MB) or they will fail to parse. + * Setting batch_size to excessively large or excesively small values may impact negatively the + * performance. * * ### Error Handling * @@ -216,6 +220,10 @@ public: * documents that consist of an object or array may omit the whitespace between them, concatenating * with no separator. documents that consist of a single primitive (i.e. documents that are not * arrays or objects) MUST be separated with whitespace. + * + * The documents must not exceed batch_size bytes (by default 1MB) or they will fail to parse. + * Setting batch_size to excessively large or excesively small values may impact negatively the + * performance. * * ### Error Handling * diff --git a/include/simdjson/inline/document_stream.h b/include/simdjson/inline/document_stream.h index 7abf6145..04b2230c 100644 --- a/include/simdjson/inline/document_stream.h +++ b/include/simdjson/inline/document_stream.h @@ -5,10 +5,65 @@ #include #include #include - namespace simdjson { namespace dom { +#ifdef SIMDJSON_THREADS_ENABLED +inline void stage1_worker::finish() { + std::unique_lock lock(locking_mutex); + cond_var.wait(lock, [this]{return has_work == false;}); +} + +inline stage1_worker::~stage1_worker() { + stop_thread(); +} + +inline void stage1_worker::start_thread() { + std::unique_lock lock(locking_mutex); + if(thread.joinable()) { + return; // This should never happen but we never want to create more than one thread. + } + thread = std::thread([this]{ + while(can_work) { + std::unique_lock thread_lock(locking_mutex); + cond_var.wait(thread_lock, [this]{return has_work || !can_work;}); + if(!can_work) { + break; + } + this->owner->stage1_thread_error = this->owner->run_stage1(*this->stage1_thread_parser, + this->_next_batch_start); + this->has_work = false; + thread_lock.unlock(); + cond_var.notify_one(); // will notify "finish" + } + } + ); +} + + +inline void stage1_worker::stop_thread() { + std::unique_lock lock(locking_mutex); + // We have to make sure that all locks can be released. + can_work = false; + has_work = false; + lock.unlock(); + cond_var.notify_all(); + if(thread.joinable()) { + thread.join(); + } +} + +inline void stage1_worker::run(document_stream * ds, dom::parser * stage1, size_t next_batch_start) { + std::unique_lock lock(locking_mutex); + owner = ds; + _next_batch_start = next_batch_start; + stage1_thread_parser = stage1; + has_work = true; + lock.unlock(); + cond_var.notify_one();// will notify the thread lock +} +#endif + really_inline document_stream::document_stream( dom::parser &_parser, const uint8_t *_buf, @@ -22,15 +77,14 @@ really_inline document_stream::document_stream( batch_size{_batch_size}, error{_error} { +#ifdef SIMDJSON_THREADS_ENABLED + if(worker.get() == nullptr) { + error = MEMALLOC; + } +#endif } inline document_stream::~document_stream() noexcept { -#ifdef SIMDJSON_THREADS_ENABLED - // TODO kill the thread, why should people have to wait for a non-side-effecting operation to complete - if (stage1_thread.joinable()) { - stage1_thread.join(); - } -#endif } really_inline document_stream::iterator document_stream::begin() noexcept { @@ -80,6 +134,7 @@ inline void document_stream::start() noexcept { // Kick off the first thread if needed error = stage1_thread_parser.ensure_capacity(batch_size); if (error) { return; } + worker->start_thread(); start_stage1_thread(); if (error) { return; } } @@ -93,7 +148,6 @@ inline void document_stream::next() noexcept { // Load the next document from the batch error = parser.implementation->stage2_next(parser.doc); - // If that was the last document in the batch, load another batch (if available) while (error == EMPTY) { batch_start = next_batch_start(); @@ -105,7 +159,6 @@ inline void document_stream::next() noexcept { error = run_stage1(parser, batch_start); #endif if (error) { continue; } // If the error was EMPTY, we may want to load another batch. - // Run stage 2 on the first document in the batch error = parser.implementation->stage2_next(parser.doc); } @@ -128,8 +181,7 @@ inline error_code document_stream::run_stage1(dom::parser &p, size_t _batch_star #ifdef SIMDJSON_THREADS_ENABLED inline void document_stream::load_from_stage1_thread() noexcept { - stage1_thread.join(); - + worker->finish(); // Swap to the parser that was loaded up in the thread. Make sure the parser has // enough memory to swap to, as well. std::swap(parser, stage1_thread_parser); @@ -149,9 +201,8 @@ inline void document_stream::start_stage1_thread() noexcept { // TODO this is NOT exception-safe. this->stage1_thread_error = UNINITIALIZED; // In case something goes wrong, make sure it's an error size_t _next_batch_start = this->next_batch_start(); - stage1_thread = std::thread([this, _next_batch_start] { - this->stage1_thread_error = run_stage1(this->stage1_thread_parser, _next_batch_start); - }); + + worker->run(this, & this->stage1_thread_parser, _next_batch_start); } #endif // SIMDJSON_THREADS_ENABLED diff --git a/include/simdjson/portability.h b/include/simdjson/portability.h index ac20e2db..bcb2f393 100644 --- a/include/simdjson/portability.h +++ b/include/simdjson/portability.h @@ -115,12 +115,12 @@ compiling for a known 64-bit platform." #define TARGET_WESTMERE TARGET_REGION("sse4.2,pclmul") #define TARGET_ARM64 -// Threading is disabled -#undef SIMDJSON_THREADS_ENABLED // Is threading enabled? #if defined(BOOST_HAS_THREADS) || defined(_REENTRANT) || defined(_MT) +#ifndef SIMDJSON_THREADS_ENABLED #define SIMDJSON_THREADS_ENABLED #endif +#endif // workaround for large stack sizes under -O0. @@ -129,7 +129,9 @@ compiling for a known 64-bit platform." #ifndef __OPTIMIZE__ // Apple systems have small stack sizes in secondary threads. // Lack of compiler optimization may generate high stack usage. -// So we are disabling multithreaded support for safety. +// Users may want to disable threads for safety, but only when +// in debug mode which we detect by the fact that the __OPTIMIZE__ +// macro is not defined. #undef SIMDJSON_THREADS_ENABLED #endif #endif diff --git a/simdjson-flags.cmake b/simdjson-flags.cmake index fd99e5f8..af913035 100644 --- a/simdjson-flags.cmake +++ b/simdjson-flags.cmake @@ -77,8 +77,10 @@ if(SIMDJSON_ENABLE_THREADS) set(CMAKE_THREAD_PREFER_PTHREAD TRUE) set(THREADS_PREFER_PTHREAD_FLAG TRUE) find_package(Threads REQUIRED) + target_link_libraries(simdjson-flags INTERFACE Threads::Threads) target_link_libraries(simdjson-flags INTERFACE ${CMAKE_THREAD_LIBS_INIT}) target_compile_options(simdjson-flags INTERFACE ${CMAKE_THREAD_LIBS_INIT}) + target_compile_definitions(simdjson-flags INTERFACE SIMDJSON_THREADS_ENABLED=1) endif() option(SIMDJSON_SANITIZE "Sanitize addresses" OFF) diff --git a/tests/basictests.cpp b/tests/basictests.cpp index 537ead0d..ff041114 100644 --- a/tests/basictests.cpp +++ b/tests/basictests.cpp @@ -372,6 +372,43 @@ namespace document_stream_tests { simdjson::dom::document_stream s1 = parse_many_stream_return(parser, str); } + bool small_window() { + std::cout << "Running " << __func__ << std::endl; + auto json = R"({"error":[],"result":{"token":"xxx"}}{"error":[],"result":{"token":"xxx"}})"_padded; + simdjson::dom::parser parser; + size_t count = 0; + size_t window_size = 10; // deliberately too small + for (auto doc : parser.parse_many(json, window_size)) { + if (!doc.error()) { + std::cerr << "Expected a capacity error " << doc.error() << std::endl; + return false; + } + count++; + } + if(count == 2) { + std::cerr << "Expected a capacity error " << std::endl; + return false; + } + return true; + } + + bool large_window() { + std::cout << "Running " << __func__ << std::endl; +#if SIZE_MAX > 17179869184 + auto json = R"({"error":[],"result":{"token":"xxx"}}{"error":[],"result":{"token":"xxx"}})"_padded; + simdjson::dom::parser parser; + size_t count = 0; + uint64_t window_size{17179869184}; // deliberately too big + for (auto doc : parser.parse_many(json, size_t(window_size))) { + if (!doc.error()) { + std::cerr << "I expected a failure (too big) but got " << doc.error() << std::endl; + return false; + } + count++; + } +#endif + return true; + } static bool parse_json_message_issue467(simdjson::padded_string &json, size_t expectedcount) { simdjson::dom::parser parser; size_t count = 0; @@ -504,7 +541,9 @@ namespace document_stream_tests { } bool run() { - return json_issue467() && + return small_window() && + large_window() && + json_issue467() && document_stream_test() && document_stream_utf8_test(); }