Using a worker instead of a thread per batch (#920)

In the parse_many function, one thread runs stage 1 while the main thread runs stage 2. If stage 1 and stage 2 each took half of the time, parse_many could run at up to twice the speed. It is unlikely to do so in practice. Still, we see benefits of about 40% due to threading.
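The twice-the-speed bound can be written out as a rough model. This is only a sketch: it assumes the two stages overlap perfectly and ignores thread creation and synchronization costs. The symbols $T_1$ and $T_2$ (time per batch spent in stage 1 and stage 2) are introduced here for illustration.

```latex
\[
T_{\text{seq}} = T_1 + T_2, \qquad
T_{\text{par}} \approx \max(T_1, T_2), \qquad
S = \frac{T_{\text{seq}}}{T_{\text{par}}} = \frac{T_1 + T_2}{\max(T_1, T_2)} \le 2 .
\]
```

Equality holds only when $T_1 = T_2$. When one stage dominates, $S$ approaches 1, and any per-batch threading overhead lowers it further.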

To achieve this interleaving, we load the data in batches (blocks) of some size. In the current code (master), we create a new thread for each batch. Thread creation is expensive, so that approach only pays off with sizeable batches. This PR improves things and makes parse_many faster when using small batches.

  This fixes our parse_stream benchmark, which was just busted.
  This replaces the one-thread-per-batch routine with a worker object that reuses the same thread. In benchmarks, this lets us reach the same maximal speed with smaller processing blocks. It does not help much with larger blocks because the cost of thread creation is already amortized over each block.
This PR makes parse_many beneficial over small datasets. It also makes us less dependent on the thread creation time.
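The idea of a worker that reuses one thread can be sketched in isolation. The class below is a hypothetical, simplified illustration (the name `reusable_worker` and its `std::function` job are assumptions made for this example); it is not the `stage1_worker` added by this PR, though it follows the same run/finish pattern with a mutex, a condition variable and two state flags.

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical sketch: one long-lived thread that executes jobs one at a time.
class reusable_worker {
public:
  reusable_worker() : thread_([this] { loop(); }) {}
  reusable_worker(const reusable_worker &) = delete;
  reusable_worker &operator=(const reusable_worker &) = delete;

  ~reusable_worker() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      can_work_ = false; // ask the loop to exit
    }
    cond_.notify_all();
    thread_.join();
  }

  // Hand one job to the worker thread. Call finish() before the next run().
  void run(std::function<void()> job) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      job_ = std::move(job);
      has_work_ = true;
    }
    cond_.notify_all();
  }

  // Block until the job passed to run() has completed.
  void finish() {
    std::unique_lock<std::mutex> lock(mutex_);
    cond_.wait(lock, [this] { return !has_work_; });
  }

private:
  void loop() {
    std::unique_lock<std::mutex> lock(mutex_);
    while (true) {
      // Sleep until there is a job or we are asked to stop.
      cond_.wait(lock, [this] { return has_work_ || !can_work_; });
      if (!can_work_) { return; }
      job_();             // the job runs while the lock is held
      has_work_ = false;
      cond_.notify_all(); // wakes up finish()
    }
  }

  std::mutex mutex_;
  std::condition_variable cond_;
  std::function<void()> job_;
  bool has_work_{false};
  bool can_work_{true};
  std::thread thread_; // declared last so the members above exist before the thread starts
};
```

A caller would construct one `reusable_worker`, then alternate `run(...)` and `finish()` once per batch, so the thread is created a single time no matter how many batches are processed.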

Unfortunately, it is going to be difficult to say anything definitive in general. The cost of creating a thread varies widely depending on the OS. On some systems it might be cheap; on others, very expensive. We expect the new code to depend less drastically on the performance of the underlying system, since we create just one thread.
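To get a feel for the thread creation cost on a given system, one could time it directly. The helper below is a hypothetical sketch (the name `seconds_per_thread_create` is made up for this example and is not part of the benchmark in this PR); it measures only the creation and join of a thread that does nothing.

```cpp
#include <chrono>
#include <thread>

// Average wall-clock seconds needed to create and join one std::thread
// running an empty task. Results vary with the OS and the current load.
inline double seconds_per_thread_create(int repeats) {
  auto start = std::chrono::steady_clock::now();
  for (int i = 0; i < repeats; i++) {
    std::thread t([] {});
    t.join();
  }
  std::chrono::duration<double> elapsed =
      std::chrono::steady_clock::now() - start;
  return elapsed.count() / repeats;
}
```

Comparing this figure with the time stage 1 spends on a single batch gives a rough idea of the batch size below which a thread per batch stops paying off.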

Co-authored-by: John Keiser <john@johnkeiser.com>
Co-authored-by: Daniel Lemire <lemire@gmai.com>
Daniel Lemire 2020-06-12 16:51:18 -04:00 committed by GitHub
parent 1febf2ec83
commit 4dfbf98e4e
8 changed files with 331 additions and 141 deletions

benchmark/parse_stream.cpp Executable file → Normal file

@ -1,147 +1,181 @@
#include <iostream>
#include <algorithm>
#include <chrono>
#include <vector>
#include <iostream>
#include <map>
#include <vector>
#include "simdjson.h"
#define NB_ITERATION 5
#define MIN_BATCH_SIZE 200000
#define NB_ITERATION 20
#define MIN_BATCH_SIZE 10000
#define MAX_BATCH_SIZE 10000000
bool test_baseline = false;
bool test_per_batch = true;
bool test_best_batch = true;
bool test_best_batch = false;
bool compare(std::pair<size_t, double> i, std::pair<size_t, double> j){
return i.second > j.second;
bool compare(std::pair<size_t, double> i, std::pair<size_t, double> j) {
return i.second > j.second;
}
int main (int argc, char *argv[]){
int main(int argc, char *argv[]) {
if (argc <= 1) {
std::cerr << "Usage: " << argv[0] << " <jsonfile>" << std::endl;
exit(1);
}
const char *filename = argv[1];
auto [p, err] = simdjson::padded_string::load(filename);
if (err) {
std::cerr << "Could not load the file " << filename << std::endl;
if (argc <= 1) {
std::cerr << "Usage: " << argv[0] << " <jsonfile>" << std::endl;
exit(1);
}
const char *filename = argv[1];
auto[p, err] = simdjson::padded_string::load(filename);
if (err) {
std::cerr << "Could not load the file " << filename << std::endl;
return EXIT_FAILURE;
}
if (test_baseline) {
std::wclog << "Baseline: Getline + normal parse... " << std::endl;
std::cout << "Gigabytes/second\t"
<< "Nb of documents parsed" << std::endl;
for (auto i = 0; i < 3; i++) {
// Actual test
simdjson::dom::parser parser;
simdjson::error_code alloc_error = parser.allocate(p.size());
if (alloc_error) {
std::cerr << alloc_error << std::endl;
return EXIT_FAILURE;
}
std::istringstream ss(std::string(p.data(), p.size()));
auto start = std::chrono::steady_clock::now();
int count = 0;
std::string line;
int parse_res = simdjson::SUCCESS;
while (getline(ss, line)) {
// TODO we're likely triggering simdjson's padding reallocation here. Is
// that intentional?
parser.parse(line);
count++;
}
auto end = std::chrono::steady_clock::now();
std::chrono::duration<double> secs = end - start;
double speedinGBs = static_cast<double>(p.size()) /
(static_cast<double>(secs.count()) * 1000000000.0);
std::cout << speedinGBs << "\t\t\t\t" << count << std::endl;
if (parse_res != simdjson::SUCCESS) {
std::cerr << "Parsing failed" << std::endl;
exit(1);
}
}
if (test_baseline) {
std::wclog << "Baseline: Getline + normal parse... " << std::endl;
std::cout << "Gigabytes/second\t" << "Nb of documents parsed" << std::endl;
for (auto i = 0; i < 3; i++) {
//Actual test
simdjson::dom::parser parser;
simdjson::error_code alloc_error = parser.allocate(p.size());
if (alloc_error) {
std::cerr << alloc_error << std::endl;
return EXIT_FAILURE;
}
std::istringstream ss(std::string(p.data(), p.size()));
}
auto start = std::chrono::steady_clock::now();
int count = 0;
std::string line;
int parse_res = simdjson::SUCCESS;
while (getline(ss, line)) {
// TODO we're likely triggering simdjson's padding reallocation here. Is that intentional?
parser.parse(line);
count++;
}
std::map<size_t, double> batch_size_res;
if (test_per_batch) {
std::wclog << "parse_many: Speed per batch_size... from " << MIN_BATCH_SIZE
<< " bytes to " << MAX_BATCH_SIZE << " bytes..." << std::endl;
std::cout << "Batch Size\t"
<< "Gigabytes/second\t"
<< "Nb of documents parsed" << std::endl;
for (size_t i = MIN_BATCH_SIZE; i <= MAX_BATCH_SIZE;
i += (MAX_BATCH_SIZE - MIN_BATCH_SIZE) / 100) {
batch_size_res.insert(std::pair<size_t, double>(i, 0));
int count;
for (size_t j = 0; j < 5; j++) {
// Actual test
simdjson::dom::parser parser;
simdjson::error_code error;
auto end = std::chrono::steady_clock::now();
std::chrono::duration<double> secs = end - start;
double speedinGBs = static_cast<double>(p.size()) / (static_cast<double>(secs.count()) * 1000000000.0);
std::cout << speedinGBs << "\t\t\t\t" << count << std::endl;
if (parse_res != simdjson::SUCCESS) {
std::cerr << "Parsing failed" << std::endl;
exit(1);
}
auto start = std::chrono::steady_clock::now();
count = 0;
for (auto result : parser.parse_many(p, i)) {
error = result.error();
if (error != simdjson::SUCCESS) {
std::wcerr << "Parsing failed with: " << error_message(error) << std::endl;
exit(1);
}
count++;
}
auto end = std::chrono::steady_clock::now();
std::chrono::duration<double> secs = end - start;
double speedinGBs = static_cast<double>(p.size()) /
(static_cast<double>(secs.count()) * 1000000000.0);
if (speedinGBs > batch_size_res.at(i))
batch_size_res[i] = speedinGBs;
}
std::cout << i << "\t\t" << std::fixed << std::setprecision(3)
<< batch_size_res.at(i) << "\t\t\t\t" << count << std::endl;
}
}
size_t optimal_batch_size{};
double best_speed{};
if (test_per_batch) {
std::pair<size_t, double> best_results;
best_results =
(*min_element(batch_size_res.begin(), batch_size_res.end(), compare));
optimal_batch_size = best_results.first;
best_speed = best_results.second;
} else {
optimal_batch_size = MIN_BATCH_SIZE;
}
std::wclog << "Seemingly optimal batch_size: " << optimal_batch_size << "..."
<< std::endl;
std::wclog << "Best speed: " << best_speed << "..." << std::endl;
if (test_best_batch) {
std::wclog << "Starting speed test... Best of " << NB_ITERATION
<< " iterations..." << std::endl;
std::vector<double> res;
for (int i = 0; i < NB_ITERATION; i++) {
// Actual test
simdjson::dom::parser parser;
simdjson::error_code error;
auto start = std::chrono::steady_clock::now();
// This includes allocation of the parser
for (auto result : parser.parse_many(p, optimal_batch_size)) {
error = result.error();
if (error != simdjson::SUCCESS) {
std::wcerr << "Parsing failed with: " << error_message(error) << std::endl;
exit(1);
}
}
auto end = std::chrono::steady_clock::now();
std::chrono::duration<double> secs = end - start;
res.push_back(secs.count());
}
std::map<size_t, double> batch_size_res;
if(test_per_batch) {
std::wclog << "parse_many: Speed per batch_size... from " << MIN_BATCH_SIZE
<< " bytes to " << MAX_BATCH_SIZE << " bytes..." << std::endl;
std::cout << "Batch Size\t" << "Gigabytes/second\t" << "Nb of documents parsed" << std::endl;
for (size_t i = MIN_BATCH_SIZE; i <= MAX_BATCH_SIZE; i += (MAX_BATCH_SIZE - MIN_BATCH_SIZE) / 50) {
batch_size_res.insert(std::pair<size_t, double>(i, 0));
int count;
for (size_t j = 0; j < 5; j++) {
//Actual test
simdjson::dom::parser parser;
simdjson::error_code error;
double min_result = *min_element(res.begin(), res.end());
double speedinGBs =
static_cast<double>(p.size()) / (min_result * 1000000000.0);
auto start = std::chrono::steady_clock::now();
count = 0;
for (auto result : parser.parse_many(p, 4000000)) {
error = result.error();
count++;
}
auto end = std::chrono::steady_clock::now();
std::cout << "Min: " << min_result << " bytes read: " << p.size()
<< " Gigabytes/second: " << speedinGBs << std::endl;
}
#ifdef SIMDJSON_THREADS_ENABLED
// Multithreading probably does not help matters for small files (less than 10
// MB).
if (p.size() < 10000000) {
std::cout << std::endl;
std::chrono::duration<double> secs = end - start;
double speedinGBs = static_cast<double>(p.size()) / (static_cast<double>(secs.count()) * 1000000000.0);
if (speedinGBs > batch_size_res.at(i))
batch_size_res[i] = speedinGBs;
std::cout << "Warning: your file is small and the performance results are "
"probably meaningless"
<< std::endl;
std::cout << "as far as multithreaded performance goes." << std::endl;
if (error != simdjson::SUCCESS) {
std::wcerr << "Parsing failed with: " << error << std::endl;
exit(1);
}
}
std::cout << i << "\t\t" << std::fixed << std::setprecision(3) << batch_size_res.at(i) << "\t\t\t\t" << count << std::endl;
std::cout << std::endl;
}
}
std::cout
<< "Try to concatenate the file with itself to generate a large one."
<< std::endl;
std::cout << "In bash: " << std::endl;
std::cout << "for i in {1..1000}; do cat '" << filename
<< "' >> bar.ndjson; done" << std::endl;
std::cout << argv[0] << " bar.ndjson" << std::endl;
}
#endif
if (test_best_batch) {
size_t optimal_batch_size;
if (test_per_batch) {
optimal_batch_size = (*min_element(batch_size_res.begin(), batch_size_res.end(), compare)).first;
} else {
optimal_batch_size = MIN_BATCH_SIZE;
}
std::wclog << "Starting speed test... Best of " << NB_ITERATION << " iterations..." << std::endl;
std::wclog << "Seemingly optimal batch_size: " << optimal_batch_size << "..." << std::endl;
std::vector<double> res;
for (int i = 0; i < NB_ITERATION; i++) {
// Actual test
simdjson::dom::parser parser;
simdjson::error_code error;
auto start = std::chrono::steady_clock::now();
// TODO this includes allocation of the parser; is that intentional?
for (auto result : parser.parse_many(p, 4000000)) {
error = result.error();
}
auto end = std::chrono::steady_clock::now();
std::chrono::duration<double> secs = end - start;
res.push_back(secs.count());
if (error != simdjson::SUCCESS) {
std::wcerr << "Parsing failed with: " << error << std::endl;
exit(1);
}
}
double min_result = *min_element(res.begin(), res.end());
double speedinGBs = static_cast<double>(p.size()) / (min_result * 1000000000.0);
std::cout << "Min: " << min_result << " bytes read: " << p.size()
<< " Gigabytes/second: " << speedinGBs << std::endl;
}
return 0;
return 0;
}


@ -452,7 +452,7 @@ The simdjson library also support multithreaded JSON streaming through a large f
smaller JSON documents in either [ndjson](http://ndjson.org) or [JSON lines](http://jsonlines.org)
format. If your JSON documents all contain arrays or objects, we even support direct file
concatenation without whitespace. The concatenated file has no size restrictions (including larger
than 4GB), though each individual document must be less than 4GB.
than 4GB), though each individual document must be no larger than 4 GB.
Here is a simple example, given "x.json" with this content:
@ -472,6 +472,8 @@ for (dom::element doc : parser.load_many(filename)) {
In-memory ndjson strings can be parsed as well, with `parser.parse_many(string)`.
Both `load_many` and `parse_many` take an optional parameter `size_t batch_size` which defines the window processing size. It is set by default to a large value (`1000000` corresponding to 1 MB). None of your JSON documents should exceed this window size, or else you will get the error `simdjson::CAPACITY`. You cannot set this window size larger than 4 GB: you will get the error `simdjson::CAPACITY`. The smaller the window size is, the less memory the function will use. Setting the window size too small (e.g., less than 100 kB) may also impact performance negatively. Leaving it to 1 MB is expected to be a good choice, unless you have some larger documents.
See [parse_many.md](parse_many.md) for detailed information and design.
Thread Safety


@ -6,11 +6,63 @@
#include "simdjson/error.h"
#ifdef SIMDJSON_THREADS_ENABLED
#include <thread>
#include <mutex>
#include <condition_variable>
#endif
namespace simdjson {
namespace dom {
#ifdef SIMDJSON_THREADS_ENABLED
struct stage1_worker {
stage1_worker() noexcept = default;
stage1_worker(const stage1_worker&) = delete;
stage1_worker(stage1_worker&&) = delete;
stage1_worker operator=(const stage1_worker&) = delete;
~stage1_worker();
/**
* We only start the thread when it is needed, not at object construction, this may throw.
* You should only call this once.
**/
void start_thread();
/**
* Start a stage 1 job. You should first call 'run', then 'finish'.
* You must call start_thread once before.
*/
void run(document_stream * ds, dom::parser * stage1, size_t next_batch_start);
/** Wait for the run to finish (blocking). You should first call 'run', then 'finish'. **/
void finish();
private:
/**
* Normally, we would never stop the thread. But we do in the destructor.
* This function is only safe assuming that you are not waiting for results. You
* should have called run, then finish, and be done.
**/
void stop_thread();
std::thread thread{};
/** These three variables define the work done by the thread. **/
dom::parser * stage1_thread_parser{};
size_t _next_batch_start{};
document_stream * owner{};
/**
* We have two state variables. This could be streamlined to one variable in the future but
* we use two for clarity.
*/
bool has_work{false};
bool can_work{true};
/**
* We lock using a mutex.
*/
std::mutex locking_mutex{};
std::condition_variable cond_var{};
};
#endif
/**
* A forward-only stream of documents.
*
@ -142,8 +194,8 @@ private:
/** The error returned from the stage 1 thread. */
error_code stage1_thread_error{UNINITIALIZED};
/** The thread used to run stage 1 against the next batch in the background. */
std::thread stage1_thread{};
friend struct stage1_worker;
std::unique_ptr<stage1_worker> worker{new(std::nothrow) stage1_worker()};
/**
* The parser used to run stage 1 in the background. Will be swapped
* with the regular parser when finished.


@ -159,6 +159,10 @@ public:
* documents that consist of an object or array may omit the whitespace between them, concatenating
* with no separator. documents that consist of a single primitive (i.e. documents that are not
* arrays or objects) MUST be separated with whitespace.
*
* The documents must not exceed batch_size bytes (by default 1MB) or they will fail to parse.
* Setting batch_size to excessively large or excessively small values may negatively impact
* performance.
*
* ### Error Handling
*
@ -216,6 +220,10 @@ public:
* documents that consist of an object or array may omit the whitespace between them, concatenating
* with no separator. documents that consist of a single primitive (i.e. documents that are not
* arrays or objects) MUST be separated with whitespace.
*
* The documents must not exceed batch_size bytes (by default 1MB) or they will fail to parse.
* Setting batch_size to excessively large or excessively small values may negatively impact
* performance.
*
* ### Error Handling
*


@ -5,10 +5,65 @@
#include <algorithm>
#include <limits>
#include <stdexcept>
namespace simdjson {
namespace dom {
#ifdef SIMDJSON_THREADS_ENABLED
inline void stage1_worker::finish() {
std::unique_lock<std::mutex> lock(locking_mutex);
cond_var.wait(lock, [this]{return has_work == false;});
}
inline stage1_worker::~stage1_worker() {
stop_thread();
}
inline void stage1_worker::start_thread() {
std::unique_lock<std::mutex> lock(locking_mutex);
if(thread.joinable()) {
return; // This should never happen but we never want to create more than one thread.
}
thread = std::thread([this]{
while(can_work) {
std::unique_lock<std::mutex> thread_lock(locking_mutex);
cond_var.wait(thread_lock, [this]{return has_work || !can_work;});
if(!can_work) {
break;
}
this->owner->stage1_thread_error = this->owner->run_stage1(*this->stage1_thread_parser,
this->_next_batch_start);
this->has_work = false;
thread_lock.unlock();
cond_var.notify_one(); // will notify "finish"
}
}
);
}
inline void stage1_worker::stop_thread() {
std::unique_lock<std::mutex> lock(locking_mutex);
// We have to make sure that all locks can be released.
can_work = false;
has_work = false;
lock.unlock();
cond_var.notify_all();
if(thread.joinable()) {
thread.join();
}
}
inline void stage1_worker::run(document_stream * ds, dom::parser * stage1, size_t next_batch_start) {
std::unique_lock<std::mutex> lock(locking_mutex);
owner = ds;
_next_batch_start = next_batch_start;
stage1_thread_parser = stage1;
has_work = true;
lock.unlock();
cond_var.notify_one();// will notify the thread lock
}
#endif
really_inline document_stream::document_stream(
dom::parser &_parser,
const uint8_t *_buf,
@ -22,15 +77,14 @@ really_inline document_stream::document_stream(
batch_size{_batch_size},
error{_error}
{
#ifdef SIMDJSON_THREADS_ENABLED
if(worker.get() == nullptr) {
error = MEMALLOC;
}
#endif
}
inline document_stream::~document_stream() noexcept {
#ifdef SIMDJSON_THREADS_ENABLED
// TODO kill the thread, why should people have to wait for a non-side-effecting operation to complete
if (stage1_thread.joinable()) {
stage1_thread.join();
}
#endif
}
really_inline document_stream::iterator document_stream::begin() noexcept {
@ -80,6 +134,7 @@ inline void document_stream::start() noexcept {
// Kick off the first thread if needed
error = stage1_thread_parser.ensure_capacity(batch_size);
if (error) { return; }
worker->start_thread();
start_stage1_thread();
if (error) { return; }
}
@ -93,7 +148,6 @@ inline void document_stream::next() noexcept {
// Load the next document from the batch
error = parser.implementation->stage2_next(parser.doc);
// If that was the last document in the batch, load another batch (if available)
while (error == EMPTY) {
batch_start = next_batch_start();
@ -105,7 +159,6 @@ inline void document_stream::next() noexcept {
error = run_stage1(parser, batch_start);
#endif
if (error) { continue; } // If the error was EMPTY, we may want to load another batch.
// Run stage 2 on the first document in the batch
error = parser.implementation->stage2_next(parser.doc);
}
@ -128,8 +181,7 @@ inline error_code document_stream::run_stage1(dom::parser &p, size_t _batch_star
#ifdef SIMDJSON_THREADS_ENABLED
inline void document_stream::load_from_stage1_thread() noexcept {
stage1_thread.join();
worker->finish();
// Swap to the parser that was loaded up in the thread. Make sure the parser has
// enough memory to swap to, as well.
std::swap(parser, stage1_thread_parser);
@ -149,9 +201,8 @@ inline void document_stream::start_stage1_thread() noexcept {
// TODO this is NOT exception-safe.
this->stage1_thread_error = UNINITIALIZED; // In case something goes wrong, make sure it's an error
size_t _next_batch_start = this->next_batch_start();
stage1_thread = std::thread([this, _next_batch_start] {
this->stage1_thread_error = run_stage1(this->stage1_thread_parser, _next_batch_start);
});
worker->run(this, & this->stage1_thread_parser, _next_batch_start);
}
#endif // SIMDJSON_THREADS_ENABLED


@ -115,12 +115,12 @@ compiling for a known 64-bit platform."
#define TARGET_WESTMERE TARGET_REGION("sse4.2,pclmul")
#define TARGET_ARM64
// Threading is disabled
#undef SIMDJSON_THREADS_ENABLED
// Is threading enabled?
#if defined(BOOST_HAS_THREADS) || defined(_REENTRANT) || defined(_MT)
#ifndef SIMDJSON_THREADS_ENABLED
#define SIMDJSON_THREADS_ENABLED
#endif
#endif
// workaround for large stack sizes under -O0.
@ -129,7 +129,9 @@ compiling for a known 64-bit platform."
#ifndef __OPTIMIZE__
// Apple systems have small stack sizes in secondary threads.
// Lack of compiler optimization may generate high stack usage.
// So we are disabling multithreaded support for safety.
// Users may want to disable threads for safety, but only when
// in debug mode which we detect by the fact that the __OPTIMIZE__
// macro is not defined.
#undef SIMDJSON_THREADS_ENABLED
#endif
#endif


@ -77,8 +77,10 @@ if(SIMDJSON_ENABLE_THREADS)
set(CMAKE_THREAD_PREFER_PTHREAD TRUE)
set(THREADS_PREFER_PTHREAD_FLAG TRUE)
find_package(Threads REQUIRED)
target_link_libraries(simdjson-flags INTERFACE Threads::Threads)
target_link_libraries(simdjson-flags INTERFACE ${CMAKE_THREAD_LIBS_INIT})
target_compile_options(simdjson-flags INTERFACE ${CMAKE_THREAD_LIBS_INIT})
target_compile_definitions(simdjson-flags INTERFACE SIMDJSON_THREADS_ENABLED=1)
endif()
option(SIMDJSON_SANITIZE "Sanitize addresses" OFF)


@ -372,6 +372,43 @@ namespace document_stream_tests {
simdjson::dom::document_stream s1 = parse_many_stream_return(parser, str);
}
bool small_window() {
std::cout << "Running " << __func__ << std::endl;
auto json = R"({"error":[],"result":{"token":"xxx"}}{"error":[],"result":{"token":"xxx"}})"_padded;
simdjson::dom::parser parser;
size_t count = 0;
size_t window_size = 10; // deliberately too small
for (auto doc : parser.parse_many(json, window_size)) {
if (!doc.error()) {
std::cerr << "Expected a capacity error " << doc.error() << std::endl;
return false;
}
count++;
}
if(count == 2) {
std::cerr << "Expected a capacity error " << std::endl;
return false;
}
return true;
}
bool large_window() {
std::cout << "Running " << __func__ << std::endl;
#if SIZE_MAX > 17179869184
auto json = R"({"error":[],"result":{"token":"xxx"}}{"error":[],"result":{"token":"xxx"}})"_padded;
simdjson::dom::parser parser;
size_t count = 0;
uint64_t window_size{17179869184}; // deliberately too big
for (auto doc : parser.parse_many(json, size_t(window_size))) {
if (!doc.error()) {
std::cerr << "I expected a failure (too big) but got " << doc.error() << std::endl;
return false;
}
count++;
}
#endif
return true;
}
static bool parse_json_message_issue467(simdjson::padded_string &json, size_t expectedcount) {
simdjson::dom::parser parser;
size_t count = 0;
@ -504,7 +541,9 @@ namespace document_stream_tests {
}
bool run() {
return json_issue467() &&
return small_window() &&
large_window() &&
json_issue467() &&
document_stream_test() &&
document_stream_utf8_test();
}