25#pragma warning(disable: 4100)
27#pragma warning(disable: 4266)
29#pragma warning(disable: 4324)
31#pragma warning(disable: 4355)
33#pragma warning(disable: 4435)
35#pragma warning(disable: 4458)
37#pragma warning(disable: 4800)
40#include <arrow/io/api.h>
41#include <parquet/arrow/writer.h>
59 arrow::Status
Close()
override {
61 return arrow::Status::OK();
64 arrow::Status
Flush()
override {
66 return arrow::Status::OK();
69 arrow::Result<int64_t>
Tell()
const override {
77 arrow::Status
Write(
const void* data, int64_t nbytes)
override {
79 return arrow::Status::IOError(
"Write on closed stream");
81 myOStream.write(
reinterpret_cast<const char*
>(data), nbytes);
83 return arrow::Status::IOError(
"Failed to write to ostream");
85 return arrow::Status::OK();
98 Impl(
const std::string& columnNames,
const int batchSize)
105 parquet::Compression::type
myCompression = parquet::Compression::UNCOMPRESSED;
114 std::shared_ptr<arrow::Schema>
mySchema = arrow::schema({});
120 std::vector<std::shared_ptr<arrow::ArrayBuilder> >
myBuilders;
126 std::vector<std::shared_ptr<arrow::Scalar> >
myValues;
152 for (
const auto& field :
mySchema->fields()) {
153 if (field->name() == attrString) {
166 throw ProcessError(
TLF(
"Unexpected attribute '%', this file format does not support Parquet output yet.",
toString(attr)));
171 template <
class ATTR_TYPE,
class BUILDER>
172 void checkBuilder(
const ATTR_TYPE& attr,
const std::shared_ptr<arrow::DataType>& (*dataType)()) {
176 for (
const auto& field :
mySchema->fields()) {
177 if (field->name() == fieldName) {
182 auto builder = std::make_shared<BUILDER>();
185 PARQUET_THROW_NOT_OK(builder->AppendNulls(
myBuilders.back()->length()));
202 if (compression ==
"snappy") {
203 myImpl->myCompression = parquet::Compression::SNAPPY;
204 }
else if (compression ==
"gzip") {
205 myImpl->myCompression = parquet::Compression::GZIP;
206 }
else if (compression ==
"brotli") {
207 myImpl->myCompression = parquet::Compression::BROTLI;
208 }
else if (compression ==
"zstd") {
209 myImpl->myCompression = parquet::Compression::ZSTD;
210 }
else if (compression ==
"lz4") {
211 myImpl->myCompression = parquet::Compression::LZ4;
212 }
else if (compression ==
"bz2") {
213 myImpl->myCompression = parquet::Compression::BZ2;
214 }
else if (compression !=
"" && compression !=
"uncompressed") {
217 if (!arrow::util::Codec::IsAvailable(
myImpl->myCompression)) {
218 WRITE_WARNINGF(
"Compression '%' not available, falling back to uncompressed.", compression);
219 myImpl->myCompression = parquet::Compression::UNCOMPRESSED;
229 myImpl->myXMLStack.push_back((
int)
myImpl->myValues.size());
230 if (!
myImpl->myWroteHeader) {
231 myImpl->myCurrentTag = xmlElement;
233 if (
myImpl->myMaxDepth == (
int)
myImpl->myXMLStack.size() &&
myImpl->myWroteHeader &&
myImpl->myCurrentTag != xmlElement) {
234 WRITE_WARNINGF(
"Encountered mismatch in XML tags (expected % but got %). Column names may be incorrect.",
myImpl->myCurrentTag, xmlElement);
241 myImpl->myXMLStack.push_back((
int)
myImpl->myValues.size());
242 if (!
myImpl->myWroteHeader) {
246 WRITE_WARNINGF(
"Encountered mismatch in XML tags (expected % but got %). Column names may be incorrect.",
myImpl->myCurrentTag,
toString(xmlElement));
253 if (
myImpl->myMaxDepth == 0) {
257 if ((
myImpl->myMaxDepth == (
int)
myImpl->myXMLStack.size() ||
myImpl->myXMLStack.empty()) && !
myImpl->myWroteHeader) {
260 if (!
myImpl->myCheckColumns) {
261 WRITE_WARNING(
"Column based formats are still experimental. Autodetection only works for homogeneous output.");
263 auto arrow_stream = std::make_shared<ArrowOStreamWrapper>(into);
264 std::shared_ptr<parquet::WriterProperties> props = parquet::WriterProperties::Builder().compression(
myImpl->myCompression)->build();
265 myImpl->myParquetWriter = *parquet::arrow::FileWriter::Open(*
myImpl->mySchema, arrow::default_memory_pool(), arrow_stream, props);
266 myImpl->myWroteHeader =
true;
268 bool writeBatch =
false;
269 if (
myImpl->myNeedsWrite) {
271 for (
int i = 0; i < (int)
myImpl->myExpectedAttrs.size(); ++i) {
272 if (
myImpl->myExpectedAttrs.test(i) && !
myImpl->mySeenAttrs.test(i)) {
273 WRITE_ERRORF(
"Incomplete attribute set, '%' is missing. This file format does not support Parquet output yet.",
279 for (
auto& builder :
myImpl->myBuilders) {
280 const auto val = index < (int)
myImpl->myValues.size() ?
myImpl->myValues[index++] :
nullptr;
281 PARQUET_THROW_NOT_OK(val ==
nullptr ? builder->AppendNull() : builder->AppendScalar(*val));
283 writeBatch =
myImpl->myWroteHeader &&
myImpl->myBuilders.back()->length() >=
myImpl->myBatchSize;
284 myImpl->mySeenAttrs.reset();
285 myImpl->myNeedsWrite =
false;
287 if (writeBatch || (
myImpl->myXMLStack.empty() && !
myImpl->myBuilders.empty())) {
288 std::vector<std::shared_ptr<arrow::Array> > data;
289 for (
auto& builder :
myImpl->myBuilders) {
290 std::shared_ptr<arrow::Array> column;
291 PARQUET_THROW_NOT_OK(builder->Finish(&column));
292 data.push_back(column);
295 auto batch = arrow::RecordBatch::Make(
myImpl->mySchema, data.back()->length(), data);
296 PARQUET_THROW_NOT_OK(
myImpl->myParquetWriter->WriteRecordBatch(*batch));
298 if (!
myImpl->myXMLStack.empty()) {
299 if ((
int)
myImpl->myValues.size() >
myImpl->myXMLStack.back()) {
302 myImpl->myXMLStack.pop_back();
313 myImpl->myValues.push_back(isNull ?
nullptr : std::make_shared<arrow::DoubleScalar>(val));
316 myImpl->myValues.push_back(isNull ?
nullptr : std::make_shared<arrow::FloatScalar>((
float)val));
325 myImpl->myValues.push_back(isNull ?
nullptr : std::make_shared<arrow::Int32Scalar>(val));
331 assert(!
myImpl->myCheckColumns);
332 if (into.precision() > 2) {
333 myImpl->checkBuilder<std::string, arrow::DoubleBuilder>(attr, arrow::float64);
334 myImpl->myValues.push_back(isNull ?
nullptr : std::make_shared<arrow::DoubleScalar>(val));
336 myImpl->checkBuilder<std::string, arrow::FloatBuilder>(attr, arrow::float32);
337 myImpl->myValues.push_back(isNull ?
nullptr : std::make_shared<arrow::FloatScalar>((
float)val));
344 assert(!
myImpl->myCheckColumns);
345 myImpl->checkBuilder<std::string, arrow::Int32Builder>(attr, arrow::int32);
346 myImpl->myValues.push_back(isNull ?
nullptr : std::make_shared<arrow::Int32Scalar>(val));
354 myImpl->myValues.push_back(std::make_shared<arrow::StringScalar>(val));
360 assert(!
myImpl->myCheckColumns);
361 myImpl->checkBuilder<std::string, arrow::StringBuilder>(attr, arrow::utf8);
362 myImpl->myValues.push_back(std::make_shared<arrow::StringScalar>(val));
370 myImpl->myValues.push_back(
nullptr);
376 assert(!
myImpl->myCheckColumns);
377 myImpl->checkBuilder<std::string, arrow::StringBuilder>(attr, arrow::utf8);
378 myImpl->myValues.push_back(
nullptr);
387 myImpl->myValues.push_back(std::make_shared<arrow::DoubleScalar>(
STEPS2TIME(val)));
396 return myImpl->myWroteHeader;
402 myImpl->myExpectedAttrs = expected;
403 myImpl->myMaxDepth = depth;
404 myImpl->myCheckColumns = expected.any();
#define WRITE_WARNINGF(...)
#define WRITE_ERRORF(...)
#define WRITE_WARNING(msg)
std::string time2string(SUMOTime t, bool humanReadable)
convert SUMOTime to string (independently of global format setting)
SumoXMLTag
Numbers representing SUMO-XML - element names.
std::bitset< 96 > SumoXMLAttrMask
SumoXMLAttr
Numbers representing SUMO-XML - attributes.
std::string toString(const T &t, std::streamsize accuracy=gPrecision)
bool closed() const override
arrow::Status Close() override
arrow::Status Flush() override
arrow::Status Write(const void *data, int64_t nbytes) override
ArrowOStreamWrapper(std::ostream &out)
arrow::Result< int64_t > Tell() const override