Bringing ndjson(document_stream) to On Demand (#1643)
* Update basic.md to document JSON pointer for On Demand. * Add automatic rewind for at_pointer * Remove DOM examples in basics.md and update documentation reflecting addition of at_pointer automatic rewinding. * Review * Add test * Add document_stream constructors and iterate_many * Attempt to implement streaming. * Kind of fixed next() for getting next document * Temporary save. * Putting in working order. * Add working doc_index and add function next_document() * Attempt to implement streaming. * Re-anchoring json_iterator after a call to stage 1 * I am convinced it should be a 'while'. * Add source() with test. * Add truncated_bytes(). * Fix casting issues. * Fix old style cast. * Fix privacy issue. * Fix privacy issues. * Again * . * Add more tests. Add error() for iterator class. * Fix source() to not included whitespaces between documents. * Fixing CI. * Fix source() for multiple batches. Add new tests. * Fix batch_start when document has leading spaces. Add new tests for that. * Add new tests. * Temporary save. * Working hacky multithread version. * Small fix in header files. * Correct version (not working). * Adding a move assignment to ondemand::parser. * Fix attempt by changing std::swap. * Moving DEFAULT_BATCH_SIZE and MINIMAL_BATCH_SIZE. * Update doc and readme tests. * Update basics.md * Update readme_examples tests. * Fix exceptions in test. * Partial setup for amazon_cellphones. * Benchmark with vectors. * Benchmark with maps * With vectors again. * Fix for weighted average. * DOM benchmark. * Fix typos. Add On Demand benchmark. * Add large amazon_cellphones benchmark for DOM * Add benchmark for On demand. * Fix broken read_me test. * Add parser.threaded to enable/disable thread usage. Co-authored-by: Daniel Lemire <lemire@gmail.com>
This commit is contained in:
parent
2dac3705d2
commit
5c590b8434
|
@ -25,6 +25,6 @@ jobs:
|
|||
mkdir build &&
|
||||
cd build &&
|
||||
cmake -DSIMDJSON_DEVELOPER_MODE=ON -DSIMDJSON_SANITIZE_THREADS=ON .. &&
|
||||
cmake --build . --target document_stream_tests --target parse_many_test &&
|
||||
cmake --build . --target document_stream_tests --target ondemand_document_stream_tests --target parse_many_test &&
|
||||
ctest --output-on-failure -R parse_many_test &&
|
||||
ctest --output-on-failure -R document_stream_tests
|
|
@ -25,6 +25,6 @@ jobs:
|
|||
mkdir build &&
|
||||
cd build &&
|
||||
cmake -DSIMDJSON_DEVELOPER_MODE=ON -DSIMDJSON_SANITIZE_THREADS=ON .. &&
|
||||
cmake --build . --target document_stream_tests --target parse_many_test &&
|
||||
cmake --build . --target document_stream_tests --target ondemand_document_stream_tests --target parse_many_test &&
|
||||
ctest --output-on-failure -R parse_many_test &&
|
||||
ctest --output-on-failure -R document_stream_tests
|
|
@ -4,7 +4,6 @@
|
|||
{"column": 120 }
|
||||
],
|
||||
"files.trimTrailingWhitespace": true,
|
||||
"C_Cpp.autoAddFileAssociations": false,
|
||||
"files.associations": {
|
||||
"array": "cpp",
|
||||
"iterator": "cpp",
|
||||
|
@ -18,6 +17,7 @@
|
|||
"__errc": "cpp",
|
||||
"__functional_base": "cpp",
|
||||
"__hash_table": "cpp",
|
||||
"__locale": "cpp",
|
||||
"__mutex_base": "cpp",
|
||||
"__node_handle": "cpp",
|
||||
"__nullptr": "cpp",
|
||||
|
@ -33,6 +33,7 @@
|
|||
"cinttypes": "cpp",
|
||||
"clocale": "cpp",
|
||||
"cmath": "cpp",
|
||||
"codecvt": "cpp",
|
||||
"complex": "cpp",
|
||||
"condition_variable": "cpp",
|
||||
"cstdarg": "cpp",
|
||||
|
@ -46,6 +47,7 @@
|
|||
"cwctype": "cpp",
|
||||
"deque": "cpp",
|
||||
"exception": "cpp",
|
||||
"forward_list": "cpp",
|
||||
"fstream": "cpp",
|
||||
"functional": "cpp",
|
||||
"initializer_list": "cpp",
|
||||
|
@ -55,13 +57,17 @@
|
|||
"iostream": "cpp",
|
||||
"istream": "cpp",
|
||||
"limits": "cpp",
|
||||
"list": "cpp",
|
||||
"locale": "cpp",
|
||||
"map": "cpp",
|
||||
"memory": "cpp",
|
||||
"mutex": "cpp",
|
||||
"new": "cpp",
|
||||
"numeric": "cpp",
|
||||
"ostream": "cpp",
|
||||
"random": "cpp",
|
||||
"ratio": "cpp",
|
||||
"regex": "cpp",
|
||||
"set": "cpp",
|
||||
"sstream": "cpp",
|
||||
"stack": "cpp",
|
||||
|
@ -75,9 +81,10 @@
|
|||
"type_traits": "cpp",
|
||||
"typeinfo": "cpp",
|
||||
"unordered_map": "cpp",
|
||||
"unordered_set": "cpp",
|
||||
"utility": "cpp",
|
||||
"valarray": "cpp",
|
||||
"vector": "cpp",
|
||||
"*.ipp": "cpp",
|
||||
"*.tcc": "cpp"
|
||||
"*.ipp": "cpp"
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,68 @@
|
|||
#pragma once

#include "json_benchmark/file_runner.h"
#include <map>
#include <string>

namespace amazon_cellphones {

using namespace json_benchmark;

// Per-brand aggregate: weighted rating sum and total number of reviews.
struct brand {
  double cumulative_rating;
  uint64_t reviews_count;
  simdjson_really_inline bool operator==(const brand &other) const {
    return cumulative_rating == other.cumulative_rating
        && reviews_count == other.reviews_count;
  }
  simdjson_really_inline bool operator!=(const brand &other) const {
    return !(*this == other);
  }
};

// Printers used by the benchmark diff machinery when results disagree.
simdjson_unused static std::ostream &operator<<(std::ostream &o, const brand &b) {
  o << "cumulative_rating: " << b.cumulative_rating << std::endl
    << "reviews_count: " << b.reviews_count << std::endl;
  return o;
}

template<typename StringType>
simdjson_unused static std::ostream &operator<<(std::ostream &o, const std::pair<const StringType, brand> &p) {
  o << "brand: " << p.first << std::endl << p.second;
  return o;
}

// Harness that runs implementation I over the amazon_cellphones NDJSON file,
// accumulating per-brand statistics into `result`.
template<typename I>
struct runner : public file_runner<I> {
  std::map<typename I::StringType, brand> result{};

  bool setup(benchmark::State &state) {
    return this->load_json(state, AMAZON_CELLPHONES_NDJSON);
  }

  bool before_run(benchmark::State &state) {
    if (!file_runner<I>::before_run(state)) { return false; }
    result.clear(); // each iteration starts from an empty aggregate
    return true;
  }

  bool run(benchmark::State &) {
    return this->implementation.run(this->json, result);
  }

  template<typename R>
  bool diff(benchmark::State &state, runner<R> &reference) {
    return diff_results(state, result, reference.result, diff_flags::NONE);
  }

  size_t items_per_iteration() {
    return result.size();
  }
};

struct simdjson_dom; // the DOM implementation is used as the reference result

template<typename I> simdjson_really_inline static void amazon_cellphones(benchmark::State &state) {
  run_json_benchmark<runner<I>, runner<simdjson_dom>>(state);
}

} // namespace amazon_cellphones
|
|
@ -0,0 +1,44 @@
|
|||
#pragma once

#if SIMDJSON_EXCEPTIONS

#include "amazon_cellphones.h"

namespace amazon_cellphones {

using namespace simdjson;

// DOM-based implementation: parse the whole stream with parse_many and
// aggregate review statistics per brand into `result`.
struct simdjson_dom {
  using StringType = std::string;

  dom::parser parser{};

  // Parses `json` (NDJSON) and fills `result`; returns true on success.
  bool run(simdjson::padded_string &json, std::map<StringType, brand> &result) {
    auto stream = parser.parse_many(json);
    auto i = stream.begin();
    ++i; // Skip first line
    for (;i != stream.end(); ++i) {
      auto doc = *i;
      StringType copy(std::string_view(doc.at(1))); // field 1: brand (map key)
      // Hoist the field accesses so each index is resolved once per row
      // (the original re-evaluated doc.at(5)/doc.at(7) up to three times).
      double rating = double(doc.at(5));
      uint64_t reviews = uint64_t(doc.at(7));
      auto x = result.find(copy);
      if (x == result.end()) { // If key not found, add new key
        result.emplace(copy, amazon_cellphones::brand{
          rating * reviews,
          reviews
        });
      } else { // Otherwise, update key data
        x->second.cumulative_rating += rating * reviews;
        x->second.reviews_count += reviews;
      }
    }

    return true;
  }

};

BENCHMARK_TEMPLATE(amazon_cellphones, simdjson_dom)->UseManualTime();

} // namespace amazon_cellphones

#endif // SIMDJSON_EXCEPTIONS
|
|
@ -0,0 +1,65 @@
|
|||
#pragma once

#if SIMDJSON_EXCEPTIONS

#include "amazon_cellphones.h"

namespace amazon_cellphones {

using namespace simdjson;

// On Demand implementation: forward-only walk over each NDJSON row,
// picking out the fields needed to aggregate per-brand statistics.
struct simdjson_ondemand {
  using StringType = std::string;

  ondemand::parser parser{};

  // Parses `json` (NDJSON) and fills `result`; returns true on success.
  bool run(simdjson::padded_string &json, std::map<StringType, brand> &result) {
    ondemand::document_stream stream = parser.iterate_many(json);
    ondemand::document_stream::iterator i = stream.begin();
    ++i; // Skip first line
    for (;i != stream.end(); ++i) {
      auto & doc = *i;
      size_t index{0};
      StringType copy;
      // Zero-initialize: a row missing fields 5/7 must not leave these
      // holding indeterminate values (read unconditionally below).
      double rating{0};
      uint64_t reviews{0};
      // On Demand is forward-only, so scan the array once by position.
      for ( auto value : doc ) {
        switch (index)
        {
        case 1: // brand (map key)
          copy = StringType(std::string_view(value));
          break;
        case 5: // rating
          rating = double(value);
          break;
        case 7: // review count
          reviews = uint64_t(value);
          break;
        default:
          break;
        }
        index++;
      }

      auto x = result.find(copy);
      if (x == result.end()) { // If key not found, add new key
        result.emplace(copy, amazon_cellphones::brand{
          rating * reviews,
          reviews
        });
      } else { // Otherwise, update key data
        x->second.cumulative_rating += rating * reviews;
        x->second.reviews_count += reviews;
      }
    }

    return true;
  }

};

BENCHMARK_TEMPLATE(amazon_cellphones, simdjson_ondemand)->UseManualTime();

} // namespace amazon_cellphones

#endif // SIMDJSON_EXCEPTIONS
|
|
@ -26,6 +26,12 @@ SIMDJSON_PUSH_DISABLE_ALL_WARNINGS
|
|||
|
||||
SIMDJSON_POP_DISABLE_WARNINGS
|
||||
|
||||
#include "amazon_cellphones/simdjson_dom.h"
|
||||
#include "amazon_cellphones/simdjson_ondemand.h"
|
||||
|
||||
#include "large_amazon_cellphones/simdjson_dom.h"
|
||||
#include "large_amazon_cellphones/simdjson_ondemand.h"
|
||||
|
||||
#include "partial_tweets/simdjson_dom.h"
|
||||
#include "partial_tweets/simdjson_ondemand.h"
|
||||
#include "partial_tweets/yyjson.h"
|
||||
|
|
|
@ -4,5 +4,6 @@ namespace json_benchmark {
|
|||
|
||||
static constexpr const char *TWITTER_JSON = SIMDJSON_BENCHMARK_DATA_DIR "twitter.json";
|
||||
static constexpr const char *NUMBERS_JSON = SIMDJSON_BENCHMARK_DATA_DIR "numbers.json";
|
||||
static constexpr const char *AMAZON_CELLPHONES_NDJSON = SIMDJSON_BENCHMARK_DATA_DIR "amazon_cellphones.ndjson";
|
||||
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
#pragma once
|
||||
|
||||
#include <vector>
|
||||
#include <map>
|
||||
#include <sstream>
|
||||
#include <limits>
|
||||
|
||||
|
@ -52,6 +53,56 @@ struct result_differ<std::vector<T>, std::vector<U>> {
|
|||
}
|
||||
};
|
||||
|
||||
template<typename T, typename U, typename StringType>
|
||||
struct result_differ<std::map<StringType,T>, std::map<StringType,U>> {
|
||||
static bool diff(benchmark::State &state, const std::map<StringType,T> &result, const std::map<StringType,U> &reference, diff_flags flags) {
|
||||
auto result_iter = result.begin();
|
||||
auto reference_iter = reference.begin();
|
||||
while (result_iter != result.end() && reference_iter != reference.end()) {
|
||||
if (!diff_results(state, *result_iter, *reference_iter, flags)) { return false; }
|
||||
result_iter++;
|
||||
reference_iter++;
|
||||
}
|
||||
if (result_iter != result.end()) {
|
||||
std::stringstream str;
|
||||
str << "extra results (got " << result.size() << ", expected " << reference.size() << "): first extra element: " << *result_iter;
|
||||
state.SkipWithError(str.str().data());
|
||||
return false;
|
||||
} else if (reference_iter != reference.end()) {
|
||||
std::stringstream str;
|
||||
str << "missing results (got " << result.size() << ", expected " << reference.size() << "): first missing element: " << *reference_iter;
|
||||
state.SkipWithError(str.str().data());
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
template<typename T, typename U>
|
||||
struct result_differ<std::map<std::string_view,T>, std::vector<std::string_view,U>> {
|
||||
static bool diff(benchmark::State &state, const std::map<std::string_view,T> &result, const std::map<std::string_view,U> &reference, diff_flags flags) {
|
||||
auto result_iter = result.begin();
|
||||
auto reference_iter = reference.begin();
|
||||
while (result_iter != result.end() && reference_iter != reference.end()) {
|
||||
if (!diff_results(state, *result_iter, *reference_iter, flags)) { return false; }
|
||||
result_iter++;
|
||||
reference_iter++;
|
||||
}
|
||||
if (result_iter != result.end()) {
|
||||
std::stringstream str;
|
||||
str << "extra results (got " << result.size() << ", expected " << reference.size() << "): first extra element: " << *result_iter;
|
||||
state.SkipWithError(str.str().data());
|
||||
return false;
|
||||
} else if (reference_iter != reference.end()) {
|
||||
std::stringstream str;
|
||||
str << "missing results (got " << result.size() << ", expected " << reference.size() << "): first missing element: " << *reference_iter;
|
||||
state.SkipWithError(str.str().data());
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
template<>
|
||||
struct result_differ<double, double> {
|
||||
static bool diff(benchmark::State &state, const double &result, const double &reference, diff_flags flags) {
|
||||
|
|
|
@ -0,0 +1,91 @@
|
|||
#pragma once

#include "json_benchmark/string_runner.h"
#include <fstream>   // std::ifstream (was relying on transitive includes)
#include <iostream>  // std::cout
#include <iterator>  // std::istreambuf_iterator
#include <map>
#include <string>

namespace large_amazon_cellphones {

static const simdjson::padded_string &get_built_json();

using namespace json_benchmark;

// Per-brand aggregate: weighted rating sum and total number of reviews.
struct brand {
  double cumulative_rating;
  uint64_t reviews_count;
  simdjson_really_inline bool operator==(const brand &other) const {
    return cumulative_rating == other.cumulative_rating &&
      reviews_count == other.reviews_count;
  }
  simdjson_really_inline bool operator!=(const brand &other) const { return !(*this == other); }
};

// Printers used by the benchmark diff machinery when results disagree.
simdjson_unused static std::ostream &operator<<(std::ostream &o, const brand &b) {
  o << "cumulative_rating: " << b.cumulative_rating << std::endl;
  o << "reviews_count: " << b.reviews_count << std::endl;
  return o;
}

template<typename StringType>
simdjson_unused static std::ostream &operator<<(std::ostream &o, const std::pair<const StringType, brand> &p) {
  o << "brand: " << p.first << std::endl;
  o << p.second;
  return o;
}

// Harness that runs implementation I over a synthetically enlarged copy of
// the amazon_cellphones NDJSON file (see build_json below).
template<typename I>
struct runner : public string_runner<I> {
  std::map<typename I::StringType, brand> result{};

  runner() : string_runner<I>(get_built_json()) {}

  bool before_run(benchmark::State &state) {
    if (!string_runner<I>::before_run(state)) { return false; }
    result.clear(); // each iteration starts from an empty aggregate
    return true;
  }

  bool run(benchmark::State &) {
    return this->implementation.run(this->json, result);
  }

  template<typename R>
  bool diff(benchmark::State &state, runner<R> &reference) {
    return diff_results(state, result, reference.result, diff_flags::NONE);
  }

  size_t items_per_iteration() {
    return result.size();
  }
};

// Builds a large NDJSON input by repeatedly appending the original file
// (minus its first line) until the buffer reaches at least N bytes.
// NOTE(review): assumes the data file exists and contains at least one
// newline; an empty `copy` would loop forever — confirm upstream guarantees.
static std::string build_json(size_t N) {
  std::ifstream in(AMAZON_CELLPHONES_NDJSON);
  std::string answer((std::istreambuf_iterator<char>(in)), std::istreambuf_iterator<char>());
  // Find position of first line to exclude it in further copies
  size_t first_line = answer.find('\n');
  std::string copy(answer,first_line + 1);
  size_t count{1};

  while (answer.size() < N) {
    answer.append(copy);
    count++;
  }

  std::cout << "Creating a source file spanning " << (answer.size() + 512) / (1024*1024) << " MB (" << count << " copies of original file)" << std::endl;
  return answer;
}

// Built once on first use (function-local static); ~10 MB target size.
static const simdjson::padded_string &get_built_json() {
  static simdjson::padded_string json = build_json(10*1024*1024);
  return json;
}


struct simdjson_dom; // the DOM implementation is used as the reference result

template<typename I> simdjson_really_inline static void large_amazon_cellphones(benchmark::State &state) {
  run_json_benchmark<runner<I>, runner<simdjson_dom>>(state);
}

} // namespace large_amazon_cellphones
|
|
@ -0,0 +1,45 @@
|
|||
#pragma once

#if SIMDJSON_EXCEPTIONS

#include "large_amazon_cellphones.h"
#include <algorithm>

namespace large_amazon_cellphones {

using namespace simdjson;

// DOM-based implementation: parse the whole stream with parse_many and
// aggregate review statistics per brand into `result`.
struct simdjson_dom {
  using StringType = std::string;

  dom::parser parser{};

  // Parses `json` (NDJSON) and fills `result`; returns true on success.
  bool run(simdjson::padded_string &json, std::map<StringType, brand> &result) {
    auto stream = parser.parse_many(json);
    auto i = stream.begin();
    ++i; // Skip first line
    for (;i != stream.end(); ++i) {
      auto doc = *i;
      StringType copy(std::string_view(doc.at(1))); // field 1: brand (map key)
      // Hoist the field accesses so each index is resolved once per row
      // (the original re-evaluated doc.at(5)/doc.at(7) up to three times).
      double rating = double(doc.at(5));
      uint64_t reviews = uint64_t(doc.at(7));
      auto x = result.find(copy);
      if (x == result.end()) { // If key not found, add new key
        result.emplace(copy, large_amazon_cellphones::brand{
          rating * reviews,
          reviews
        });
      } else { // Otherwise, update key data
        x->second.cumulative_rating += rating * reviews;
        x->second.reviews_count += reviews;
      }
    }

    return true;
  }

};

BENCHMARK_TEMPLATE(large_amazon_cellphones, simdjson_dom)->UseManualTime();

} // namespace large_amazon_cellphones

#endif // SIMDJSON_EXCEPTIONS
|
|
@ -0,0 +1,65 @@
|
|||
#pragma once

#if SIMDJSON_EXCEPTIONS

#include "large_amazon_cellphones.h"

namespace large_amazon_cellphones {

using namespace simdjson;

// On Demand implementation: forward-only walk over each NDJSON row,
// picking out the fields needed to aggregate per-brand statistics.
struct simdjson_ondemand {
  using StringType = std::string;

  ondemand::parser parser{};

  // Parses `json` (NDJSON) and fills `result`; returns true on success.
  bool run(simdjson::padded_string &json, std::map<StringType, brand> &result) {
    ondemand::document_stream stream = parser.iterate_many(json);
    ondemand::document_stream::iterator i = stream.begin();
    ++i; // Skip first line
    for (;i != stream.end(); ++i) {
      auto & doc = *i;
      size_t index{0};
      StringType copy;
      // Zero-initialize: a row missing fields 5/7 must not leave these
      // holding indeterminate values (read unconditionally below).
      double rating{0};
      uint64_t reviews{0};
      // On Demand is forward-only, so scan the array once by position.
      for ( auto value : doc ) {
        switch (index)
        {
        case 1: // brand (map key)
          copy = StringType(std::string_view(value));
          break;
        case 5: // rating
          rating = double(value);
          break;
        case 7: // review count
          reviews = uint64_t(value);
          break;
        default:
          break;
        }
        index++;
      }

      auto x = result.find(copy);
      if (x == result.end()) { // If key not found, add new key
        result.emplace(copy, large_amazon_cellphones::brand{
          rating * reviews,
          reviews
        });
      } else { // Otherwise, update key data
        x->second.cumulative_rating += rating * reviews;
        x->second.reviews_count += reviews;
      }
    }

    return true;
  }

};

BENCHMARK_TEMPLATE(large_amazon_cellphones, simdjson_ondemand)->UseManualTime();

} // namespace large_amazon_cellphones

#endif // SIMDJSON_EXCEPTIONS
|
|
@ -963,60 +963,38 @@ format. If your JSON documents all contain arrays or objects, we even support di
|
|||
concatenation without whitespace. The concatenated file has no size restrictions (including larger
|
||||
than 4GB), though each individual document must be no larger than 4 GB.
|
||||
|
||||
Here is a simple example, given `x.json` with this content:
|
||||
|
||||
```json
|
||||
{ "foo": 1 }
|
||||
{ "foo": 2 }
|
||||
{ "foo": 3 }
|
||||
```
|
||||
Here is a simple example:
|
||||
|
||||
```c++
|
||||
dom::parser parser;
|
||||
dom::document_stream docs = parser.load_many("x.json");
|
||||
for (dom::element doc : docs) {
|
||||
cout << doc["foo"] << endl;
|
||||
auto json = R"({ "foo": 1 } { "foo": 2 } { "foo": 3 } )"_padded;
|
||||
ondemand::parser parser;
|
||||
ondemand::document_stream docs = parser.iterate_many(json);
|
||||
for (auto & doc : docs) {
|
||||
std::cout << doc["foo"] << std::endl;
|
||||
}
|
||||
// Prints 1 2 3
|
||||
```
|
||||
|
||||
In-memory ndjson strings can be parsed as well, with `parser.parse_many(string)`:
|
||||
It is important to note that the iteration returns a `document` reference, and hence why the `&` is needed.
|
||||
|
||||
Unlike `parser.iterate`, `parser.iterate_many` may parse "on demand" (lazily). That is, no parsing may have been done before you enter the loop
|
||||
`for (auto & doc : docs) {` and you should expect the parser to only ever fully parse one JSON document at a time.
|
||||
|
||||
As with `parser.iterate`, when calling `parser.iterate_many(string)`, no copy is made of the provided string input. The provided memory buffer may be accessed each time a JSON document is parsed. Calling `parser.iterate_many(string)` on a temporary string buffer (e.g., `docs = parser.iterate_many("[1,2,3]"_padded)`) is unsafe (and will not compile) because the `document_stream` instance needs access to the buffer to return the JSON documents.
|
||||
|
||||
|
||||
```c++
|
||||
dom::parser parser;
|
||||
auto json = R"({ "foo": 1 }
|
||||
{ "foo": 2 }
|
||||
{ "foo": 3 })"_padded;
|
||||
dom::document_stream docs = parser.parse_many(json);
|
||||
for (dom::element doc : docs) {
|
||||
cout << doc["foo"] << endl;
|
||||
}
|
||||
// Prints 1 2 3
|
||||
```
|
||||
`iterate_many` can also take an optional parameter `size_t batch_size` which defines the window processing size. It is set by default to a large value (`1000000` corresponding to 1 MB). None of your JSON documents should exceed this window size, or else you will get the error `simdjson::CAPACITY`. You cannot set this window size larger than 4 GB: you will get the error `simdjson::CAPACITY`. The smaller the window size is, the less memory the function will use. Setting the window size too small (e.g., less than 100 kB) may also impact performance negatively. Leaving it to 1 MB is expected to be a good choice, unless you have some larger documents.
|
||||
|
||||
If your documents are large (e.g., larger than a megabyte), then the `iterate_many` function is maybe ill-suited. It is really meant to support reading efficiently streams of relatively small documents (e.g., a few kilobytes each). If you have larger documents, you should use other functions like `iterate`.
|
||||
|
||||
Unlike `parser.parse`, both `parser.load_many(filename)` and `parser.parse_many(string)` may parse
|
||||
"on demand" (lazily). That is, no parsing may have been done before you enter the loop
|
||||
`for (dom::element doc : docs) {` and you should expect the parser to only ever fully parse one JSON
|
||||
document at a time.
|
||||
|
||||
1. When calling `parser.load_many(filename)`, the file's content is loaded up in a memory buffer owned by the `parser`'s instance. Thus the file can be safely deleted after calling `parser.load_many(filename)` as the parser instance owns all of the data.
|
||||
2. When calling `parser.parse_many(string)`, no copy is made of the provided string input. The provided memory buffer may be accessed each time a JSON document is parsed. Calling `parser.parse_many(string)` on a temporary string buffer (e.g., `docs = parser.parse_many("[1,2,3]"_padded)`) is unsafe (and will not compile) because the `document_stream` instance needs access to the buffer to return the JSON documents. In contrast, calling `doc = parser.parse("[1,2,3]"_padded)` is safe because `parser.parse` eagerly parses the input.
|
||||
|
||||
|
||||
Both `load_many` and `parse_many` take an optional parameter `size_t batch_size` which defines the window processing size. It is set by default to a large value (`1000000` corresponding to 1 MB). None of your JSON documents should exceed this window size, or else you will get the error `simdjson::CAPACITY`. You cannot set this window size larger than 4 GB: you will get the error `simdjson::CAPACITY`. The smaller the window size is, the less memory the function will use. Setting the window size too small (e.g., less than 100 kB) may also impact performance negatively. Leaving it to 1 MB is expected to be a good choice, unless you have some larger documents.
|
||||
|
||||
If your documents are large (e.g., larger than a megabyte), then the `load_many` and `parse_many` functions are maybe ill-suited. They are really meant to support reading efficiently streams of relatively small documents (e.g., a few kilobytes each). If you have larger documents, you should use other functions like `parse`.
|
||||
|
||||
See [parse_many.md](parse_many.md) for detailed information and design.
|
||||
See [iterate_many.md](iterate_many.md) for detailed information and design.
|
||||
|
||||
Thread Safety
|
||||
-------------
|
||||
|
||||
We built simdjson with thread safety in mind.
|
||||
|
||||
The simdjson library is single-threaded except for [`parse_many`](parse_many.md) which may use secondary threads under its control when the library is compiled with thread support.
|
||||
The simdjson library is single-threaded except for [`iterate_many`](iterate_many.md) and [`parse_many`](parse_many.md) which may use secondary threads under their control when the library is compiled with thread support.
|
||||
|
||||
|
||||
We recommend using one `dom::parser` object per thread in which case the library is thread-safe.
|
||||
|
|
|
@ -0,0 +1,229 @@
|
|||
iterate_many
|
||||
==========
|
||||
|
||||
An interface providing features to work with files or streams containing multiple small JSON documents.
|
||||
As fast and convenient as possible.
|
||||
|
||||
Contents
|
||||
--------
|
||||
|
||||
- [Motivations](#motivations)
|
||||
- [How it works](#how-it-works)
|
||||
- [Support](#support)
|
||||
- [API](#api)
|
||||
- [Use cases](#use-cases)
|
||||
- [Tracking your position](#tracking-your-position)
|
||||
- [Incomplete streams](#incomplete-streams)
|
||||
|
||||
Motivation
|
||||
-----------
|
||||
|
||||
The main motivation for this piece of software is to achieve maximum speed and offer a
|
||||
better quality of life in parsing files containing multiple small JSON documents.
|
||||
|
||||
The JavaScript Object Notation (JSON) [RFC7159](https://tools.ietf.org/html/rfc7159) is a handy
|
||||
serialization format. However, when serializing a large sequence of
|
||||
values as an array, or a possibly indeterminate-length or never-
|
||||
ending sequence of values, JSON may be inconvenient.
|
||||
|
||||
Consider a sequence of one million values, each possibly one kilobyte
|
||||
when encoded -- roughly one gigabyte. It is often desirable to process such a dataset incrementally
|
||||
without having to first read all of it before beginning to produce results.
|
||||
|
||||
|
||||
How it works
|
||||
------------
|
||||
|
||||
### Context
|
||||
|
||||
Before parsing anything, simdjson first preprocesses the JSON text by identifying all structural indexes
|
||||
(i.e. the starting position of any JSON value, as well as any important operators like `,`, `:`, `]` or
|
||||
`}`) and validating UTF8. This stage is referred to as stage 1. However, during this process, simdjson has
no knowledge of whether it parsed a valid document, multiple documents, or even if the document is complete.
|
||||
Then, to iterate through the JSON text during parsing, we use what we call a JSON iterator that will navigate
|
||||
through the text using these structural indexes. This JSON iterator is not visible though, but it is the
|
||||
key component to make parsing work.
|
||||
|
||||
Prior to iterate_many, most people who had to parse a multiline JSON file would proceed by reading the
|
||||
file line by line, using a utility function like `std::getline` or equivalent, and would then use
|
||||
the `parse` on each of those lines. From a performance point of view, this process is highly
|
||||
inefficient, in that it requires a lot of unnecessary memory allocation and makes use of the
|
||||
`getline` function, which is fundamentally slow, slower than the act of parsing with simdjson
|
||||
[(more on this here)](https://lemire.me/blog/2019/06/18/how-fast-is-getline-in-c/).
|
||||
|
||||
Unlike the popular parser RapidJson, our DOM does not require the buffer once the parsing job is
|
||||
completed, the DOM and the buffer are completely independent. The drawback of this architecture is
|
||||
that we need to allocate some additional memory to store our ParsedJson data, for every document
|
||||
inside a given file. Memory allocation can be slow and become a bottleneck, therefore, we want to
|
||||
minimize it as much as possible.
|
||||
|
||||
### Design
|
||||
|
||||
To achieve a minimum amount of allocations, we opted for a design where we create only one
|
||||
parser object and therefore allocate its memory once, and then recycle it for every document in a
|
||||
given file. But, knowing that they often have largely varying size, we need to make sure that we
|
||||
allocate enough memory so that all the documents can fit. This value is what we call the batch size.
|
||||
As of right now, we need to manually specify a value for this batch size, it has to be at least as
|
||||
big as the biggest document in your file, but not too big so that it submerges the cached memory.
|
||||
The bigger the batch size, the fewer we need to make allocations. We found that 1MB is somewhat a
|
||||
sweet spot.
|
||||
|
||||
1. When the user calls `iterate_many`, we return a `document_stream` which the user can iterate over
|
||||
to receive parsed documents.
|
||||
2. We call stage 1 on the first batch_size bytes of JSON in the buffer, detecting structural
|
||||
indexes for all documents in that batch.
|
||||
3. The `document_stream` owns a `document` instance that keeps track of the current document position
|
||||
in the stream using a JSON iterator. To obtain a valid document, the `document_stream` returns a
|
||||
**reference** to its document instance.
|
||||
4. Each time the user calls `++` to read the next document, the JSON iterator moves to the start of the
next document.
|
||||
5. When we reach the end of the batch, we call stage 1 on the next batch, starting from the end of
|
||||
the last document, and go to step 3.
|
||||
|
||||
### Threads
|
||||
|
||||
But how can we make use of threads if they are available? We found a pretty cool algorithm that allows
|
||||
us to quickly identify the position of the last JSON document in a given batch. Knowing exactly where
|
||||
the end of the last document in the batch is, we can safely parse through the last document without any
|
||||
worries that it might be incomplete. Therefore, we can run stage 1 on the next batch concurrently while
|
||||
parsing the documents in the current batch. Running stage 1 in a different thread can, in best cases,
|
||||
remove almost entirely its cost and replaces it by the overhead of a thread, which is orders of magnitude
|
||||
cheaper. Ain't that awesome!
|
||||
|
||||
Thread support is only active if thread support is detected, in which case the macro
|
||||
SIMDJSON_THREADS_ENABLED is set. Otherwise the library runs in single-thread mode.
|
||||
|
||||
A `document_stream` instance uses at most two threads: there is a main thread and a worker thread.
|
||||
|
||||
Support
|
||||
-------
|
||||
|
||||
Since we want to offer flexibility and not restrict ourselves to a specific file
|
||||
format, we support any file that contains any amount of valid JSON document, **separated by one
|
||||
or more character that is considered whitespace** by the JSON spec. Anything that is
|
||||
not whitespace will be parsed as a JSON document and could lead to failure.
|
||||
|
||||
Whitespace Characters:
|
||||
- **Space**
|
||||
- **Linefeed**
|
||||
- **Carriage return**
|
||||
- **Horizontal tab**
|
||||
- **Nothing**
|
||||
|
||||
Some official formats **(non-exhaustive list)**:
|
||||
- [Newline-Delimited JSON (NDJSON)](http://ndjson.org/)
|
||||
- [JSON lines (JSONL)](http://jsonlines.org/)
|
||||
- [Record separator-delimited JSON (RFC 7464)](https://tools.ietf.org/html/rfc7464) <- Not supported by JsonStream!
|
||||
- [More on Wikipedia...](https://en.wikipedia.org/wiki/JSON_streaming)
|
||||
|
||||
API
|
||||
---
|
||||
|
||||
See [basics.md](basics.md#newline-delimited-json-ndjson-and-json-lines) for an overview of the API.
|
||||
|
||||
## Use cases
|
||||
|
||||
From [jsonlines.org](http://jsonlines.org/examples/):
|
||||
|
||||
- **Better than CSV**
|
||||
```json
|
||||
["Name", "Session", "Score", "Completed"]
|
||||
["Gilbert", "2013", 24, true]
|
||||
["Alexa", "2013", 29, true]
|
||||
["May", "2012B", 14, false]
|
||||
["Deloise", "2012A", 19, true]
|
||||
```
|
||||
CSV seems so easy that many programmers have written code to generate it themselves, and almost every implementation is
|
||||
different. Handling broken CSV files is a common and frustrating task. CSV has no standard encoding, no standard column
|
||||
separator and multiple character escaping standards. String is the only type supported for cell values, so some programs
|
||||
attempt to guess the correct types.
|
||||
|
||||
JSON Lines handles tabular data cleanly and without ambiguity. Cells may use the standard JSON types.
|
||||
|
||||
The biggest missing piece is an import/export filter for popular spreadsheet programs so that non-programmers can use
|
||||
this format.
|
||||
|
||||
- **Easy Nested Data**
|
||||
```json
|
||||
{"name": "Gilbert", "wins": [["straight", "7♣"], ["one pair", "10♥"]]}
|
||||
{"name": "Alexa", "wins": [["two pair", "4♠"], ["two pair", "9♠"]]}
|
||||
{"name": "May", "wins": []}
|
||||
{"name": "Deloise", "wins": [["three of a kind", "5♣"]]}
|
||||
```
|
||||
JSON Lines' biggest strength is in handling lots of similar nested data structures. One .jsonl file is easier to
|
||||
work with than a directory full of XML files.
|
||||
|
||||
|
||||
Tracking your position
|
||||
-----------
|
||||
|
||||
Some users would like to know where the document they parsed is in the input array of bytes.
|
||||
It is possible to do so by accessing directly the iterator and calling its `current_index()`
|
||||
method which reports the location (in bytes) of the current document in the input stream.
|
||||
You may also call the `source()` method to get a `std::string_view` instance on the document
|
||||
and `error()` to check whether there were any errors.
|
||||
|
||||
Let us illustrate the idea with code:
|
||||
|
||||
|
||||
```C++
|
||||
auto json = R"([1,2,3] {"1":1,"2":3,"4":4} [1,2,3] )"_padded;
|
||||
simdjson::ondemand::parser parser;
|
||||
simdjson::ondemand::document_stream stream;
|
||||
auto error = parser.iterate_many(json).get(stream);
|
||||
if( error ) { /* do something */ }
|
||||
auto i = stream.begin();
|
||||
size_t count{0};
|
||||
for(; i != stream.end(); ++i) {
|
||||
auto & doc = *i;
|
||||
if(!i.error()) {
|
||||
std::cout << "got full document at " << i.current_index() << std::endl;
|
||||
std::cout << i.source() << std::endl;
|
||||
count++;
|
||||
} else {
|
||||
std::cout << "got broken document at " << i.current_index() << std::endl;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
```
|
||||
|
||||
This code will print:
|
||||
```
|
||||
got full document at 0
|
||||
[1,2,3]
|
||||
got full document at 9
|
||||
{"1":1,"2":3,"4":4}
|
||||
got full document at 29
|
||||
[1,2,3]
|
||||
```
|
||||
|
||||
|
||||
Incomplete streams
|
||||
-----------
|
||||
|
||||
Some users may need to work with truncated streams. The simdjson library may truncate documents at the very end of the stream that cannot possibly be valid JSON (e.g., they contain unclosed strings, unmatched brackets, unmatched braces). After iterating through the stream, you may query the `truncated_bytes()` method which tells you how many bytes were truncated. If the stream is made of full (whole) documents, then you should expect `truncated_bytes()` to return zero.
|
||||
|
||||
|
||||
Consider the following example where a truncated document (`{"key":"intentionally unclosed string `) containing 39 bytes has been left within the stream. In such cases, the first two whole documents are parsed and returned, and the `truncated_bytes()` method returns 39.
|
||||
|
||||
```C++
|
||||
auto json = R"([1,2,3] {"1":1,"2":3,"4":4} {"key":"intentionally unclosed string )"_padded;
|
||||
simdjson::ondemand::parser parser;
|
||||
simdjson::ondemand::document_stream stream;
|
||||
auto error = parser.iterate_many(json,json.size()).get(stream);
|
||||
if(error) { std::cerr << error << std::endl; return; }
|
||||
for(auto i = stream.begin(); i != stream.end(); ++i) {
|
||||
std::cout << i.source() << std::endl;
|
||||
}
|
||||
std::cout << stream.truncated_bytes() << " bytes "<< std::endl; // returns 39 bytes
|
||||
```
|
||||
|
||||
This will print:
|
||||
```
|
||||
[1,2,3]
|
||||
{"1":1,"2":3,"4":4}
|
||||
39 bytes
|
||||
```
|
||||
|
||||
Importantly, you should only call `truncated_bytes()` after iterating through all of the documents since the stream cannot tell whether there are truncated documents at the very end when it may not have accessed that part of the data yet.
|
|
@ -180,6 +180,7 @@ Let us illustrate the idea with code:
|
|||
auto error = parser.parse_many(json).get(stream);
|
||||
if( error ) { /* do something */ }
|
||||
auto i = stream.begin();
|
||||
size_t count{0};
|
||||
for(; i != stream.end(); ++i) {
|
||||
auto doc = *i;
|
||||
if(!doc.error()) {
|
||||
|
|
|
@ -335,7 +335,7 @@ public:
|
|||
* - other json errors if parsing fails. You should not rely on these errors to always be the same for the
|
||||
* same document: they may vary under runtime dispatch (so they may vary depending on your system and hardware).
|
||||
*/
|
||||
inline simdjson_result<document_stream> load_many(const std::string &path, size_t batch_size = DEFAULT_BATCH_SIZE) noexcept;
|
||||
inline simdjson_result<document_stream> load_many(const std::string &path, size_t batch_size = dom::DEFAULT_BATCH_SIZE) noexcept;
|
||||
|
||||
/**
|
||||
* Parse a buffer containing many JSON documents.
|
||||
|
@ -429,18 +429,18 @@ public:
|
|||
* - other json errors if parsing fails. You should not rely on these errors to always be the same for the
|
||||
* same document: they may vary under runtime dispatch (so they may vary depending on your system and hardware).
|
||||
*/
|
||||
inline simdjson_result<document_stream> parse_many(const uint8_t *buf, size_t len, size_t batch_size = DEFAULT_BATCH_SIZE) noexcept;
|
||||
inline simdjson_result<document_stream> parse_many(const uint8_t *buf, size_t len, size_t batch_size = dom::DEFAULT_BATCH_SIZE) noexcept;
|
||||
/** @overload parse_many(const uint8_t *buf, size_t len, size_t batch_size) */
|
||||
inline simdjson_result<document_stream> parse_many(const char *buf, size_t len, size_t batch_size = DEFAULT_BATCH_SIZE) noexcept;
|
||||
inline simdjson_result<document_stream> parse_many(const char *buf, size_t len, size_t batch_size = dom::DEFAULT_BATCH_SIZE) noexcept;
|
||||
/** @overload parse_many(const uint8_t *buf, size_t len, size_t batch_size) */
|
||||
inline simdjson_result<document_stream> parse_many(const std::string &s, size_t batch_size = DEFAULT_BATCH_SIZE) noexcept;
|
||||
inline simdjson_result<document_stream> parse_many(const std::string &s, size_t batch_size = dom::DEFAULT_BATCH_SIZE) noexcept;
|
||||
inline simdjson_result<document_stream> parse_many(const std::string &&s, size_t batch_size) = delete;// unsafe
|
||||
/** @overload parse_many(const uint8_t *buf, size_t len, size_t batch_size) */
|
||||
inline simdjson_result<document_stream> parse_many(const padded_string &s, size_t batch_size = DEFAULT_BATCH_SIZE) noexcept;
|
||||
inline simdjson_result<document_stream> parse_many(const padded_string &s, size_t batch_size = dom::DEFAULT_BATCH_SIZE) noexcept;
|
||||
inline simdjson_result<document_stream> parse_many(const padded_string &&s, size_t batch_size) = delete;// unsafe
|
||||
|
||||
/** @private We do not want to allow implicit conversion from C string to std::string. */
|
||||
simdjson_result<document_stream> parse_many(const char *buf, size_t batch_size = DEFAULT_BATCH_SIZE) noexcept = delete;
|
||||
simdjson_result<document_stream> parse_many(const char *buf, size_t batch_size = dom::DEFAULT_BATCH_SIZE) noexcept = delete;
|
||||
|
||||
/**
|
||||
* Ensure this parser has enough memory to process JSON documents up to `capacity` bytes in length
|
||||
|
|
|
@ -12,4 +12,5 @@
|
|||
#include "simdjson/generic/ondemand/field-inl.h"
|
||||
#include "simdjson/generic/ondemand/object-inl.h"
|
||||
#include "simdjson/generic/ondemand/parser-inl.h"
|
||||
#include "simdjson/generic/ondemand/document_stream-inl.h"
|
||||
#include "simdjson/generic/ondemand/serialization-inl.h"
|
||||
|
|
|
@ -29,4 +29,5 @@ using depth_t = int32_t;
|
|||
#include "simdjson/generic/ondemand/field.h"
|
||||
#include "simdjson/generic/ondemand/object.h"
|
||||
#include "simdjson/generic/ondemand/parser.h"
|
||||
#include "simdjson/generic/ondemand/document_stream.h"
|
||||
#include "simdjson/generic/ondemand/serialization.h"
|
||||
|
|
|
@ -10,6 +10,7 @@ class object;
|
|||
class value;
|
||||
class raw_json_string;
|
||||
class array_iterator;
|
||||
class document_stream;
|
||||
|
||||
/**
|
||||
* A JSON document iteration.
|
||||
|
@ -384,6 +385,7 @@ protected:
|
|||
friend class array;
|
||||
friend class field;
|
||||
friend class token;
|
||||
friend class document_stream;
|
||||
};
|
||||
|
||||
} // namespace ondemand
|
||||
|
|
|
@ -0,0 +1,405 @@
|
|||
#include <algorithm>
|
||||
#include <limits>
|
||||
#include <stdexcept>
|
||||
namespace simdjson {
|
||||
namespace SIMDJSON_IMPLEMENTATION {
|
||||
namespace ondemand {
|
||||
|
||||
#ifdef SIMDJSON_THREADS_ENABLED
|
||||
|
||||
// Block until the pending stage 1 job has completed (or the worker is being
// torn down).
inline void stage1_worker::finish() {
  // After calling "run" someone would call finish() to wait
  // for the end of the processing.
  // This function will wait until either the thread has done
  // the processing or, else, the destructor has been called.
  std::unique_lock<std::mutex> lock(locking_mutex);
  // 'has_work' is flipped back to false by the worker thread once its stage 1
  // pass completes (or by stop_thread()), so this wait is a rendezvous point
  // with the background thread.
  cond_var.wait(lock, [this]{return has_work == false;});
}
|
||||
|
||||
// Stop and join the background thread before the worker's members are destroyed.
inline stage1_worker::~stage1_worker() {
  // The thread may never outlive the stage1_worker instance
  // and will always be stopped/joined before the stage1_worker
  // instance is gone.
  stop_thread();
}
|
||||
|
||||
// Lazily spawn the background stage 1 thread. Idempotent: if the thread
// already exists this is a no-op, so at most one thread is ever created.
inline void stage1_worker::start_thread() {
  std::unique_lock<std::mutex> lock(locking_mutex);
  if(thread.joinable()) {
    return; // This should never happen but we never want to create more than one thread.
  }
  thread = std::thread([this]{
      // Worker loop: sleep until there is a stage 1 job (posted by run()) or
      // until shutdown is requested (stop_thread()).
      while(true) {
        std::unique_lock<std::mutex> thread_lock(locking_mutex);
        // We wait for either "run" or "stop_thread" to be called.
        cond_var.wait(thread_lock, [this]{return has_work || !can_work;});
        // If, for some reason, the stop_thread() method was called (i.e., the
        // destructor of stage1_worker is called, then we want to immediately destroy
        // the thread (and not do any more processing).
        if(!can_work) {
          break;
        }
        // Run stage 1 on the batch that run() staged, reporting the error
        // back into the owning document_stream.
        this->owner->stage1_thread_error = this->owner->run_stage1(*this->stage1_thread_parser,
              this->_next_batch_start);
        this->has_work = false;
        // The condition variable call should be moved after thread_lock.unlock() for performance
        // reasons but thread sanitizers may report it as a data race if we do.
        // See https://stackoverflow.com/questions/35775501/c-should-condition-variable-be-notified-under-lock
        cond_var.notify_one(); // will notify "finish"
        thread_lock.unlock();
      }
    }
  );
}
|
||||
|
||||
|
||||
// Ask the worker thread to exit its loop and join it. Only safe when no
// result is being waited on (see the comment in the class declaration).
inline void stage1_worker::stop_thread() {
  std::unique_lock<std::mutex> lock(locking_mutex);
  // We have to make sure that all locks can be released.
  can_work = false;
  has_work = false;
  cond_var.notify_all();
  lock.unlock();
  if(thread.joinable()) {
    thread.join();
  }
}
|
||||
|
||||
// Hand a stage 1 job to the worker thread: parse the window starting at
// 'next_batch_start' with 'stage1', reporting the result into 'ds'.
// start_thread() must have been called beforehand; pair each run() with finish().
inline void stage1_worker::run(document_stream * ds, parser * stage1, size_t next_batch_start) {
  std::unique_lock<std::mutex> lock(locking_mutex);
  owner = ds;
  _next_batch_start = next_batch_start;
  stage1_thread_parser = stage1;
  has_work = true;
  // The condition variable call should be moved after thread_lock.unlock() for performance
  // reasons but thread sanitizers may report it as a data race if we do.
  // See https://stackoverflow.com/questions/35775501/c-should-condition-variable-be-notified-under-lock
  cond_var.notify_one(); // will notify the thread lock that we have work
  lock.unlock();
}
|
||||
|
||||
#endif // SIMDJSON_THREADS_ENABLED
|
||||
|
||||
// Construct a document_stream over the input [_buf, _buf + _len). No
// allocation or parsing happens here; work begins when begin() is called.
// The batch size is clamped up to MINIMAL_BATCH_SIZE so stage 1 always has
// a workable window.
simdjson_really_inline document_stream::document_stream(
  ondemand::parser &_parser,
  const uint8_t *_buf,
  size_t _len,
  size_t _batch_size
) noexcept
  : parser{&_parser},
    buf{_buf},
    len{_len},
    batch_size{_batch_size <= MINIMAL_BATCH_SIZE ? MINIMAL_BATCH_SIZE : _batch_size},
    error{SUCCESS}
    #ifdef SIMDJSON_THREADS_ENABLED
    , use_thread(_parser.threaded) // we need to make a copy because _parser.threaded can change
    #endif
{
#ifdef SIMDJSON_THREADS_ENABLED
  if(worker.get() == nullptr) {
    // A null worker means its heap allocation failed; record it so that
    // begin()/start() surface the error instead of dereferencing null.
    error = MEMALLOC;
  }
#endif
}
|
||||
|
||||
// Construct an uninitialized document_stream; any use before a real stream is
// move-assigned into it reports UNINITIALIZED.
simdjson_really_inline document_stream::document_stream() noexcept
  : parser{nullptr},
    buf{nullptr},
    len{0},
    batch_size{0},
    error{UNINITIALIZED}
    #ifdef SIMDJSON_THREADS_ENABLED
    , use_thread(false)
    #endif
{
}
|
||||
|
||||
// Destroy the stream. The worker is reset explicitly so the stage 1 thread is
// stopped and joined before any other member it may point into is destroyed.
simdjson_really_inline document_stream::~document_stream() noexcept
{
#ifdef SIMDJSON_THREADS_ENABLED
  worker.reset();
#endif
}
|
||||
|
||||
inline size_t document_stream::size_in_bytes() const noexcept {
|
||||
return len;
|
||||
}
|
||||
|
||||
// Number of bytes at the end of the input that could not be parsed (e.g. a
// truncated final document). Only meaningful after the stream has been fully
// iterated.
//
// NOTE(review): this relies on a stage 1 streaming convention where
// structural_indexes[n_structural_indexes] and
// structural_indexes[n_structural_indexes + 1] delimit the unconsumed tail of
// the last window — confirm against the stage 1 implementation if it changes.
inline size_t document_stream::truncated_bytes() const noexcept {
  return parser->implementation->structural_indexes[parser->implementation->n_structural_indexes] - parser->implementation->structural_indexes[parser->implementation->n_structural_indexes + 1];
}
|
||||
|
||||
/** Default-constructed iterator: attached to no stream and already finished,
 *  so it compares equal to any end() iterator. */
simdjson_really_inline document_stream::iterator::iterator() noexcept
  : stream{nullptr}, finished{true} {}
|
||||
|
||||
/** Attach an iterator to a stream; 'is_end' marks the one-past-the-end sentinel. */
simdjson_really_inline document_stream::iterator::iterator(document_stream* _stream, bool is_end) noexcept
  : stream{_stream}, finished{is_end} {}
|
||||
|
||||
/** Access the stream's current document. Parse errors are reported via error(). */
simdjson_really_inline ondemand::document& document_stream::iterator::operator*() noexcept {
  ondemand::document& current = stream->doc;
  return current;
}
|
||||
|
||||
// Advance to the next document in the stream (prefix increment).
simdjson_really_inline document_stream::iterator& document_stream::iterator::operator++() noexcept {
  // If there is an error, then we want the iterator
  // to be finished, no matter what. (E.g., we do not
  // keep generating documents with errors, or go beyond
  // a document with errors.)
  //
  // Users do not have to call "operator*()" when they use operator++,
  // so we need to end the stream in the operator++ function.
  //
  // Note that setting finished = true is essential otherwise
  // we would enter an infinite loop.
  if (stream->error) { finished = true; }
  // Note that stream->next() is guarded against error conditions
  // (it will immediately return if stream->error casts to false).
  // In effect, this next function does nothing when (stream->error)
  // is true (hence the risk of an infinite loop).
  stream->next();
  // If that was the last document, we're finished.
  // EMPTY is the only type of error we do not want to appear
  // in operator*.
  if (stream->error == EMPTY) { finished = true; }
  // If we had any other kind of error (not EMPTY) then we want
  // to pass it along to the operator* and we cannot mark the result
  // as "finished" just yet.
  return *this;
}
|
||||
|
||||
/** Two iterators compare unequal exactly when one is finished and the other is not. */
simdjson_really_inline bool document_stream::iterator::operator!=(const document_stream::iterator &other) const noexcept {
  return !(finished == other.finished);
}
|
||||
|
||||
/**
 * Run the first stage 1 pass and return an iterator on the first document.
 * If the stream holds no documents at all, the returned iterator already
 * compares equal to end().
 */
simdjson_really_inline document_stream::iterator document_stream::begin() noexcept {
  start();
  const bool no_documents = (error == EMPTY);
  return iterator(this, no_documents);
}
|
||||
|
||||
/** One-past-the-end sentinel, for comparison only; never dereference it. */
simdjson_really_inline document_stream::iterator document_stream::end() noexcept {
  return iterator{this, true};
}
|
||||
|
||||
// Allocate the parser, run stage 1 on the first non-empty window and anchor
// the document's json_iterator on it. Called by begin(); does nothing if the
// stream is already in an error state.
inline void document_stream::start() noexcept {
  if (error) { return; }
  error = parser->allocate(batch_size);
  if (error) { return; }
  // Always run the first stage 1 parse immediately
  batch_start = 0;
  error = run_stage1(*parser, batch_start);
  while(error == EMPTY) {
    // In exceptional cases, we may start with an empty block
    batch_start = next_batch_start();
    if (batch_start >= len) { return; }
    error = run_stage1(*parser, batch_start);
  }
  if (error) { return; }
  doc_index = batch_start;
  doc = document(json_iterator(&buf[batch_start], parser));
  // Tell the iterator it is working on a window of a larger stream so it does
  // not treat the window's end as the end of the input.
  doc.iter._streaming = true;

#ifdef SIMDJSON_THREADS_ENABLED
  if (use_thread && next_batch_start() < len) {
    // Kick off the first thread on next batch if needed
    error = stage1_thread_parser.allocate(batch_size);
    if (error) { return; }
    worker->start_thread();
    start_stage1_thread();
    if (error) { return; }
  }
#endif // SIMDJSON_THREADS_ENABLED
}
|
||||
|
||||
// Advance to the next document. When the current window is exhausted, run
// stage 1 on the next window (or pick up the one prepared by the worker
// thread) and re-anchor the json_iterator on it. Sets error = EMPTY once the
// whole input has been consumed.
inline void document_stream::next() noexcept {
  // Errors are sticky: once the stream is in an error condition we return
  // immediately and never advance further.
  if (error) { return; }
  next_document();
  if (error) { return; }
  auto cur_struct_index = doc.iter._root - parser->implementation->structural_indexes.get();
  doc_index = batch_start + parser->implementation->structural_indexes[cur_struct_index];

  // Check if at end of structural indexes (i.e. at end of batch)
  if(cur_struct_index >= static_cast<int64_t>(parser->implementation->n_structural_indexes)) {
    error = EMPTY;
    // Load another batch (if available)
    while (error == EMPTY) {
      batch_start = next_batch_start();
      if (batch_start >= len) { break; }
#ifdef SIMDJSON_THREADS_ENABLED
      if(use_thread) {
        load_from_stage1_thread();
      } else {
        error = run_stage1(*parser, batch_start);
      }
#else
      error = run_stage1(*parser, batch_start);
#endif
      /**
       * Whenever we move to another window, we need to update all pointers to make
       * it appear as if the input buffer started at the beginning of the window.
       *
       * Take this input:
       *
       * {"z":5} {"1":1,"2":2,"4":4} [7, 10, 9] [15, 11, 12, 13] [154, 110, 112, 1311]
       *
       * Say you process the following window...
       *
       * '{"z":5} {"1":1,"2":2,"4":4} [7, 10, 9]'
       *
       * When you do so, the json_iterator has a pointer at the beginning of the memory region
       * (pointing at the beginning of '{"z"...'.
       *
       * When you move to the window that starts at...
       *
       * '[7, 10, 9] [15, 11, 12, 13] ...
       *
       * then it is not sufficient to just run stage 1. You also need to re-anchor the
       * json_iterator so that it believes we are starting at '[7, 10, 9]...'.
       *
       * Under the DOM front-end, this gets done automatically because the parser owns
       * the pointer to the data, and when you call stage1 and then stage2 on the same
       * parser, then stage2 will run on the pointer acquired by stage1.
       *
       * That is, stage1 calls "this->buf = _buf" so the parser remembers the buffer that
       * we used. But json_iterator has no callback when stage1 is called on the parser.
       * In fact, I think that the parser is unaware of json_iterator.
       *
       *
       * So we need to re-anchor the json_iterator after each call to stage 1 so that
       * all of the pointers are in sync.
       */
      doc.iter = json_iterator(&buf[batch_start], parser);
      doc.iter._streaming = true;
      /**
       * End of resync.
       */

      if (error) { continue; } // If the error was EMPTY, we may want to load another batch.
      doc_index = batch_start;
    }
  }
}
|
||||
|
||||
// Skip the remainder of the current document and position the json_iterator
// at the root of the next document within the current window.
inline void document_stream::next_document() noexcept {
  // Go to next place where depth=0 (document depth)
  error = doc.iter.skip_child(0);
  if (error) { return; }
  // Always set depth=1 at the start of document
  doc.iter._depth = 1;
  // Resets the string buffer at the beginning, thus invalidating the strings
  // parsed from the previous document.
  doc.iter._string_buf_loc = parser->string_buf.get();
  doc.iter._root = doc.iter.position();
}
|
||||
|
||||
inline size_t document_stream::next_batch_start() const noexcept {
|
||||
return batch_start + parser->implementation->structural_indexes[parser->implementation->n_structural_indexes];
|
||||
}
|
||||
|
||||
/**
 * Run stage 1 over the window beginning at _batch_start using parser p.
 * This only refreshes the parser's structural indexes; it does not update any
 * json_iterator instance (the caller must re-anchor it).
 */
inline error_code document_stream::run_stage1(ondemand::parser &p, size_t _batch_start) noexcept {
  const size_t remaining = len - _batch_start;
  // The last (possibly short) window tells stage 1 that no more input follows;
  // every earlier window is processed in partial/streaming mode.
  const bool is_last_window = (remaining <= batch_size);
  const size_t window_length = is_last_window ? remaining : batch_size;
  const stage1_mode mode = is_last_window ? stage1_mode::streaming_final
                                          : stage1_mode::streaming_partial;
  return p.implementation->stage1(&buf[_batch_start], window_length, mode);
}
|
||||
|
||||
/** Byte offset of the current document within the input buffer. */
simdjson_really_inline size_t document_stream::iterator::current_index() const noexcept {
  return stream->doc_index;
}
|
||||
|
||||
/**
 * View of the raw (unparsed) bytes of the current document.
 *
 * Walks the structural indexes from the document's root, tracking the brace /
 * bracket depth, until the document closes; scalar documents span up to the
 * start of the next document instead.
 *
 * Fixes a stray double semicolon on the final return statement.
 */
simdjson_really_inline std::string_view document_stream::iterator::source() const noexcept {
  auto depth = stream->doc.iter.depth();
  auto cur_struct_index = stream->doc.iter._root - stream->parser->implementation->structural_indexes.get();

  // If at root, process the first token to determine if scalar value
  if (stream->doc.iter.at_root()) {
    switch (stream->buf[stream->batch_start + stream->parser->implementation->structural_indexes[cur_struct_index]]) {
      case '{': case '[':   // Depth=1 already at start of document
        break;
      case '}': case ']':
        depth--;
        break;
      default:    // Scalar value document
        // TODO: Remove any trailing whitespaces
        // This returns a string spanning from start of value to the beginning of the next document (excluded)
        // NOTE(review): unlike the structural return below, this does not add
        // stream->batch_start to the length — confirm against multi-batch scalar documents.
        return std::string_view(reinterpret_cast<const char*>(stream->buf) + current_index(), stream->parser->implementation->structural_indexes[++cur_struct_index] - current_index() - 1);
    }
    cur_struct_index++;
  }

  // Scan forward until the depth returns to zero (document closed) or we run
  // out of structural indexes for this window.
  while (cur_struct_index <= static_cast<int64_t>(stream->parser->implementation->n_structural_indexes)) {
    switch (stream->buf[stream->batch_start + stream->parser->implementation->structural_indexes[cur_struct_index]]) {
      case '{': case '[':
        depth++;
        break;
      case '}': case ']':
        depth--;
        break;
    }
    if (depth == 0) { break; }
    cur_struct_index++;
  }

  return std::string_view(reinterpret_cast<const char*>(stream->buf) + current_index(), stream->parser->implementation->structural_indexes[cur_struct_index] - current_index() + stream->batch_start + 1);
}
|
||||
|
||||
/** Current error state of the underlying stream (SUCCESS when the current
 *  document parsed cleanly so far). */
inline error_code document_stream::iterator::error() const noexcept {
  return stream->error;
}
|
||||
|
||||
#ifdef SIMDJSON_THREADS_ENABLED
|
||||
|
||||
// Wait for the worker to finish stage 1 on the next window, swap its parser
// (holding the fresh structural indexes) with ours, and, if more input
// remains, immediately queue the following window on the worker.
inline void document_stream::load_from_stage1_thread() noexcept {
  worker->finish();
  // Swap to the parser that was loaded up in the thread. Make sure the parser has
  // enough memory to swap to, as well.
  std::swap(stage1_thread_parser,*parser);
  error = stage1_thread_error;
  if (error) { return; }

  // If there's anything left, start the stage 1 thread!
  if (next_batch_start() < len) {
    start_stage1_thread();
  }
}
|
||||
|
||||
// Queue a stage 1 job for the next window on the worker thread. The result is
// collected later by load_from_stage1_thread().
inline void document_stream::start_stage1_thread() noexcept {
  // we call the thread on a lambda that will update
  // this->stage1_thread_error
  // there is only one thread that may write to this value
  // TODO this is NOT exception-safe.
  this->stage1_thread_error = UNINITIALIZED; // In case something goes wrong, make sure it's an error
  size_t _next_batch_start = this->next_batch_start();

  worker->run(this, & this->stage1_thread_parser, _next_batch_start);
}
|
||||
|
||||
#endif // SIMDJSON_THREADS_ENABLED
|
||||
|
||||
} // namespace ondemand
|
||||
} // namespace SIMDJSON_IMPLEMENTATION
|
||||
} // namespace simdjson
|
||||
|
||||
namespace simdjson {
|
||||
|
||||
// Wrap an error code in a simdjson_result<document_stream> (no stream value).
simdjson_really_inline simdjson_result<SIMDJSON_IMPLEMENTATION::ondemand::document_stream>::simdjson_result(
  error_code error
) noexcept :
  implementation_simdjson_result_base<SIMDJSON_IMPLEMENTATION::ondemand::document_stream>(error)
{
}
|
||||
// Wrap a successfully-created document_stream (moved in) in a simdjson_result.
simdjson_really_inline simdjson_result<SIMDJSON_IMPLEMENTATION::ondemand::document_stream>::simdjson_result(
  SIMDJSON_IMPLEMENTATION::ondemand::document_stream &&value
) noexcept :
  implementation_simdjson_result_base<SIMDJSON_IMPLEMENTATION::ondemand::document_stream>(
    std::forward<SIMDJSON_IMPLEMENTATION::ondemand::document_stream>(value)
  )
{
}
|
||||
|
||||
}
|
|
@ -0,0 +1,330 @@
|
|||
#include "simdjson/error.h"
|
||||
#ifdef SIMDJSON_THREADS_ENABLED
|
||||
#include <thread>
|
||||
#include <mutex>
|
||||
#include <condition_variable>
|
||||
#endif
|
||||
|
||||
namespace simdjson {
|
||||
namespace SIMDJSON_IMPLEMENTATION {
|
||||
namespace ondemand {
|
||||
|
||||
class parser;
|
||||
class json_iterator;
|
||||
class document;
|
||||
|
||||
#ifdef SIMDJSON_THREADS_ENABLED
|
||||
/** @private Custom worker class **/
|
||||
struct stage1_worker {
|
||||
stage1_worker() noexcept = default;
|
||||
stage1_worker(const stage1_worker&) = delete;
|
||||
stage1_worker(stage1_worker&&) = delete;
|
||||
stage1_worker operator=(const stage1_worker&) = delete;
|
||||
~stage1_worker();
|
||||
/**
|
||||
* We only start the thread when it is needed, not at object construction, this may throw.
|
||||
* You should only call this once.
|
||||
**/
|
||||
void start_thread();
|
||||
/**
|
||||
* Start a stage 1 job. You should first call 'run', then 'finish'.
|
||||
* You must call start_thread once before.
|
||||
*/
|
||||
void run(document_stream * ds, parser * stage1, size_t next_batch_start);
|
||||
/** Wait for the run to finish (blocking). You should first call 'run', then 'finish'. **/
|
||||
void finish();
|
||||
|
||||
private:
|
||||
|
||||
/**
|
||||
* Normally, we would never stop the thread. But we do in the destructor.
|
||||
* This function is only safe assuming that you are not waiting for results. You
|
||||
* should have called run, then finish, and be done.
|
||||
**/
|
||||
void stop_thread();
|
||||
|
||||
std::thread thread{};
|
||||
/** These three variables define the work done by the thread. **/
|
||||
ondemand::parser * stage1_thread_parser{};
|
||||
size_t _next_batch_start{};
|
||||
document_stream * owner{};
|
||||
/**
|
||||
* We have two state variables. This could be streamlined to one variable in the future but
|
||||
* we use two for clarity.
|
||||
*/
|
||||
bool has_work{false};
|
||||
bool can_work{true};
|
||||
|
||||
/**
|
||||
* We lock using a mutex.
|
||||
*/
|
||||
std::mutex locking_mutex{};
|
||||
std::condition_variable cond_var{};
|
||||
|
||||
friend class document_stream;
|
||||
};
|
||||
#endif // SIMDJSON_THREADS_ENABLED
|
||||
|
||||
/**
|
||||
* A forward-only stream of documents.
|
||||
*
|
||||
* Produced by parser::iterate_many.
|
||||
*
|
||||
*/
|
||||
class document_stream {
|
||||
public:
|
||||
/**
|
||||
* Construct an uninitialized document_stream.
|
||||
*
|
||||
* ```c++
|
||||
* document_stream docs;
|
||||
* auto error = parser.iterate_many(json).get(docs);
|
||||
* ```
|
||||
*/
|
||||
simdjson_really_inline document_stream() noexcept;
|
||||
/** Move one document_stream to another. */
|
||||
simdjson_really_inline document_stream(document_stream &&other) noexcept = default;
|
||||
/** Move one document_stream to another. */
|
||||
simdjson_really_inline document_stream &operator=(document_stream &&other) noexcept = default;
|
||||
|
||||
simdjson_really_inline ~document_stream() noexcept;
|
||||
|
||||
/**
|
||||
* Returns the input size in bytes.
|
||||
*/
|
||||
inline size_t size_in_bytes() const noexcept;
|
||||
|
||||
/**
|
||||
* After iterating through the stream, this method
|
||||
* returns the number of bytes that were not parsed at the end
|
||||
* of the stream. If truncated_bytes() differs from zero,
|
||||
* then the input was truncated maybe because incomplete JSON
|
||||
* documents were found at the end of the stream. You
|
||||
* may need to process the bytes in the interval [size_in_bytes()-truncated_bytes(), size_in_bytes()).
|
||||
*
|
||||
* You should only call truncated_bytes() after streaming through all
|
||||
* documents, like so:
|
||||
*
|
||||
* document_stream stream = parser.iterate_many(json,window);
|
||||
* for(auto & doc : stream) {
|
||||
* // do something with doc
|
||||
* }
|
||||
* size_t truncated = stream.truncated_bytes();
|
||||
*
|
||||
*/
|
||||
inline size_t truncated_bytes() const noexcept;
|
||||
|
||||
/**
 * An input iterator over the documents of a document_stream. Obtained from
 * document_stream::begin()/end(); forward-only and single-pass.
 */
class iterator {
public:
  using value_type = simdjson_result<document>;
  using reference  = value_type;

  using difference_type   = std::ptrdiff_t;

  using iterator_category = std::input_iterator_tag;

  /**
   * Default constructor.
   */
  simdjson_really_inline iterator() noexcept;
  /**
   * Get the current document (or error).
   */
  simdjson_really_inline ondemand::document& operator*() noexcept;
  /**
   * Advance to the next document (prefix).
   */
  inline iterator& operator++() noexcept;
  /**
   * Check if we're at the end yet.
   * @param other the end iterator to compare to.
   */
  simdjson_really_inline bool operator!=(const iterator &other) const noexcept;
  /**
   * @private
   *
   * Gives the current index in the input document in bytes.
   *
   *   document_stream stream = parser.parse_many(json,window);
   *   for(auto i = stream.begin(); i != stream.end(); ++i) {
   *     auto doc = *i;
   *     size_t index = i.current_index();
   *   }
   *
   * This function (current_index()) is experimental and the usage
   * may change in future versions of simdjson: we find the API somewhat
   * awkward and we would like to offer something friendlier.
   */
  simdjson_really_inline size_t current_index() const noexcept;

  /**
   * @private
   *
   * Gives a view of the current document at the current position.
   *
   *   document_stream stream = parser.iterate_many(json,window);
   *   for(auto i = stream.begin(); i != stream.end(); ++i) {
   *     std::string_view v = i.source();
   *   }
   *
   * The returned string_view instance is simply a map to the (unparsed)
   * source string: it may thus include white-space characters and all manner
   * of padding.
   *
   * This function (source()) is experimental and the usage
   * may change in future versions of simdjson: we find the API somewhat
   * awkward and we would like to offer something friendlier.
   *
   */
  simdjson_really_inline std::string_view source() const noexcept;

  /**
   * Returns error of the stream (if any).
   */
  inline error_code error() const noexcept;

private:
  // Only document_stream (via begin()/end()) may construct bound iterators.
  simdjson_really_inline iterator(document_stream *s, bool finished) noexcept;
  /** The document_stream we're iterating through. */
  document_stream* stream;
  /** Whether we're finished or not. */
  bool finished;

  friend class document;
  friend class document_stream;
  friend class json_iterator;
};
|
||||
|
||||
/**
|
||||
* Start iterating the documents in the stream.
|
||||
*/
|
||||
simdjson_really_inline iterator begin() noexcept;
|
||||
/**
|
||||
* The end of the stream, for iterator comparison purposes.
|
||||
*/
|
||||
simdjson_really_inline iterator end() noexcept;
|
||||
|
||||
private:
|
||||
|
||||
document_stream &operator=(const document_stream &) = delete; // Disallow copying
|
||||
document_stream(const document_stream &other) = delete; // Disallow copying
|
||||
|
||||
/**
|
||||
* Construct a document_stream. Does not allocate or parse anything until the iterator is
|
||||
* used.
|
||||
*
|
||||
* @param parser is a reference to the parser instance used to generate this document_stream
|
||||
* @param buf is the raw byte buffer we need to process
|
||||
* @param len is the length of the raw byte buffer in bytes
|
||||
* @param batch_size is the size of the windows (must be strictly greater or equal to the largest JSON document)
|
||||
*/
|
||||
simdjson_really_inline document_stream(
|
||||
ondemand::parser &parser,
|
||||
const uint8_t *buf,
|
||||
size_t len,
|
||||
size_t batch_size
|
||||
) noexcept;
|
||||
|
||||
/**
|
||||
* Parse the first document in the buffer. Used by begin(), to handle allocation and
|
||||
* initialization.
|
||||
*/
|
||||
inline void start() noexcept;
|
||||
|
||||
/**
|
||||
* Parse the next document found in the buffer previously given to document_stream.
|
||||
*
|
||||
* The content should be a valid JSON document encoded as UTF-8. If there is a
|
||||
* UTF-8 BOM, the caller is responsible for omitting it, UTF-8 BOM are
|
||||
* discouraged.
|
||||
*
|
||||
* You do NOT need to pre-allocate a parser. This function takes care of
|
||||
* pre-allocating a capacity defined by the batch_size defined when creating the
|
||||
* document_stream object.
|
||||
*
|
||||
* The function returns simdjson::EMPTY if there is no more data to be parsed.
|
||||
*
|
||||
* The function returns simdjson::SUCCESS (as integer = 0) in case of success
|
||||
* and indicates that the buffer has successfully been parsed to the end.
|
||||
* Every document it contained has been parsed without error.
|
||||
*
|
||||
* The function returns an error code from simdjson/simdjson.h in case of failure
|
||||
* such as simdjson::CAPACITY, simdjson::MEMALLOC, simdjson::DEPTH_ERROR and so forth;
|
||||
* the simdjson::error_message function converts these error codes into a string).
|
||||
*
|
||||
* You can also check validity by calling parser.is_valid(). The same parser can
|
||||
* and should be reused for the other documents in the buffer.
|
||||
*/
|
||||
inline void next() noexcept;
|
||||
|
||||
/** Move the json_iterator of the document to the location of the next document in the stream. */
|
||||
inline void next_document() noexcept;
|
||||
|
||||
/** Get the next document index. */
|
||||
inline size_t next_batch_start() const noexcept;
|
||||
|
||||
/** Pass the next batch through stage 1 with the given parser. */
|
||||
inline error_code run_stage1(ondemand::parser &p, size_t batch_start) noexcept;
|
||||
|
||||
// Fields
|
||||
ondemand::parser *parser;
|
||||
const uint8_t *buf;
|
||||
size_t len;
|
||||
size_t batch_size;
|
||||
/**
|
||||
* We are going to use just one document instance. The document owns
|
||||
* the json_iterator. It implies that we only ever pass a reference
|
||||
* to the document to the users.
|
||||
*/
|
||||
document doc{};
|
||||
/** The error (or lack thereof) from the current document. */
|
||||
error_code error;
|
||||
size_t batch_start{0};
|
||||
size_t doc_index{};
|
||||
|
||||
#ifdef SIMDJSON_THREADS_ENABLED
|
||||
/** Indicates whether we use threads. Note that this needs to be a constant during the execution of the parsing. */
|
||||
bool use_thread;
|
||||
|
||||
inline void load_from_stage1_thread() noexcept;
|
||||
|
||||
/** Start a thread to run stage 1 on the next batch. */
|
||||
inline void start_stage1_thread() noexcept;
|
||||
|
||||
/** Wait for the stage 1 thread to finish and capture the results. */
|
||||
inline void finish_stage1_thread() noexcept;
|
||||
|
||||
/** The error returned from the stage 1 thread. */
|
||||
error_code stage1_thread_error{UNINITIALIZED};
|
||||
/** The thread used to run stage 1 against the next batch in the background. */
|
||||
std::unique_ptr<stage1_worker> worker{new(std::nothrow) stage1_worker()};
|
||||
/**
|
||||
* The parser used to run stage 1 in the background. Will be swapped
|
||||
* with the regular parser when finished.
|
||||
*/
|
||||
ondemand::parser stage1_thread_parser{};
|
||||
|
||||
friend struct stage1_worker;
|
||||
#endif // SIMDJSON_THREADS_ENABLED
|
||||
|
||||
friend class parser;
|
||||
friend class document;
|
||||
friend class json_iterator;
|
||||
friend struct simdjson_result<ondemand::document_stream>;
|
||||
friend struct internal::simdjson_result_base<ondemand::document_stream>;
|
||||
}; // document_stream
|
||||
|
||||
} // namespace ondemand
|
||||
} // namespace SIMDJSON_IMPLEMENTATION
|
||||
} // namespace simdjson
|
||||
|
||||
namespace simdjson {
|
||||
template<>
|
||||
struct simdjson_result<SIMDJSON_IMPLEMENTATION::ondemand::document_stream> : public SIMDJSON_IMPLEMENTATION::implementation_simdjson_result_base<SIMDJSON_IMPLEMENTATION::ondemand::document_stream> {
|
||||
public:
|
||||
simdjson_really_inline simdjson_result(SIMDJSON_IMPLEMENTATION::ondemand::document_stream &&value) noexcept; ///< @private
|
||||
simdjson_really_inline simdjson_result(error_code error) noexcept; ///< @private
|
||||
simdjson_really_inline simdjson_result() noexcept = default;
|
||||
};
|
||||
|
||||
} // namespace simdjson
|
|
@ -7,7 +7,9 @@ simdjson_really_inline json_iterator::json_iterator(json_iterator &&other) noexc
|
|||
parser{other.parser},
|
||||
_string_buf_loc{other._string_buf_loc},
|
||||
error{other.error},
|
||||
_depth{other._depth}
|
||||
_depth{other._depth},
|
||||
_root{other._root},
|
||||
_streaming{other._streaming}
|
||||
{
|
||||
other.parser = nullptr;
|
||||
}
|
||||
|
@ -17,6 +19,8 @@ simdjson_really_inline json_iterator &json_iterator::operator=(json_iterator &&o
|
|||
_string_buf_loc = other._string_buf_loc;
|
||||
error = other.error;
|
||||
_depth = other._depth;
|
||||
_root = other._root;
|
||||
_streaming = other._streaming;
|
||||
other.parser = nullptr;
|
||||
return *this;
|
||||
}
|
||||
|
@ -25,13 +29,16 @@ simdjson_really_inline json_iterator::json_iterator(const uint8_t *buf, ondemand
|
|||
: token(buf, _parser->implementation->structural_indexes.get()),
|
||||
parser{_parser},
|
||||
_string_buf_loc{parser->string_buf.get()},
|
||||
_depth{1}
|
||||
_depth{1},
|
||||
_root{parser->implementation->structural_indexes.get()},
|
||||
_streaming{false}
|
||||
|
||||
{
|
||||
logger::log_headers();
|
||||
}
|
||||
|
||||
inline void json_iterator::rewind() noexcept {
|
||||
token.index = parser->implementation->structural_indexes.get();
|
||||
token.index = _root;
|
||||
logger::log_headers(); // We start again
|
||||
_string_buf_loc = parser->string_buf.get();
|
||||
_depth = 1;
|
||||
|
@ -132,15 +139,19 @@ simdjson_really_inline bool json_iterator::at_root() const noexcept {
|
|||
return token.position() == root_checkpoint();
|
||||
}
|
||||
|
||||
simdjson_really_inline bool json_iterator::streaming() const noexcept {
|
||||
return _streaming;
|
||||
}
|
||||
|
||||
simdjson_really_inline token_position json_iterator::root_checkpoint() const noexcept {
|
||||
return parser->implementation->structural_indexes.get();
|
||||
return _root;
|
||||
}
|
||||
|
||||
simdjson_really_inline void json_iterator::assert_at_root() const noexcept {
|
||||
SIMDJSON_ASSUME( _depth == 1 );
|
||||
// Visual Studio Clang treats unique_ptr.get() as "side effecting."
|
||||
#ifndef SIMDJSON_CLANG_VISUAL_STUDIO
|
||||
SIMDJSON_ASSUME( token.index == parser->implementation->structural_indexes.get() );
|
||||
SIMDJSON_ASSUME( token.index == _root );
|
||||
#endif
|
||||
}
|
||||
|
||||
|
|
|
@ -3,6 +3,7 @@ namespace SIMDJSON_IMPLEMENTATION {
|
|||
namespace ondemand {
|
||||
|
||||
class document;
|
||||
class document_stream;
|
||||
class object;
|
||||
class array;
|
||||
class value;
|
||||
|
@ -44,6 +45,20 @@ protected:
|
|||
* - 3 = key or value inside root array/object.
|
||||
*/
|
||||
depth_t _depth{};
|
||||
/**
|
||||
* Beginning of the document indexes.
|
||||
* Normally we have root == parser->implementation->structural_indexes.get()
|
||||
* but this may differ, especially in streaming mode (where we have several
|
||||
* documents);
|
||||
*/
|
||||
token_position _root{};
|
||||
/**
|
||||
* Normally, a json_iterator operates over a single document, but in
|
||||
* some cases, we may have a stream of documents. This attribute is meant
|
||||
* as meta-data: the json_iterator works the same irrespective of the
|
||||
* value of this attribute.
|
||||
*/
|
||||
bool _streaming{false};
|
||||
|
||||
public:
|
||||
simdjson_really_inline json_iterator() noexcept = default;
|
||||
|
@ -61,6 +76,14 @@ public:
|
|||
*/
|
||||
simdjson_really_inline bool at_root() const noexcept;
|
||||
|
||||
/**
|
||||
* Tell whether we should be expected to run in streaming
|
||||
* mode (iterating over many documents). It is pure metadata
|
||||
* that does not affect how the iterator works. It is used by
|
||||
* start_root_array() and start_root_object().
|
||||
*/
|
||||
simdjson_really_inline bool streaming() const noexcept;
|
||||
|
||||
/**
|
||||
* Get the root value iterator
|
||||
*/
|
||||
|
@ -155,8 +178,8 @@ public:
|
|||
*
|
||||
* @param child_depth the expected child depth.
|
||||
*/
|
||||
simdjson_really_inline void descend_to(depth_t parent_depth) noexcept;
|
||||
simdjson_really_inline void descend_to(depth_t parent_depth, int32_t delta) noexcept;
|
||||
simdjson_really_inline void descend_to(depth_t child_depth) noexcept;
|
||||
simdjson_really_inline void descend_to(depth_t child_depth, int32_t delta) noexcept;
|
||||
|
||||
/**
|
||||
* Get current depth.
|
||||
|
@ -205,6 +228,7 @@ protected:
|
|||
simdjson_really_inline token_position last_document_position() const noexcept;
|
||||
|
||||
friend class document;
|
||||
friend class document_stream;
|
||||
friend class object;
|
||||
friend class array;
|
||||
friend class value;
|
||||
|
|
|
@ -84,6 +84,20 @@ simdjson_warn_unused simdjson_really_inline simdjson_result<json_iterator> parse
|
|||
return json_iterator(reinterpret_cast<const uint8_t *>(json.data()), this);
|
||||
}
|
||||
|
||||
inline simdjson_result<document_stream> parser::iterate_many(const uint8_t *buf, size_t len, size_t batch_size) noexcept {
|
||||
if(batch_size < MINIMAL_BATCH_SIZE) { batch_size = MINIMAL_BATCH_SIZE; }
|
||||
return document_stream(*this, buf, len, batch_size);
|
||||
}
|
||||
inline simdjson_result<document_stream> parser::iterate_many(const char *buf, size_t len, size_t batch_size) noexcept {
|
||||
return iterate_many(reinterpret_cast<const uint8_t *>(buf), len, batch_size);
|
||||
}
|
||||
inline simdjson_result<document_stream> parser::iterate_many(const std::string &s, size_t batch_size) noexcept {
|
||||
return iterate_many(s.data(), s.length(), batch_size);
|
||||
}
|
||||
inline simdjson_result<document_stream> parser::iterate_many(const padded_string &s, size_t batch_size) noexcept {
|
||||
return iterate_many(s.data(), s.length(), batch_size);
|
||||
}
|
||||
|
||||
simdjson_really_inline size_t parser::capacity() const noexcept {
|
||||
return _capacity;
|
||||
}
|
||||
|
|
|
@ -8,6 +8,23 @@ class array;
|
|||
class object;
|
||||
class value;
|
||||
class raw_json_string;
|
||||
class document_stream;
|
||||
|
||||
/**
|
||||
* The default batch size for document_stream instances for this On Demand kernel.
|
||||
* Note that different On Demand kernel may use a different DEFAULT_BATCH_SIZE value
|
||||
* in the future.
|
||||
*/
|
||||
static constexpr size_t DEFAULT_BATCH_SIZE = 1000000;
|
||||
/**
|
||||
* Some adversary might try to set the batch size to 0 or 1, which might cause problems.
|
||||
* We set a minimum of 32B since anything else is highly likely to be an error. In practice,
|
||||
* most users will want a much larger batch size.
|
||||
*
|
||||
* All non-negative MINIMAL_BATCH_SIZE values should be 'safe' except that, obviously, no JSON
|
||||
* document can ever span 0 or 1 byte and that very large values would create memory allocation issues.
|
||||
*/
|
||||
static constexpr size_t MINIMAL_BATCH_SIZE = 32;
|
||||
|
||||
/**
|
||||
* A JSON fragment iterator.
|
||||
|
@ -26,6 +43,7 @@ public:
|
|||
inline parser(parser &&other) noexcept = default;
|
||||
simdjson_really_inline parser(const parser &other) = delete;
|
||||
simdjson_really_inline parser &operator=(const parser &other) = delete;
|
||||
simdjson_really_inline parser &operator=(parser &&other) noexcept = default;
|
||||
|
||||
/** Deallocate the JSON parser. */
|
||||
inline ~parser() noexcept = default;
|
||||
|
@ -125,6 +143,19 @@ public:
|
|||
*/
|
||||
simdjson_warn_unused simdjson_result<json_iterator> iterate_raw(padded_string_view json) & noexcept;
|
||||
|
||||
inline simdjson_result<document_stream> iterate_many(const uint8_t *buf, size_t len, size_t batch_size = DEFAULT_BATCH_SIZE) noexcept;
|
||||
/** @overload iterate_many(const uint8_t *buf, size_t len, size_t batch_size) */
|
||||
inline simdjson_result<document_stream> iterate_many(const char *buf, size_t len, size_t batch_size = DEFAULT_BATCH_SIZE) noexcept;
|
||||
/** @overload iterate_many(const uint8_t *buf, size_t len, size_t batch_size) */
|
||||
inline simdjson_result<document_stream> iterate_many(const std::string &s, size_t batch_size = DEFAULT_BATCH_SIZE) noexcept;
|
||||
inline simdjson_result<document_stream> iterate_many(const std::string &&s, size_t batch_size) = delete;// unsafe
|
||||
/** @overload iterate_many(const uint8_t *buf, size_t len, size_t batch_size) */
|
||||
inline simdjson_result<document_stream> iterate_many(const padded_string &s, size_t batch_size = DEFAULT_BATCH_SIZE) noexcept;
|
||||
inline simdjson_result<document_stream> iterate_many(const padded_string &&s, size_t batch_size) = delete;// unsafe
|
||||
|
||||
/** @private We do not want to allow implicit conversion from C string to std::string. */
|
||||
simdjson_result<document_stream> iterate_many(const char *buf, size_t batch_size = DEFAULT_BATCH_SIZE) noexcept = delete;
|
||||
|
||||
/** The capacity of this parser (the largest document it can process). */
|
||||
simdjson_really_inline size_t capacity() const noexcept;
|
||||
/** The maximum capacity of this parser (the largest document it is allowed to process). */
|
||||
|
@ -143,6 +174,15 @@ public:
|
|||
*/
|
||||
simdjson_warn_unused error_code allocate(size_t capacity, size_t max_depth=DEFAULT_MAX_DEPTH) noexcept;
|
||||
|
||||
#ifdef SIMDJSON_THREADS_ENABLED
|
||||
/**
|
||||
* The parser instance can use threads when they are available to speed up some
|
||||
* operations. It is enabled by default. Changing this attribute will change the
|
||||
* behavior of the parser for future operations.
|
||||
*/
|
||||
bool threaded{true};
|
||||
#endif
|
||||
|
||||
private:
|
||||
/** @private [for benchmarking access] The implementation to use */
|
||||
std::unique_ptr<internal::dom_parser_implementation> implementation{};
|
||||
|
@ -155,6 +195,7 @@ private:
|
|||
#endif
|
||||
|
||||
friend class json_iterator;
|
||||
friend class document_stream;
|
||||
};
|
||||
|
||||
} // namespace ondemand
|
||||
|
|
|
@ -19,7 +19,10 @@ simdjson_warn_unused simdjson_really_inline simdjson_result<bool> value_iterator
|
|||
simdjson_warn_unused simdjson_really_inline simdjson_result<bool> value_iterator::start_root_object() noexcept {
|
||||
bool result;
|
||||
SIMDJSON_TRY( start_object().get(result) );
|
||||
if (*_json_iter->peek_last() != '}') { return _json_iter->report_error(TAPE_ERROR, "object invalid: { at beginning of document unmatched by } at end of document"); }
|
||||
if( ! _json_iter->streaming() ) {
|
||||
// For document streams, we do not know the "last" structural of the current document, so peek_last() is nonsense.
|
||||
if (*_json_iter->peek_last() != '}') { return _json_iter->report_error(TAPE_ERROR, "object invalid: { at beginning of document unmatched by } at end of document"); }
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -363,7 +366,10 @@ simdjson_warn_unused simdjson_really_inline simdjson_result<bool> value_iterator
|
|||
simdjson_warn_unused simdjson_really_inline simdjson_result<bool> value_iterator::start_root_array() noexcept {
|
||||
bool result;
|
||||
SIMDJSON_TRY( start_array().get(result) );
|
||||
if (*_json_iter->peek_last() != ']') { return _json_iter->report_error(TAPE_ERROR, "array invalid: [ at beginning of document unmatched by ] at end of document"); }
|
||||
if( ! _json_iter->streaming() ) {
|
||||
// For document streams, we do not know the "last" structural of the current document, so peek_last() is nonsense.
|
||||
if (*_json_iter->peek_last() != ']') { return _json_iter->report_error(TAPE_ERROR, "array invalid: [ at beginning of document unmatched by ] at end of document"); }
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
|
|
|
@ -7,6 +7,7 @@ add_cpp_test(ondemand_active_tests LABELS ondemand acceptance per_impl
|
|||
add_cpp_test(ondemand_array_tests LABELS ondemand acceptance per_implementation)
|
||||
add_cpp_test(ondemand_array_error_tests LABELS ondemand acceptance per_implementation)
|
||||
add_cpp_test(ondemand_compilation_tests LABELS ondemand acceptance per_implementation)
|
||||
add_cpp_test(ondemand_document_stream_tests LABELS ondemand acceptance per_implementation)
|
||||
add_cpp_test(ondemand_error_tests LABELS ondemand acceptance per_implementation)
|
||||
add_cpp_test(ondemand_json_pointer_tests LABELS ondemand acceptance per_implementation)
|
||||
add_cpp_test(ondemand_key_string_tests LABELS ondemand acceptance per_implementation)
|
||||
|
|
|
@ -0,0 +1,453 @@
|
|||
#include "simdjson.h"
|
||||
#include "test_ondemand.h"
|
||||
|
||||
using namespace simdjson;
|
||||
|
||||
namespace document_stream_tests {
|
||||
|
||||
// Serialize a document to a std::string so it can be compared against
// the expected JSON text in the tests below.
std::string my_string(ondemand::document& doc) {
  std::stringstream serialized;
  serialized << doc;
  return serialized.str();
}
|
||||
|
||||
bool simple_document_iteration() {
|
||||
TEST_START();
|
||||
auto json = R"([1,[1,2]] {"a":1,"b":2} {"o":{"1":1,"2":2}} [1,2,3])"_padded;
|
||||
ondemand::parser parser;
|
||||
ondemand::document_stream stream;
|
||||
ASSERT_SUCCESS(parser.iterate_many(json).get(stream));
|
||||
std::string_view expected[4] = {"[1,[1,2]]", "{\"a\":1,\"b\":2}", "{\"o\":{\"1\":1,\"2\":2}}", "[1,2,3]"};
|
||||
size_t counter{0};
|
||||
for(auto & doc : stream) {
|
||||
ASSERT_TRUE(counter < 4);
|
||||
ASSERT_EQUAL(my_string(doc), expected[counter++]);
|
||||
}
|
||||
ASSERT_EQUAL(counter, 4);
|
||||
TEST_SUCCEED();
|
||||
}
|
||||
|
||||
bool simple_document_iteration_multiple_batches() {
|
||||
TEST_START();
|
||||
auto json = R"([1,[1,2]] {"a":1,"b":2} {"o":{"1":1,"2":2}} [1,2,3])"_padded;
|
||||
ondemand::parser parser;
|
||||
ondemand::document_stream stream;
|
||||
ASSERT_SUCCESS(parser.iterate_many(json,32).get(stream));
|
||||
std::string_view expected[4] = {"[1,[1,2]]", "{\"a\":1,\"b\":2}", "{\"o\":{\"1\":1,\"2\":2}}", "[1,2,3]"};
|
||||
size_t counter{0};
|
||||
for(auto i = stream.begin(); i != stream.end(); ++i) {
|
||||
ASSERT_TRUE(counter < 4);
|
||||
ASSERT_EQUAL(i.source(), expected[counter++]);
|
||||
}
|
||||
ASSERT_EQUAL(counter, 4);
|
||||
TEST_SUCCEED();
|
||||
}
|
||||
|
||||
bool simple_document_iteration_with_parsing() {
|
||||
TEST_START();
|
||||
auto json = R"([1,[1,2]] {"a":1,"b":2} {"o":{"1":1,"2":2}} [1,2,3])"_padded;
|
||||
ondemand::parser parser;
|
||||
ondemand::document_stream stream;
|
||||
ASSERT_SUCCESS(parser.iterate_many(json).get(stream));
|
||||
std::string_view expected[4] = {"[1,[1,2]]", "{\"a\":1,\"b\":2}", "{\"o\":{\"1\":1,\"2\":2}}", "[1,2,3]"};
|
||||
size_t counter{0};
|
||||
auto i = stream.begin();
|
||||
int64_t x;
|
||||
|
||||
ASSERT_EQUAL(i.source(),expected[counter++]);
|
||||
ASSERT_SUCCESS( (*i).at_pointer("/1/1").get(x) );
|
||||
ASSERT_EQUAL(x,2);
|
||||
++i;
|
||||
|
||||
ASSERT_EQUAL(i.source(),expected[counter++]);
|
||||
ASSERT_SUCCESS( (*i).find_field("a").get(x) );
|
||||
ASSERT_EQUAL(x,1);
|
||||
++i;
|
||||
|
||||
ASSERT_EQUAL(i.source(),expected[counter++]);
|
||||
ASSERT_SUCCESS( (*i).at_pointer("/o/2").get(x) );
|
||||
ASSERT_EQUAL(x,2);
|
||||
++i;
|
||||
|
||||
ASSERT_EQUAL(i.source(),expected[counter++]);
|
||||
ASSERT_SUCCESS( (*i).at_pointer("/2").get(x) );
|
||||
ASSERT_EQUAL(x,3);
|
||||
++i;
|
||||
|
||||
if (i != stream.end()) { return false; }
|
||||
|
||||
TEST_SUCCEED();
|
||||
}
|
||||
|
||||
bool atoms_json() {
|
||||
TEST_START();
|
||||
auto json = R"(5 true 20.3 "string" )"_padded;
|
||||
ondemand::parser parser;
|
||||
ondemand::document_stream stream;
|
||||
ASSERT_SUCCESS(parser.iterate_many(json).get(stream));
|
||||
|
||||
std::string_view expected[4] = {"5", "true", "20.3", "\"string\""};
|
||||
size_t counter{0};
|
||||
for (auto i = stream.begin(); i != stream.end(); ++i) {
|
||||
ASSERT_EQUAL(i.source(), expected[counter++]);
|
||||
}
|
||||
ASSERT_EQUAL(counter,4);
|
||||
TEST_SUCCEED();
|
||||
}
|
||||
|
||||
bool doc_index() {
|
||||
TEST_START();
|
||||
auto json = R"({"z":5} {"1":1,"2":2,"4":4} [7, 10, 9] [15, 11, 12, 13] [154, 110, 112, 1311])"_padded;
|
||||
std::string_view expected[5] = {R"({"z":5})",R"({"1":1,"2":2,"4":4})","[7, 10, 9]","[15, 11, 12, 13]","[154, 110, 112, 1311]"};
|
||||
size_t expected_indexes[5] = {0, 9, 29, 44, 65};
|
||||
|
||||
ondemand::parser parser;
|
||||
ondemand::document_stream stream;
|
||||
size_t counter{0};
|
||||
ASSERT_SUCCESS(parser.iterate_many(json).get(stream));
|
||||
for(auto i = stream.begin(); i != stream.end(); ++i) {
|
||||
ASSERT_TRUE(counter < 5);
|
||||
ASSERT_EQUAL(i.current_index(), expected_indexes[counter]);
|
||||
ASSERT_EQUAL(i.source(), expected[counter]);
|
||||
counter++;
|
||||
}
|
||||
ASSERT_EQUAL(counter, 5);
|
||||
TEST_SUCCEED();
|
||||
}
|
||||
|
||||
bool doc_index_multiple_batches() {
|
||||
TEST_START();
|
||||
auto json = R"({"z":5} {"1":1,"2":2,"4":4} [7, 10, 9] [15, 11, 12, 13] [154, 110, 112, 1311])"_padded;
|
||||
std::string_view expected[5] = {R"({"z":5})",R"({"1":1,"2":2,"4":4})","[7, 10, 9]","[15, 11, 12, 13]","[154, 110, 112, 1311]"};
|
||||
size_t expected_indexes[5] = {0, 9, 29, 44, 65};
|
||||
|
||||
ondemand::parser parser;
|
||||
ondemand::document_stream stream;
|
||||
size_t counter{0};
|
||||
ASSERT_SUCCESS(parser.iterate_many(json,32).get(stream));
|
||||
for(auto i = stream.begin(); i != stream.end(); ++i) {
|
||||
ASSERT_TRUE(counter < 5);
|
||||
ASSERT_EQUAL(i.current_index(), expected_indexes[counter]);
|
||||
ASSERT_EQUAL(i.source(), expected[counter]);
|
||||
counter++;
|
||||
}
|
||||
ASSERT_EQUAL(counter, 5);
|
||||
TEST_SUCCEED();
|
||||
}
|
||||
|
||||
bool source_test() {
|
||||
TEST_START();
|
||||
auto json = R"([1,[1,2]] {"a":1,"b":2} {"o":{"1":1,"2":2}} [1,2,3] )"_padded;
|
||||
ondemand::parser parser;
|
||||
ondemand::document_stream stream;
|
||||
ASSERT_SUCCESS(parser.iterate_many(json).get(stream));
|
||||
std::string_view expected[4] = {"[1,[1,2]]", "{\"a\":1,\"b\":2}", "{\"o\":{\"1\":1,\"2\":2}}", "[1,2,3]"};
|
||||
size_t counter{0};
|
||||
for (auto i = stream.begin(); i != stream.end(); ++i) {
|
||||
ASSERT_EQUAL(i.source(), expected[counter++]);
|
||||
}
|
||||
ASSERT_EQUAL(counter,4);
|
||||
TEST_SUCCEED();
|
||||
}
|
||||
|
||||
bool truncated() {
|
||||
TEST_START();
|
||||
// The last JSON document is intentionally truncated.
|
||||
auto json = R"([1,2,3] {"1":1,"2":3,"4":4} [1,2 )"_padded;
|
||||
ondemand::parser parser;
|
||||
ondemand::document_stream stream;
|
||||
ASSERT_SUCCESS(parser.iterate_many(json).get(stream));
|
||||
|
||||
size_t counter{0};
|
||||
for (auto i = stream.begin(); i != stream.end(); ++i) {
|
||||
counter++;
|
||||
}
|
||||
size_t truncated = stream.truncated_bytes();
|
||||
ASSERT_EQUAL(truncated, 6);
|
||||
ASSERT_EQUAL(counter,2);
|
||||
TEST_SUCCEED();
|
||||
}
|
||||
|
||||
bool truncated_complete_docs() {
|
||||
TEST_START();
|
||||
auto json = R"([1,2,3] {"1":1,"2":3,"4":4} [1,2] )"_padded;
|
||||
ondemand::parser parser;
|
||||
ondemand::document_stream stream;
|
||||
ASSERT_SUCCESS(parser.iterate_many(json).get(stream));
|
||||
|
||||
size_t counter{0};
|
||||
for (auto i = stream.begin(); i != stream.end(); ++i) {
|
||||
counter++;
|
||||
}
|
||||
size_t truncated = stream.truncated_bytes();
|
||||
ASSERT_EQUAL(truncated, 0);
|
||||
ASSERT_EQUAL(counter,3);
|
||||
TEST_SUCCEED();
|
||||
}
|
||||
|
||||
bool truncated_unclosed_string() {
|
||||
TEST_START();
|
||||
// The last JSON document is intentionally truncated.
|
||||
auto json = R"([1,2,3] {"1":1,"2":3,"4":4} "intentionally unclosed string )"_padded;
|
||||
ondemand::parser parser;
|
||||
ondemand::document_stream stream;
|
||||
// We use a window of json.size() though any large value would do.
|
||||
ASSERT_SUCCESS( parser.iterate_many(json).get(stream) );
|
||||
size_t counter{0};
|
||||
for(auto i = stream.begin(); i != stream.end(); ++i) {
|
||||
counter++;
|
||||
}
|
||||
size_t truncated = stream.truncated_bytes();
|
||||
ASSERT_EQUAL(counter,2);
|
||||
ASSERT_EQUAL(truncated,32);
|
||||
TEST_SUCCEED();
|
||||
}
|
||||
|
||||
bool truncated_unclosed_string_in_object() {
|
||||
// The last JSON document is intentionally truncated.
|
||||
auto json = R"([1,2,3] {"1":1,"2":3,"4":4} {"key":"intentionally unclosed string )"_padded;
|
||||
ondemand::parser parser;
|
||||
ondemand::document_stream stream;
|
||||
ASSERT_SUCCESS( parser.iterate_many(json).get(stream) );
|
||||
size_t counter{0};
|
||||
for(auto i = stream.begin(); i != stream.end(); ++i) {
|
||||
counter++;
|
||||
}
|
||||
size_t truncated = stream.truncated_bytes();
|
||||
ASSERT_EQUAL(counter,2);
|
||||
ASSERT_EQUAL(truncated,39);
|
||||
TEST_SUCCEED();
|
||||
}
|
||||
|
||||
bool small_window() {
|
||||
TEST_START();
|
||||
std::vector<char> input;
|
||||
input.push_back('[');
|
||||
for(size_t i = 1; i < 1024; i++) {
|
||||
input.push_back('1');
|
||||
input.push_back(i < 1023 ? ',' : ']');
|
||||
}
|
||||
auto json = simdjson::padded_string(input.data(),input.size());
|
||||
ondemand::parser parser;
|
||||
size_t window_size{1024}; // deliberately too small
|
||||
ondemand::document_stream stream;
|
||||
ASSERT_SUCCESS( parser.iterate_many(json, window_size).get(stream) );
|
||||
auto i = stream.begin();
|
||||
ASSERT_ERROR(i.error(), CAPACITY);
|
||||
TEST_SUCCEED();
|
||||
}
|
||||
|
||||
bool large_window() {
|
||||
TEST_START();
|
||||
#if SIZE_MAX > 17179869184
|
||||
auto json = R"({"error":[],"result":{"token":"xxx"}}{"error":[],"result":{"token":"xxx"}})"_padded;
|
||||
ondemand::parser parser;
|
||||
uint64_t window_size{17179869184}; // deliberately too big
|
||||
ondemand::document_stream stream;
|
||||
ASSERT_SUCCESS( parser.iterate_many(json, size_t(window_size)).get(stream) );
|
||||
auto i = stream.begin();
|
||||
ASSERT_ERROR(i.error(),CAPACITY);
|
||||
#endif
|
||||
TEST_SUCCEED();
|
||||
}
|
||||
|
||||
bool test_leading_spaces() {
|
||||
TEST_START();
|
||||
auto input = R"( [1,1] [1,2] [1,3] [1,4] [1,5] [1,6] [1,7] [1,8] [1,9] [1,10] [1,11] [1,12] [1,13] [1,14] [1,15] )"_padded;;
|
||||
size_t count{0};
|
||||
ondemand::parser parser;
|
||||
ondemand::document_stream stream;
|
||||
ASSERT_SUCCESS(parser.iterate_many(input, 32).get(stream));
|
||||
for(auto i = stream.begin(); i != stream.end(); ++i) {
|
||||
ASSERT_SUCCESS(i.error());
|
||||
count++;
|
||||
}
|
||||
ASSERT_EQUAL(count,15);
|
||||
TEST_SUCCEED();
|
||||
}
|
||||
|
||||
|
||||
bool test_crazy_leading_spaces() {
|
||||
TEST_START();
|
||||
auto input = R"( [1,1] [1,2] [1,3] [1,4] [1,5] [1,6] [1,7] [1,8] [1,9] [1,10] [1,11] [1,12] [1,13] [1,14] [1,15] )"_padded;;
|
||||
size_t count{0};
|
||||
ondemand::parser parser;
|
||||
ondemand::document_stream stream;
|
||||
ASSERT_SUCCESS(parser.iterate_many(input, 32).get(stream));
|
||||
for(auto i = stream.begin(); i != stream.end(); ++i) {
|
||||
ASSERT_SUCCESS(i.error());
|
||||
count++;
|
||||
}
|
||||
ASSERT_EQUAL(count,15);
|
||||
TEST_SUCCEED();
|
||||
}
|
||||
|
||||
bool adversarial_single_document() {
|
||||
TEST_START();
|
||||
auto json = R"({"f[)"_padded;
|
||||
ondemand::parser parser;
|
||||
ondemand::document_stream stream;
|
||||
ASSERT_SUCCESS(parser.iterate_many(json).get(stream));
|
||||
size_t count{0};
|
||||
for (auto & doc : stream) {
|
||||
(void)doc;
|
||||
count++;
|
||||
}
|
||||
ASSERT_EQUAL(count,0);
|
||||
TEST_SUCCEED();
|
||||
}
|
||||
|
||||
bool adversarial_single_document_array() {
|
||||
TEST_START();
|
||||
auto json = R"(["this is an unclosed string ])"_padded;
|
||||
ondemand::parser parser;
|
||||
ondemand::document_stream stream;
|
||||
ASSERT_SUCCESS(parser.iterate_many(json).get(stream));
|
||||
size_t count{0};
|
||||
for (auto & doc : stream) {
|
||||
(void)doc;
|
||||
count++;
|
||||
}
|
||||
ASSERT_EQUAL(count,0);
|
||||
TEST_SUCCEED();
|
||||
}
|
||||
|
||||
bool document_stream_test() {
|
||||
TEST_START();
|
||||
fflush(NULL);
|
||||
const size_t n_records = 10000;
|
||||
std::string data;
|
||||
std::vector<char> buf(1024);
|
||||
// Generating data
|
||||
for (size_t i = 0; i < n_records; ++i) {
|
||||
size_t n = snprintf(buf.data(),
|
||||
buf.size(),
|
||||
"{\"id\": %zu, \"name\": \"name%zu\", \"gender\": \"%s\", "
|
||||
"\"ete\": {\"id\": %zu, \"name\": \"eventail%zu\"}}",
|
||||
i, i, (i % 2) ? "homme" : "femme", i % 10, i % 10);
|
||||
if (n >= buf.size()) { abort(); }
|
||||
data += std::string(buf.data(), n);
|
||||
}
|
||||
|
||||
for(size_t batch_size = 1000; batch_size < 2000; batch_size += (batch_size>1050?10:1)) {
|
||||
fflush(NULL);
|
||||
simdjson::padded_string str(data);
|
||||
ondemand::parser parser;
|
||||
ondemand::document_stream stream;
|
||||
size_t count{0};
|
||||
ASSERT_SUCCESS( parser.iterate_many(str, batch_size).get(stream) );
|
||||
for (auto & doc : stream) {
|
||||
int64_t keyid;
|
||||
ASSERT_SUCCESS( doc["id"].get(keyid) );
|
||||
ASSERT_EQUAL( keyid, int64_t(count) );
|
||||
|
||||
count++;
|
||||
}
|
||||
ASSERT_EQUAL(count,n_records);
|
||||
}
|
||||
TEST_SUCCEED();
|
||||
}
|
||||
|
||||
|
||||
bool document_stream_utf8_test() {
|
||||
TEST_START();
|
||||
fflush(NULL);
|
||||
const size_t n_records = 10000;
|
||||
std::string data;
|
||||
std::vector<char> buf(1024);
|
||||
// Generating data
|
||||
for (size_t i = 0; i < n_records; ++i) {
|
||||
size_t n = snprintf(buf.data(),
|
||||
buf.size(),
|
||||
"{\"id\": %zu, \"name\": \"name%zu\", \"gender\": \"%s\", "
|
||||
"\"\xC3\xA9t\xC3\xA9\": {\"id\": %zu, \"name\": \"\xC3\xA9ventail%zu\"}}",
|
||||
i, i, (i % 2) ? "\xE2\xBA\x83" : "\xE2\xBA\x95", i % 10, i % 10);
|
||||
if (n >= buf.size()) { abort(); }
|
||||
data += std::string(buf.data(), n);
|
||||
}
|
||||
|
||||
for(size_t batch_size = 1000; batch_size < 2000; batch_size += (batch_size>1050?10:1)) {
|
||||
fflush(NULL);
|
||||
simdjson::padded_string str(data);
|
||||
ondemand::parser parser;
|
||||
ondemand::document_stream stream;
|
||||
size_t count{0};
|
||||
ASSERT_SUCCESS( parser.iterate_many(str, batch_size).get(stream) );
|
||||
for (auto & doc : stream) {
|
||||
int64_t keyid;
|
||||
ASSERT_SUCCESS( doc["id"].get(keyid) );
|
||||
ASSERT_EQUAL( keyid, int64_t(count) );
|
||||
|
||||
count++;
|
||||
}
|
||||
ASSERT_EQUAL( count, n_records )
|
||||
}
|
||||
TEST_SUCCEED();
|
||||
}
|
||||
|
||||
bool stress_data_race() {
|
||||
TEST_START();
|
||||
// Correct JSON.
|
||||
auto input = R"([1,23] [1,23] [1,23] [1,23] [1,23] [1,23] [1,23] [1,23] [1,23] [1,23] [1,23] [1,23] [1,23] [1,23] [1,23] )"_padded;;
|
||||
ondemand::parser parser;
|
||||
ondemand::document_stream stream;
|
||||
ASSERT_SUCCESS(parser.iterate_many(input, 32).get(stream));
|
||||
for(auto i = stream.begin(); i != stream.end(); ++i) {
|
||||
ASSERT_SUCCESS(i.error());
|
||||
}
|
||||
TEST_SUCCEED();
|
||||
}
|
||||
|
||||
bool stress_data_race_with_error() {
|
||||
TEST_START();
|
||||
#if SIMDJSON_THREAD_ENABLED
|
||||
std::cout << "ENABLED" << std::endl;
|
||||
#endif
|
||||
// Intentionally broken
|
||||
auto input = R"([1,23] [1,23] [1,23] [1,23 [1,23] [1,23] [1,23] [1,23] [1,23] [1,23] [1,23] [1,23] [1,23] [1,23] [1,23] )"_padded;
|
||||
ondemand::parser parser;
|
||||
ondemand::document_stream stream;
|
||||
ASSERT_SUCCESS(parser.iterate_many(input, 32).get(stream));
|
||||
size_t count{0};
|
||||
for(auto i = stream.begin(); i != stream.end(); ++i) {
|
||||
auto error = i.error();
|
||||
if(count <= 3) {
|
||||
ASSERT_SUCCESS(error);
|
||||
} else {
|
||||
ASSERT_ERROR(error,TAPE_ERROR);
|
||||
break;
|
||||
}
|
||||
count++;
|
||||
}
|
||||
TEST_SUCCEED();
|
||||
}
|
||||
|
||||
bool run() {
|
||||
return
|
||||
simple_document_iteration() &&
|
||||
simple_document_iteration_multiple_batches() &&
|
||||
simple_document_iteration_with_parsing() &&
|
||||
atoms_json() &&
|
||||
doc_index() &&
|
||||
doc_index_multiple_batches() &&
|
||||
source_test() &&
|
||||
truncated() &&
|
||||
truncated_complete_docs() &&
|
||||
truncated_unclosed_string() &&
|
||||
small_window() &&
|
||||
large_window() &&
|
||||
test_leading_spaces() &&
|
||||
test_crazy_leading_spaces() &&
|
||||
adversarial_single_document() &&
|
||||
adversarial_single_document_array() &&
|
||||
document_stream_test() &&
|
||||
document_stream_utf8_test() &&
|
||||
stress_data_race() &&
|
||||
stress_data_race_with_error() &&
|
||||
true;
|
||||
}
|
||||
} // document_stream_tests
|
||||
|
||||
int main (int argc, char *argv[]) {
|
||||
return test_main(argc, argv, document_stream_tests::run);
|
||||
}
|
|
@ -349,6 +349,65 @@ bool json_pointer_rewind() {
|
|||
TEST_SUCCEED();
|
||||
}
|
||||
|
||||
bool iterate_many_example() {
|
||||
TEST_START();
|
||||
auto json = R"([1,2,3] {"1":1,"2":3,"4":4} [1,2,3] )"_padded;
|
||||
simdjson::ondemand::parser parser;
|
||||
simdjson::ondemand::document_stream stream;
|
||||
ASSERT_SUCCESS(parser.iterate_many(json).get(stream));
|
||||
auto i = stream.begin();
|
||||
size_t count{0};
|
||||
size_t expected_indexes[3] = {0,9,29};
|
||||
std::string_view expected_doc[3] = {"[1,2,3]", R"({"1":1,"2":3,"4":4})", "[1,2,3]"};
|
||||
for(; i != stream.end(); ++i) {
|
||||
auto & doc = *i;
|
||||
ASSERT_SUCCESS(doc.type());
|
||||
ASSERT_SUCCESS(i.error());
|
||||
ASSERT_EQUAL(i.current_index(),expected_indexes[count]);
|
||||
ASSERT_EQUAL(i.source(),expected_doc[count]);
|
||||
count++;
|
||||
}
|
||||
TEST_SUCCEED();
|
||||
}
|
||||
|
||||
// Serialize a document to a std::string via its stream-insertion operator.
std::string my_string(ondemand::document& doc) {
  std::stringstream buffer;
  buffer << doc;
  return buffer.str();
}
|
||||
|
||||
bool iterate_many_truncated_example() {
|
||||
TEST_START();
|
||||
auto json = R"([1,2,3] {"1":1,"2":3,"4":4} {"key":"intentionally unclosed string )"_padded;
|
||||
simdjson::ondemand::parser parser;
|
||||
simdjson::ondemand::document_stream stream;
|
||||
ASSERT_SUCCESS( parser.iterate_many(json,json.size()).get(stream) );
|
||||
std::string_view expected[2] = {"[1,2,3]", R"({"1":1,"2":3,"4":4})"};
|
||||
size_t count{0};
|
||||
for(auto i = stream.begin(); i != stream.end(); ++i) {
|
||||
ASSERT_EQUAL(i.source(),expected[count++]);
|
||||
}
|
||||
size_t truncated = stream.truncated_bytes();
|
||||
ASSERT_EQUAL(truncated,39);
|
||||
TEST_SUCCEED();
|
||||
}
|
||||
|
||||
bool ndjson_basics_example() {
|
||||
TEST_START();
|
||||
auto json = R"({ "foo": 1 } { "foo": 2 } { "foo": 3 } )"_padded;
|
||||
ondemand::parser parser;
|
||||
ondemand::document_stream docs;
|
||||
ASSERT_SUCCESS( parser.iterate_many(json).get(docs) );
|
||||
size_t count{0};
|
||||
int64_t expected[3] = {1,2,3};
|
||||
for (auto & doc : docs) {
|
||||
int64_t actual;
|
||||
ASSERT_SUCCESS( doc["foo"].get(actual) );
|
||||
ASSERT_EQUAL( actual,expected[count++] );
|
||||
}
|
||||
TEST_SUCCEED();
|
||||
}
|
||||
|
||||
int main() {
|
||||
if (
|
||||
true
|
||||
|
@ -372,6 +431,9 @@ int main() {
|
|||
&& json_pointer_simple()
|
||||
&& json_pointer_multiple()
|
||||
&& json_pointer_rewind()
|
||||
&& iterate_many_example()
|
||||
&& iterate_many_truncated_example()
|
||||
&& ndjson_basics_example()
|
||||
) {
|
||||
return 0;
|
||||
} else {
|
||||
|
|
Loading…
Reference in New Issue