Eclipse SUMO - Simulation of Urban MObility
Loading...
Searching...
No Matches
ParquetFormatter.cpp
Go to the documentation of this file.
1/****************************************************************************/
2// Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
3// Copyright (C) 2012-2026 German Aerospace Center (DLR) and others.
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// https://www.eclipse.org/legal/epl-2.0/
7// This Source Code may also be made available under the following Secondary
8// Licenses when the conditions for such availability set forth in the Eclipse
9// Public License 2.0 are satisfied: GNU General Public License, version 2
10// or later which is available at
11// https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
12// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
13/****************************************************************************/
18// An output formatter for Parquet files
19/****************************************************************************/
20#include <config.h>
21
22#ifdef _MSC_VER
23#pragma warning(push)
24/* Disable warning about unused parameters */
25#pragma warning(disable: 4100)
26/* Disable warning about hidden function arrow::io::Writable::Write */
27#pragma warning(disable: 4266)
28/* Disable warning about padded memory layout */
29#pragma warning(disable: 4324)
30/* Disable warning about this in initializers */
31#pragma warning(disable: 4355)
32/* Disable warning about changed memory layout due to virtual base class */
33#pragma warning(disable: 4435)
34/* Disable warning about declaration hiding class member */
35#pragma warning(disable: 4458)
36/* Disable warning about implicit conversion of int to bool */
37#pragma warning(disable: 4800)
38#endif
39#include <arrow/api.h>
40#include <arrow/io/api.h>
41#include <parquet/arrow/writer.h>
42#ifdef _MSC_VER
43#pragma warning(pop)
44#endif
45
48#include "ParquetFormatter.h"
49
50
51// ===========================================================================
52// helper class definitions
53// ===========================================================================
54class ArrowOStreamWrapper : public arrow::io::OutputStream {
55public:
56 ArrowOStreamWrapper(std::ostream& out)
57 : myOStream(out), myAmOpen(true) {}
58
59 arrow::Status Close() override {
60 myAmOpen = false;
61 return arrow::Status::OK();
62 }
63
64 arrow::Status Flush() override {
65 myOStream.flush();
66 return arrow::Status::OK();
67 }
68
69 arrow::Result<int64_t> Tell() const override {
70 return myOStream.tellp();
71 }
72
73 bool closed() const override {
74 return !myAmOpen;
75 }
76
77 arrow::Status Write(const void* data, int64_t nbytes) override {
78 if (!myAmOpen) {
79 return arrow::Status::IOError("Write on closed stream");
80 }
81 myOStream.write(reinterpret_cast<const char*>(data), nbytes);
82 if (!myOStream) {
83 return arrow::Status::IOError("Failed to write to ostream");
84 }
85 return arrow::Status::OK();
86 }
87
88private:
89 std::ostream& myOStream;
91};
92
93
94// ===========================================================================
95// ParquetFormatter::Impl definition
96// ===========================================================================
98 Impl(const std::string& columnNames, const int batchSize)
99 : myHeaderFormat(columnNames), myBatchSize(batchSize) {}
100
102 const std::string myHeaderFormat;
103
105 parquet::Compression::type myCompression = parquet::Compression::UNCOMPRESSED;
106
108 const int myBatchSize;
109
111 std::string myCurrentTag;
112
114 std::shared_ptr<arrow::Schema> mySchema = arrow::schema({});
115
117 std::unique_ptr<parquet::arrow::FileWriter> myParquetWriter;
118
120 std::vector<std::shared_ptr<arrow::ArrayBuilder> > myBuilders;
121
123 std::vector<int> myXMLStack;
124
126 std::vector<std::shared_ptr<arrow::Scalar> > myValues;
127
129 int myMaxDepth = 2;
130
132 bool myWroteHeader = false;
133
135 bool myCheckColumns = false;
136
138 bool myNeedsWrite = false;
139
142
145
147 std::string getAttrString(const std::string& attrString) const {
148 if (myHeaderFormat == "plain") {
149 return attrString;
150 }
151 if (myHeaderFormat == "auto") {
152 for (const auto& field : mySchema->fields()) {
153 if (field->name() == attrString) {
154 return myCurrentTag + "_" + attrString;
155 }
156 }
157 return attrString;
158 }
159 return myCurrentTag + "_" + attrString;
160 }
161
162 void checkAttr(const SumoXMLAttr attr) {
163 if (myCheckColumns && myMaxDepth == (int)myXMLStack.size()) {
164 mySeenAttrs.set(attr);
165 if (!myExpectedAttrs.test(attr)) {
166 throw ProcessError(TLF("Unexpected attribute '%', this file format does not support Parquet output yet.", toString(attr)));
167 }
168 }
169 }
170
171 template <class ATTR_TYPE, class BUILDER>
172 void checkBuilder(const ATTR_TYPE& attr, const std::shared_ptr<arrow::DataType>& (*dataType)()) {
173 myNeedsWrite = true;
174 if (!myWroteHeader) {
175 const std::string fieldName = getAttrString(toString(attr));
176 for (const auto& field : mySchema->fields()) {
177 if (field->name() == fieldName) {
178 return;
179 }
180 }
181 mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(fieldName, dataType()));
182 auto builder = std::make_shared<BUILDER>();
183 if (!myBuilders.empty()) {
184 if (myBuilders.back()->length() > 0) {
185 PARQUET_THROW_NOT_OK(builder->AppendNulls(myBuilders.back()->length()));
186 }
187 while (myValues.size() < myBuilders.size()) {
188 myValues.push_back(nullptr);
189 }
190 }
191 myBuilders.push_back(builder);
192 }
193 }
194};
195
196
197// ===========================================================================
198// member method definitions
199// ===========================================================================
200ParquetFormatter::ParquetFormatter(const std::string& columnNames, const std::string& compression, const int batchSize)
201 : OutputFormatter(OutputFormatterType::PARQUET), myImpl(std::make_unique<Impl>(columnNames, batchSize)) {
202 if (compression == "snappy") {
203 myImpl->myCompression = parquet::Compression::SNAPPY;
204 } else if (compression == "gzip") {
205 myImpl->myCompression = parquet::Compression::GZIP;
206 } else if (compression == "brotli") {
207 myImpl->myCompression = parquet::Compression::BROTLI;
208 } else if (compression == "zstd") {
209 myImpl->myCompression = parquet::Compression::ZSTD;
210 } else if (compression == "lz4") {
211 myImpl->myCompression = parquet::Compression::LZ4;
212 } else if (compression == "bz2") {
213 myImpl->myCompression = parquet::Compression::BZ2;
214 } else if (compression != "" && compression != "uncompressed") {
215 WRITE_ERRORF("Unknown compression: %", compression);
216 }
217 if (!arrow::util::Codec::IsAvailable(myImpl->myCompression)) {
218 WRITE_WARNINGF("Compression '%' not available, falling back to uncompressed.", compression);
219 myImpl->myCompression = parquet::Compression::UNCOMPRESSED;
220 }
221}
222
223
225
226
227void
228ParquetFormatter::openTag(std::ostream& /* into */, const std::string& xmlElement) {
229 myImpl->myXMLStack.push_back((int)myImpl->myValues.size());
230 if (!myImpl->myWroteHeader) {
231 myImpl->myCurrentTag = xmlElement;
232 }
233 if (myImpl->myMaxDepth == (int)myImpl->myXMLStack.size() && myImpl->myWroteHeader && myImpl->myCurrentTag != xmlElement) {
234 WRITE_WARNINGF("Encountered mismatch in XML tags (expected % but got %). Column names may be incorrect.", myImpl->myCurrentTag, xmlElement);
235 }
236}
237
238
239void
240ParquetFormatter::openTag(std::ostream& /* into */, const SumoXMLTag& xmlElement) {
241 myImpl->myXMLStack.push_back((int)myImpl->myValues.size());
242 if (!myImpl->myWroteHeader) {
243 myImpl->myCurrentTag = toString(xmlElement);
244 }
245 if (myImpl->myMaxDepth == (int)myImpl->myXMLStack.size() && myImpl->myWroteHeader && myImpl->myCurrentTag != toString(xmlElement)) {
246 WRITE_WARNINGF("Encountered mismatch in XML tags (expected % but got %). Column names may be incorrect.", myImpl->myCurrentTag, toString(xmlElement));
247 }
248}
249
250
251bool
252ParquetFormatter::closeTag(std::ostream& into, const std::string& /* comment */) {
253 if (myImpl->myMaxDepth == 0) {
254 // the auto detection case: the first closed tag determines the depth
255 myImpl->myMaxDepth = (int)myImpl->myXMLStack.size();
256 }
257 if ((myImpl->myMaxDepth == (int)myImpl->myXMLStack.size() || myImpl->myXMLStack.empty()) && !myImpl->myWroteHeader) {
258 // we are at the correct depth or the document has ended (XML stack is empty)
259 // so we should initialize the writer with the schema (if not done yet)
260 if (!myImpl->myCheckColumns) {
261 WRITE_WARNING("Column based formats are still experimental. Autodetection only works for homogeneous output.");
262 }
263 auto arrow_stream = std::make_shared<ArrowOStreamWrapper>(into);
264 std::shared_ptr<parquet::WriterProperties> props = parquet::WriterProperties::Builder().compression(myImpl->myCompression)->build();
265 myImpl->myParquetWriter = *parquet::arrow::FileWriter::Open(*myImpl->mySchema, arrow::default_memory_pool(), arrow_stream, props);
266 myImpl->myWroteHeader = true;
267 }
268 bool writeBatch = false;
269 if (myImpl->myNeedsWrite) {
270 if (myImpl->myCheckColumns && (int)myImpl->myXMLStack.size() == myImpl->myMaxDepth && myImpl->myExpectedAttrs != myImpl->mySeenAttrs) {
271 for (int i = 0; i < (int)myImpl->myExpectedAttrs.size(); ++i) {
272 if (myImpl->myExpectedAttrs.test(i) && !myImpl->mySeenAttrs.test(i)) {
273 WRITE_ERRORF("Incomplete attribute set, '%' is missing. This file format does not support Parquet output yet.",
275 }
276 }
277 }
278 int index = 0;
279 for (auto& builder : myImpl->myBuilders) {
280 const auto val = index < (int)myImpl->myValues.size() ? myImpl->myValues[index++] : nullptr;
281 PARQUET_THROW_NOT_OK(val == nullptr ? builder->AppendNull() : builder->AppendScalar(*val));
282 }
283 writeBatch = myImpl->myWroteHeader && myImpl->myBuilders.back()->length() >= myImpl->myBatchSize;
284 myImpl->mySeenAttrs.reset();
285 myImpl->myNeedsWrite = false;
286 }
287 if (writeBatch || (myImpl->myXMLStack.empty() && !myImpl->myBuilders.empty())) {
288 std::vector<std::shared_ptr<arrow::Array> > data;
289 for (auto& builder : myImpl->myBuilders) {
290 std::shared_ptr<arrow::Array> column;
291 PARQUET_THROW_NOT_OK(builder->Finish(&column));
292 data.push_back(column);
293 // builder.reset();
294 }
295 auto batch = arrow::RecordBatch::Make(myImpl->mySchema, data.back()->length(), data);
296 PARQUET_THROW_NOT_OK(myImpl->myParquetWriter->WriteRecordBatch(*batch));
297 }
298 if (!myImpl->myXMLStack.empty()) {
299 if ((int)myImpl->myValues.size() > myImpl->myXMLStack.back()) {
300 myImpl->myValues.resize(myImpl->myXMLStack.back());
301 }
302 myImpl->myXMLStack.pop_back();
303 }
304 return false;
305}
306
307
308void
309ParquetFormatter::writeAttr(std::ostream& into, const SumoXMLAttr attr, const double& val, const bool isNull) {
310 myImpl->checkAttr(attr);
311 if (attr == SUMO_ATTR_X || attr == SUMO_ATTR_Y || into.precision() > 2) {
312 myImpl->checkBuilder<SumoXMLAttr, arrow::DoubleBuilder>(attr, arrow::float64);
313 myImpl->myValues.push_back(isNull ? nullptr : std::make_shared<arrow::DoubleScalar>(val));
314 } else {
315 myImpl->checkBuilder<SumoXMLAttr, arrow::FloatBuilder>(attr, arrow::float32);
316 myImpl->myValues.push_back(isNull ? nullptr : std::make_shared<arrow::FloatScalar>((float)val));
317 }
318}
319
320
321void
322ParquetFormatter::writeAttr(std::ostream& /* into */, const SumoXMLAttr attr, const int& val, const bool isNull) {
323 myImpl->checkAttr(attr);
324 myImpl->checkBuilder<SumoXMLAttr, arrow::Int32Builder>(attr, arrow::int32);
325 myImpl->myValues.push_back(isNull ? nullptr : std::make_shared<arrow::Int32Scalar>(val));
326}
327
328
329void
330ParquetFormatter::writeAttr(std::ostream& into, const std::string& attr, const double& val, const bool isNull) {
331 assert(!myImpl->myCheckColumns);
332 if (into.precision() > 2) {
333 myImpl->checkBuilder<std::string, arrow::DoubleBuilder>(attr, arrow::float64);
334 myImpl->myValues.push_back(isNull ? nullptr : std::make_shared<arrow::DoubleScalar>(val));
335 } else {
336 myImpl->checkBuilder<std::string, arrow::FloatBuilder>(attr, arrow::float32);
337 myImpl->myValues.push_back(isNull ? nullptr : std::make_shared<arrow::FloatScalar>((float)val));
338 }
339}
340
341
342void
343ParquetFormatter::writeAttr(std::ostream& /* into */, const std::string& attr, const int& val, const bool isNull) {
344 assert(!myImpl->myCheckColumns);
345 myImpl->checkBuilder<std::string, arrow::Int32Builder>(attr, arrow::int32);
346 myImpl->myValues.push_back(isNull ? nullptr : std::make_shared<arrow::Int32Scalar>(val));
347}
348
349
350void
351ParquetFormatter::writeStringAttr(const SumoXMLAttr attr, const std::string& val) {
352 myImpl->checkAttr(attr);
353 myImpl->checkBuilder<SumoXMLAttr, arrow::StringBuilder>(attr, arrow::utf8);
354 myImpl->myValues.push_back(std::make_shared<arrow::StringScalar>(val));
355}
356
357
358void
359ParquetFormatter::writeStringAttr(const std::string& attr, const std::string& val) {
360 assert(!myImpl->myCheckColumns);
361 myImpl->checkBuilder<std::string, arrow::StringBuilder>(attr, arrow::utf8);
362 myImpl->myValues.push_back(std::make_shared<arrow::StringScalar>(val));
363}
364
365
366void
368 myImpl->checkAttr(attr);
369 myImpl->checkBuilder<SumoXMLAttr, arrow::StringBuilder>(attr, arrow::utf8);
370 myImpl->myValues.push_back(nullptr);
371}
372
373
374void
375ParquetFormatter::writeNullAttr(const std::string& attr) {
376 assert(!myImpl->myCheckColumns);
377 myImpl->checkBuilder<std::string, arrow::StringBuilder>(attr, arrow::utf8);
378 myImpl->myValues.push_back(nullptr);
379}
380
381
382void
383ParquetFormatter::writeTime(std::ostream& /* into */, const SumoXMLAttr attr, const SUMOTime val) {
384 if (!gHumanReadableTime) {
385 // always float64 for machine-readable time, regardless of stream precision
386 myImpl->checkBuilder<SumoXMLAttr, arrow::DoubleBuilder>(attr, arrow::float64);
387 myImpl->myValues.push_back(std::make_shared<arrow::DoubleScalar>(STEPS2TIME(val)));
388 return;
389 }
390 writeStringAttr(attr, time2string(val));
391}
392
393
394bool
396 return myImpl->myWroteHeader;
397}
398
399
400void
402 myImpl->myExpectedAttrs = expected;
403 myImpl->myMaxDepth = depth;
404 myImpl->myCheckColumns = expected.any();
405}
406
407
408/****************************************************************************/
long long int SUMOTime
Definition GUI.h:36
#define WRITE_WARNINGF(...)
Definition MsgHandler.h:287
#define WRITE_ERRORF(...)
Definition MsgHandler.h:296
#define WRITE_WARNING(msg)
Definition MsgHandler.h:286
#define TLF(string,...)
Definition MsgHandler.h:306
OutputFormatterType
std::string time2string(SUMOTime t, bool humanReadable)
convert SUMOTime to string (independently of global format setting)
Definition SUMOTime.cpp:91
#define STEPS2TIME(x)
Definition SUMOTime.h:58
SumoXMLTag
Numbers representing SUMO-XML - element names.
std::bitset< 96 > SumoXMLAttrMask
SumoXMLAttr
Numbers representing SUMO-XML - attributes.
@ SUMO_ATTR_Y
@ SUMO_ATTR_X
bool gHumanReadableTime
Definition StdDefs.cpp:31
std::string toString(const T &t, std::streamsize accuracy=gPrecision)
Definition ToString.h:49
bool closed() const override
arrow::Status Close() override
arrow::Status Flush() override
arrow::Status Write(const void *data, int64_t nbytes) override
ArrowOStreamWrapper(std::ostream &out)
arrow::Result< int64_t > Tell() const override
Abstract base class for output formatters.
ParquetFormatter(const std::string &columnNames, const std::string &compression="", const int batchSize=1000000)
Constructor.
void writeNullAttr(const SumoXMLAttr attr)
void openTag(std::ostream &into, const std::string &xmlElement) override
Opens an XML tag.
void setExpectedAttributes(const SumoXMLAttrMask &expected, const int depth=2) override
Set the expected attributes to write. This is used for tracking which attributes are expected in tabl...
~ParquetFormatter() override
Destructor (out-of-line: Impl is incomplete here)
bool wroteHeader() const override
Returns whether a header has been written. Useful to detect whether a file is being used by multiple ...
void writeAttr(std::ostream &, const SumoXMLAttr attr, const T &val, const bool isNull)
writes a named attribute
std::unique_ptr< Impl > myImpl
void writeStringAttr(const SumoXMLAttr attr, const std::string &val)
non-template helpers; defined in the .cpp where arrow/parquet are available
bool closeTag(std::ostream &into, const std::string &comment="") override
Closes the most recently opened tag and optionally adds a comment.
void writeTime(std::ostream &into, const SumoXMLAttr attr, const SUMOTime val) override
Definition json.hpp:4471
std::string myCurrentTag
the currently read tag (only valid when generating the header)
void checkBuilder(const ATTR_TYPE &attr, const std::shared_ptr< arrow::DataType > &(*dataType)())
parquet::Compression::type myCompression
the compression to use
std::vector< std::shared_ptr< arrow::Scalar > > myValues
the current attribute / column values
std::string getAttrString(const std::string &attrString) const
column-name lookup honoring the headerFormat option
std::vector< std::shared_ptr< arrow::ArrayBuilder > > myBuilders
the content array builders for the table
std::unique_ptr< parquet::arrow::FileWriter > myParquetWriter
the output stream writer
std::shared_ptr< arrow::Schema > mySchema
the table schema
SumoXMLAttrMask mySeenAttrs
the attributes already seen (including null values)
void checkAttr(const SumoXMLAttr attr)
bool myNeedsWrite
whether there is still unwritten data
Impl(const std::string &columnNames, const int batchSize)
const int myBatchSize
the number of rows to write per batch
int myMaxDepth
the maximum depth of the XML hierarchy
SumoXMLAttrMask myExpectedAttrs
the attributes which are expected for a complete row (including null values)
bool myCheckColumns
whether the columns should be checked for completeness
bool myWroteHeader
whether the schema has been constructed completely
const std::string myHeaderFormat
the format to use for the column names
std::vector< int > myXMLStack
The number of attributes in the currently open XML elements.