From aaa12c8fb684e7692cfa5d8cd3c2da39a000d65f Mon Sep 17 00:00:00 2001 From: Daniel Lemire Date: Wed, 2 Dec 2020 11:33:36 -0500 Subject: [PATCH] more thread testing (#1324) * Adding a few tests. * More tests. * Trailing. * More links/documentations. --- .circleci/config.yml | 13 +++- .github/workflows/fix-trailing-whitespace.yml | 1 + include/simdjson/dom/document_stream-inl.h | 30 +++++++- tests/document_stream_tests.cpp | 75 ++++++++++++++++++- 4 files changed, 113 insertions(+), 6 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 1360cc3a..df78a919 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -181,7 +181,16 @@ jobs: executor: clang10 environment: { CMAKE_FLAGS: -DSIMDJSON_BUILD_STATIC=OFF -DSIMDJSON_SANITIZE=ON, CTEST_FLAGS: --output-on-failure -E checkperf } steps: [ cmake_test ] - + threadsanitize-gcc10: + description: Build and run tests on GCC 10 and AVX 2 with a cmake sanitize build + executor: gcc10 + environment: { CMAKE_FLAGS: -DSIMDJSON_BUILD_STATIC=OFF -DSIMDJSON_SANITIZE_THREADS=ON, BUILD_FLAGS: "", CTEST_FLAGS: --output-on-failure -E checkperf } + steps: [ cmake_test ] + threadsanitize-clang10: + description: Build and run tests on clang 10 and AVX 2 with a cmake sanitize build + executor: clang10 + environment: { CMAKE_FLAGS: -DSIMDJSON_BUILD_STATIC=OFF -DSIMDJSON_SANITIZE_THREADS=ON, CTEST_FLAGS: --output-on-failure -E checkperf } + steps: [ cmake_test ] # dynamic dynamic-gcc10: description: Build and run tests on GCC 10 and AVX 2 with a cmake dynamic build @@ -260,6 +269,8 @@ workflows: # full single-implementation tests - sanitize-gcc10 - sanitize-clang10 + - threadsanitize-gcc10 + - threadsanitize-clang10 - dynamic-gcc10 - dynamic-clang10 - unthreaded-gcc10 diff --git a/.github/workflows/fix-trailing-whitespace.yml b/.github/workflows/fix-trailing-whitespace.yml index 56db4bec..63706869 100644 --- a/.github/workflows/fix-trailing-whitespace.yml +++ b/.github/workflows/fix-trailing-whitespace.yml @@ 
-15,6 +15,7 @@ jobs: set -eu scripts/remove_trailing_whitespace.sh git diff >whitespace.patch + cat whitespace.patch if [ $(wc -c lock(locking_mutex); cond_var.wait(lock, [this]{return has_work == false;}); } inline stage1_worker::~stage1_worker() { + // The thread may never outlive the stage1_worker instance + // and will always be stopped/joined before the stage1_worker + // instance is gone. stop_thread(); } @@ -26,15 +33,22 @@ inline void stage1_worker::start_thread() { thread = std::thread([this]{ while(true) { std::unique_lock thread_lock(locking_mutex); + // We wait for either "run" or "stop_thread" to be called. cond_var.wait(thread_lock, [this]{return has_work || !can_work;}); + // If, for some reason, the stop_thread() method was called (i.e., the + // destructor of stage1_worker is called, then we want to immediately destroy + // the thread (and not do any more processing). if(!can_work) { break; } this->owner->stage1_thread_error = this->owner->run_stage1(*this->stage1_thread_parser, this->_next_batch_start); this->has_work = false; - thread_lock.unlock(); + // The condition variable call should be moved after thread_lock.unlock() for performance + // reasons but thread sanitizers may report it as a data race if we do. + // See https://stackoverflow.com/questions/35775501/c-should-condition-variable-be-notified-under-lock cond_var.notify_one(); // will notify "finish" + thread_lock.unlock(); } } ); @@ -46,8 +60,8 @@ inline void stage1_worker::stop_thread() { // We have to make sure that all locks can be released. 
can_work = false; has_work = false; - lock.unlock(); cond_var.notify_all(); + lock.unlock(); if(thread.joinable()) { thread.join(); } @@ -59,8 +73,11 @@ inline void stage1_worker::run(document_stream * ds, dom::parser * stage1, size_ _next_batch_start = next_batch_start; stage1_thread_parser = stage1; has_work = true; + // The condition variable call should be moved after lock.unlock() for performance + // reasons but thread sanitizers may report it as a data race if we do. + // See https://stackoverflow.com/questions/35775501/c-should-condition-variable-be-notified-under-lock + cond_var.notify_one(); // will notify the thread lock that we have work lock.unlock(); - cond_var.notify_one();// will notify the thread lock } #endif @@ -167,8 +184,13 @@ inline void document_stream::start() noexcept { // Always run the first stage 1 parse immediately batch_start = 0; error = run_stage1(*parser, batch_start); + if(error == EMPTY) { + // In exceptional cases, we may start with an empty block + batch_start = next_batch_start(); + if (batch_start >= len) { return; } + error = run_stage1(*parser, batch_start); + } if (error) { return; } - #ifdef SIMDJSON_THREADS_ENABLED if (use_thread && next_batch_start() < len) { // Kick off the first thread if needed diff --git a/tests/document_stream_tests.cpp b/tests/document_stream_tests.cpp index 58736cd4..493c5050 100644 --- a/tests/document_stream_tests.cpp +++ b/tests/document_stream_tests.cpp @@ -81,6 +81,76 @@ namespace document_stream_tests { simdjson::dom::document_stream s1 = parse_many_stream_return(parser, str); } + bool stress_data_race() { + std::cout << "Running " << __func__ << std::endl; + // Correct JSON. + const simdjson::padded_string input = R"([1,23] [1,23] [1,23] [1,23] [1,23] [1,23] [1,23] [1,23] [1,23] [1,23] [1,23] [1,23] [1,23] [1,23] [1,23] )"_padded;; + // This will spin up and tear down worker threads (the loop below currently runs a single iteration, not 1000). 
+ for(size_t i = 0; i < 1; i++) { + simdjson::dom::parser parser; + simdjson::dom::document_stream stream; + ASSERT_SUCCESS(parser.parse_many(input, 32).get(stream)); + for(auto doc: stream) { + auto error = doc.error(); + if(error) { + std::cout << "Expected no error but got " << error << std::endl; + return false; + } + } + } + return true; + } + + bool stress_data_race_with_error() { + std::cout << "Running " << __func__ << std::endl; + // Intentionally broken + const simdjson::padded_string input = R"([1,23] [1,23] [1,23] [1,23 [1,23] [1,23] [1,23] [1,23] [1,23] [1,23] [1,23] [1,23] [1,23] [1,23] [1,23] )"_padded;; + // This will spin up and tear down worker threads (the loop below currently runs a single iteration, not 1000). + for(size_t i = 0; i < 1; i++) { + simdjson::dom::parser parser; + simdjson::dom::document_stream stream; + ASSERT_SUCCESS(parser.parse_many(input, 32).get(stream)); + size_t count = 0; + for(auto doc: stream) { + auto error = doc.error(); + if(count <= 2) { + if(error) { + std::cout << "Expected no error but got " << error << std::endl; + return false; + } + } else { + if(!error) { + std::cout << "Expected an error but got " << error << std::endl; + return false; + } + break; + } + count++; + } + } + return true; + } + + bool test_leading_spaces() { + std::cout << "Running " << __func__ << std::endl; + const simdjson::padded_string input = R"( [1,23] [1,23] [1,23] [1,23] [1,23] [1,23] [1,23] [1,23] [1,23] [1,23] [1,23] [1,23] [1,23] [1,23] [1,23] )"_padded;; + size_t count = 0; + simdjson::dom::parser parser; + simdjson::dom::document_stream stream; + ASSERT_SUCCESS(parser.parse_many(input, 32).get(stream)); + count = 0; + for(auto doc: stream) { + auto error = doc.error(); + if(error) { + std::cout << "Expected no error but got " << error << std::endl; + return false; + } + count++; + } + return count == 15; + } + + bool issue1307() { std::cout << "Running " << __func__ << std::endl; const simdjson::padded_string input = decode_base64("AgAMACA="); @@ -588,7 +658,10 @@ namespace 
document_stream_tests { } bool run() { - return simple_example() && + return stress_data_race() && + stress_data_race_with_error() && + test_leading_spaces() && + simple_example() && truncated_window() && truncated_window_unclosed_string() && issue1307() &&