Line data Source code
1 : /****************************************************************************/
2 : // Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
3 : // Copyright (C) 2012-2026 German Aerospace Center (DLR) and others.
4 : // This program and the accompanying materials are made available under the
5 : // terms of the Eclipse Public License 2.0 which is available at
6 : // https://www.eclipse.org/legal/epl-2.0/
7 : // This Source Code may also be made available under the following Secondary
8 : // Licenses when the conditions for such availability set forth in the Eclipse
9 : // Public License 2.0 are satisfied: GNU General Public License, version 2
10 : // or later which is available at
11 : // https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
12 : // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
13 : /****************************************************************************/
14 : /// @file ParquetFormatter.cpp
15 : /// @author Michael Behrisch
16 : /// @date 2025-06-17
17 : ///
18 : // An output formatter for Parquet files
19 : /****************************************************************************/
20 : #include <config.h>
21 :
22 : #ifdef _MSC_VER
23 : #pragma warning(push)
24 : /* Disable warning about unused parameters */
25 : #pragma warning(disable: 4100)
26 : /* Disable warning about hidden function arrow::io::Writable::Write */
27 : #pragma warning(disable: 4266)
28 : /* Disable warning about padded memory layout */
29 : #pragma warning(disable: 4324)
30 : /* Disable warning about this in initializers */
31 : #pragma warning(disable: 4355)
32 : /* Disable warning about changed memory layout due to virtual base class */
33 : #pragma warning(disable: 4435)
34 : /* Disable warning about declaration hiding class member */
35 : #pragma warning(disable: 4458)
36 : /* Disable warning about implicit conversion of int to bool */
37 : #pragma warning(disable: 4800)
38 : #endif
39 : #include <arrow/api.h>
40 : #include <arrow/io/api.h>
41 : #include <parquet/arrow/writer.h>
42 : #ifdef _MSC_VER
43 : #pragma warning(pop)
44 : #endif
45 :
46 : #include <utils/common/MsgHandler.h>
47 : #include <utils/common/ToString.h>
48 : #include "ParquetFormatter.h"
49 :
50 :
51 : // ===========================================================================
52 : // helper class definitions
53 : // ===========================================================================
54 : class ArrowOStreamWrapper : public arrow::io::OutputStream {
55 : public:
56 : ArrowOStreamWrapper(std::ostream& out)
57 46 : : myOStream(out), myAmOpen(true) {}
58 :
59 0 : arrow::Status Close() override {
60 0 : myAmOpen = false;
61 0 : return arrow::Status::OK();
62 : }
63 :
64 0 : arrow::Status Flush() override {
65 0 : myOStream.flush();
66 0 : return arrow::Status::OK();
67 : }
68 :
69 2863 : arrow::Result<int64_t> Tell() const override {
70 2863 : return myOStream.tellp();
71 : }
72 :
73 0 : bool closed() const override {
74 0 : return !myAmOpen;
75 : }
76 :
77 1819 : arrow::Status Write(const void* data, int64_t nbytes) override {
78 1819 : if (!myAmOpen) {
79 : return arrow::Status::IOError("Write on closed stream");
80 : }
81 1819 : myOStream.write(reinterpret_cast<const char*>(data), nbytes);
82 1819 : if (!myOStream) {
83 : return arrow::Status::IOError("Failed to write to ostream");
84 : }
85 : return arrow::Status::OK();
86 : }
87 :
88 : private:
89 : std::ostream& myOStream;
90 : bool myAmOpen;
91 : };
92 :
93 :
94 : // ===========================================================================
95 : // ParquetFormatter::Impl definition
96 : // ===========================================================================
97 : struct ParquetFormatter::Impl {
98 46 : Impl(const std::string& columnNames, const int batchSize)
99 138 : : myHeaderFormat(columnNames), myBatchSize(batchSize) {}
100 :
101 : /// @brief the format to use for the column names
102 : const std::string myHeaderFormat;
103 :
104 : /// @brief the compression to use
105 : parquet::Compression::type myCompression = parquet::Compression::UNCOMPRESSED;
106 :
107 : /// @brief the number of rows to write per batch
108 : const int myBatchSize;
109 :
110 : /// @brief the currently read tag (only valid when generating the header)
111 : std::string myCurrentTag;
112 :
113 : /// @brief the table schema
114 : std::shared_ptr<arrow::Schema> mySchema = arrow::schema({});
115 :
116 : /// @brief the output stream writer
117 : std::unique_ptr<parquet::arrow::FileWriter> myParquetWriter;
118 :
119 : /// @brief the content array builders for the table
120 : std::vector<std::shared_ptr<arrow::ArrayBuilder> > myBuilders;
121 :
122 : /// @brief The number of attributes in the currently open XML elements
123 : std::vector<int> myXMLStack;
124 :
125 : /// @brief the current attribute / column values
126 : std::vector<std::shared_ptr<arrow::Scalar> > myValues;
127 :
128 : /// @brief the maximum depth of the XML hierarchy
129 : int myMaxDepth = 2;
130 :
131 : /// @brief whether the schema has been constructed completely
132 : bool myWroteHeader = false;
133 :
134 : /// @brief whether the columns should be checked for completeness
135 : bool myCheckColumns = false;
136 :
137 : /// @brief whether there is still unwritten data
138 : bool myNeedsWrite = false;
139 :
140 : /// @brief the attributes which are expected for a complete row (including null values)
141 : SumoXMLAttrMask myExpectedAttrs;
142 :
143 : /// @brief the attributes already seen (including null values)
144 : SumoXMLAttrMask mySeenAttrs;
145 :
146 : /// @brief column-name lookup honoring the headerFormat option
147 54246 : std::string getAttrString(const std::string& attrString) const {
148 54246 : if (myHeaderFormat == "plain") {
149 : return attrString;
150 : }
151 54246 : if (myHeaderFormat == "auto") {
152 0 : for (const auto& field : mySchema->fields()) {
153 0 : if (field->name() == attrString) {
154 0 : return myCurrentTag + "_" + attrString;
155 : }
156 : }
157 : return attrString;
158 : }
159 108492 : return myCurrentTag + "_" + attrString;
160 : }
161 :
162 17904 : void checkAttr(const SumoXMLAttr attr) {
163 17904 : if (myCheckColumns && myMaxDepth == (int)myXMLStack.size()) {
164 8278 : mySeenAttrs.set(attr);
165 8278 : if (!myExpectedAttrs.test(attr)) {
166 0 : throw ProcessError(TLF("Unexpected attribute '%', this file format does not support Parquet output yet.", toString(attr)));
167 : }
168 : }
169 17904 : }
170 :
171 : template <class ATTR_TYPE, class BUILDER>
172 72236 : void checkBuilder(const ATTR_TYPE& attr, const std::shared_ptr<arrow::DataType>& (*dataType)()) {
173 72236 : myNeedsWrite = true;
174 72236 : if (!myWroteHeader) {
175 108492 : const std::string fieldName = getAttrString(toString(attr));
176 386531 : for (const auto& field : mySchema->fields()) {
177 385986 : if (field->name() == fieldName) {
178 : return;
179 : }
180 : }
181 2180 : mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(fieldName, dataType()));
182 : auto builder = std::make_shared<BUILDER>();
183 545 : if (!myBuilders.empty()) {
184 501 : if (myBuilders.back()->length() > 0) {
185 140 : PARQUET_THROW_NOT_OK(builder->AppendNulls(myBuilders.back()->length()));
186 : }
187 589 : while (myValues.size() < myBuilders.size()) {
188 176 : myValues.push_back(nullptr);
189 : }
190 : }
191 1090 : myBuilders.push_back(builder);
192 : }
193 : }
194 : };
195 :
196 :
197 : // ===========================================================================
198 : // member method definitions
199 : // ===========================================================================
200 46 : ParquetFormatter::ParquetFormatter(const std::string& columnNames, const std::string& compression, const int batchSize)
201 46 : : OutputFormatter(OutputFormatterType::PARQUET), myImpl(std::make_unique<Impl>(columnNames, batchSize)) {
202 46 : if (compression == "snappy") {
203 0 : myImpl->myCompression = parquet::Compression::SNAPPY;
204 46 : } else if (compression == "gzip") {
205 0 : myImpl->myCompression = parquet::Compression::GZIP;
206 46 : } else if (compression == "brotli") {
207 0 : myImpl->myCompression = parquet::Compression::BROTLI;
208 46 : } else if (compression == "zstd") {
209 0 : myImpl->myCompression = parquet::Compression::ZSTD;
210 46 : } else if (compression == "lz4") {
211 0 : myImpl->myCompression = parquet::Compression::LZ4;
212 46 : } else if (compression == "bz2") {
213 0 : myImpl->myCompression = parquet::Compression::BZ2;
214 46 : } else if (compression != "" && compression != "uncompressed") {
215 0 : WRITE_ERRORF("Unknown compression: %", compression);
216 : }
217 46 : if (!arrow::util::Codec::IsAvailable(myImpl->myCompression)) {
218 0 : WRITE_WARNINGF("Compression '%' not available, falling back to uncompressed.", compression);
219 0 : myImpl->myCompression = parquet::Compression::UNCOMPRESSED;
220 : }
221 46 : }
222 :
223 :
/// @brief Destructor, defaulted in this file after the definition of Impl (the type myImpl is created from)
ParquetFormatter::~ParquetFormatter() = default;
225 :
226 :
227 : void
228 6603 : ParquetFormatter::openTag(std::ostream& /* into */, const std::string& xmlElement) {
229 6603 : myImpl->myXMLStack.push_back((int)myImpl->myValues.size());
230 6603 : if (!myImpl->myWroteHeader) {
231 5519 : myImpl->myCurrentTag = xmlElement;
232 : }
233 6603 : if (myImpl->myMaxDepth == (int)myImpl->myXMLStack.size() && myImpl->myWroteHeader && myImpl->myCurrentTag != xmlElement) {
234 699 : WRITE_WARNINGF("Encountered mismatch in XML tags (expected % but got %). Column names may be incorrect.", myImpl->myCurrentTag, xmlElement);
235 : }
236 6603 : }
237 :
238 :
239 : void
240 1660 : ParquetFormatter::openTag(std::ostream& /* into */, const SumoXMLTag& xmlElement) {
241 1660 : myImpl->myXMLStack.push_back((int)myImpl->myValues.size());
242 1660 : if (!myImpl->myWroteHeader) {
243 552 : myImpl->myCurrentTag = toString(xmlElement);
244 : }
245 2824 : if (myImpl->myMaxDepth == (int)myImpl->myXMLStack.size() && myImpl->myWroteHeader && myImpl->myCurrentTag != toString(xmlElement)) {
246 69 : WRITE_WARNINGF("Encountered mismatch in XML tags (expected % but got %). Column names may be incorrect.", myImpl->myCurrentTag, toString(xmlElement));
247 : }
248 1660 : }
249 :
250 :
251 : bool
252 8309 : ParquetFormatter::closeTag(std::ostream& into, const std::string& /* comment */) {
253 8309 : if (myImpl->myMaxDepth == 0) {
254 : // the auto detection case: the first closed tag determines the depth
255 0 : myImpl->myMaxDepth = (int)myImpl->myXMLStack.size();
256 : }
257 8309 : if ((myImpl->myMaxDepth == (int)myImpl->myXMLStack.size() || myImpl->myXMLStack.empty()) && !myImpl->myWroteHeader) {
258 : // we are at the correct depth or the document has ended (XML stack is empty)
259 : // so we should initialize the writer with the schema (if not done yet)
260 46 : if (!myImpl->myCheckColumns) {
261 54 : WRITE_WARNING("Column based formats are still experimental. Autodetection only works for homogeneous output.");
262 : }
263 : auto arrow_stream = std::make_shared<ArrowOStreamWrapper>(into);
264 46 : std::shared_ptr<parquet::WriterProperties> props = parquet::WriterProperties::Builder().compression(myImpl->myCompression)->build();
265 184 : myImpl->myParquetWriter = *parquet::arrow::FileWriter::Open(*myImpl->mySchema, arrow::default_memory_pool(), arrow_stream, props);
266 46 : myImpl->myWroteHeader = true;
267 : }
268 : bool writeBatch = false;
269 8309 : if (myImpl->myNeedsWrite) {
270 8179 : if (myImpl->myCheckColumns && (int)myImpl->myXMLStack.size() == myImpl->myMaxDepth && myImpl->myExpectedAttrs != myImpl->mySeenAttrs) {
271 1552 : for (int i = 0; i < (int)myImpl->myExpectedAttrs.size(); ++i) {
272 1536 : if (myImpl->myExpectedAttrs.test(i) && !myImpl->mySeenAttrs.test(i)) {
273 36 : WRITE_ERRORF("Incomplete attribute set, '%' is missing. This file format does not support Parquet output yet.",
274 : toString((SumoXMLAttr)i));
275 : }
276 : }
277 : }
278 : int index = 0;
279 83566 : for (auto& builder : myImpl->myBuilders) {
280 76233 : const auto val = index < (int)myImpl->myValues.size() ? myImpl->myValues[index++] : nullptr;
281 148661 : PARQUET_THROW_NOT_OK(val == nullptr ? builder->AppendNull() : builder->AppendScalar(*val));
282 : }
283 7333 : writeBatch = myImpl->myWroteHeader && myImpl->myBuilders.back()->length() >= myImpl->myBatchSize;
284 : myImpl->mySeenAttrs.reset();
285 7333 : myImpl->myNeedsWrite = false;
286 : }
287 8309 : if (writeBatch || (myImpl->myXMLStack.empty() && !myImpl->myBuilders.empty())) {
288 : std::vector<std::shared_ptr<arrow::Array> > data;
289 589 : for (auto& builder : myImpl->myBuilders) {
290 545 : std::shared_ptr<arrow::Array> column;
291 545 : PARQUET_THROW_NOT_OK(builder->Finish(&column));
292 545 : data.push_back(column);
293 : // builder.reset();
294 : }
295 132 : auto batch = arrow::RecordBatch::Make(myImpl->mySchema, data.back()->length(), data);
296 44 : PARQUET_THROW_NOT_OK(myImpl->myParquetWriter->WriteRecordBatch(*batch));
297 44 : }
298 8309 : if (!myImpl->myXMLStack.empty()) {
299 8263 : if ((int)myImpl->myValues.size() > myImpl->myXMLStack.back()) {
300 8063 : myImpl->myValues.resize(myImpl->myXMLStack.back());
301 : }
302 : myImpl->myXMLStack.pop_back();
303 : }
304 8309 : return false;
305 : }
306 :
307 :
308 : void
309 9353 : ParquetFormatter::writeAttr(std::ostream& into, const SumoXMLAttr attr, const double& val, const bool isNull) {
310 9353 : myImpl->checkAttr(attr);
311 9353 : if (attr == SUMO_ATTR_X || attr == SUMO_ATTR_Y || into.precision() > 2) {
312 2124 : myImpl->checkBuilder<SumoXMLAttr, arrow::DoubleBuilder>(attr, arrow::float64);
313 4248 : myImpl->myValues.push_back(isNull ? nullptr : std::make_shared<arrow::DoubleScalar>(val));
314 : } else {
315 7229 : myImpl->checkBuilder<SumoXMLAttr, arrow::FloatBuilder>(attr, arrow::float32);
316 14458 : myImpl->myValues.push_back(isNull ? nullptr : std::make_shared<arrow::FloatScalar>((float)val));
317 : }
318 9353 : }
319 :
320 :
321 : void
322 713 : ParquetFormatter::writeAttr(std::ostream& /* into */, const SumoXMLAttr attr, const int& val, const bool isNull) {
323 713 : myImpl->checkAttr(attr);
324 713 : myImpl->checkBuilder<SumoXMLAttr, arrow::Int32Builder>(attr, arrow::int32);
325 713 : myImpl->myValues.push_back(isNull ? nullptr : std::make_shared<arrow::Int32Scalar>(val));
326 713 : }
327 :
328 :
329 : void
330 7105 : ParquetFormatter::writeAttr(std::ostream& into, const std::string& attr, const double& val, const bool isNull) {
331 : assert(!myImpl->myCheckColumns);
332 7105 : if (into.precision() > 2) {
333 0 : myImpl->checkBuilder<std::string, arrow::DoubleBuilder>(attr, arrow::float64);
334 0 : myImpl->myValues.push_back(isNull ? nullptr : std::make_shared<arrow::DoubleScalar>(val));
335 : } else {
336 7105 : myImpl->checkBuilder<std::string, arrow::FloatBuilder>(attr, arrow::float32);
337 14210 : myImpl->myValues.push_back(isNull ? nullptr : std::make_shared<arrow::FloatScalar>((float)val));
338 : }
339 7105 : }
340 :
341 :
342 : void
343 23108 : ParquetFormatter::writeAttr(std::ostream& /* into */, const std::string& attr, const int& val, const bool isNull) {
344 : assert(!myImpl->myCheckColumns);
345 23108 : myImpl->checkBuilder<std::string, arrow::Int32Builder>(attr, arrow::int32);
346 23108 : myImpl->myValues.push_back(isNull ? nullptr : std::make_shared<arrow::Int32Scalar>(val));
347 23108 : }
348 :
349 :
350 : void
351 6890 : ParquetFormatter::writeStringAttr(const SumoXMLAttr attr, const std::string& val) {
352 6890 : myImpl->checkAttr(attr);
353 6890 : myImpl->checkBuilder<SumoXMLAttr, arrow::StringBuilder>(attr, arrow::utf8);
354 6890 : myImpl->myValues.push_back(std::make_shared<arrow::StringScalar>(val));
355 6890 : }
356 :
357 :
358 : void
359 22531 : ParquetFormatter::writeStringAttr(const std::string& attr, const std::string& val) {
360 : assert(!myImpl->myCheckColumns);
361 22531 : myImpl->checkBuilder<std::string, arrow::StringBuilder>(attr, arrow::utf8);
362 22531 : myImpl->myValues.push_back(std::make_shared<arrow::StringScalar>(val));
363 22531 : }
364 :
365 :
366 : void
367 948 : ParquetFormatter::writeNullAttr(const SumoXMLAttr attr) {
368 948 : myImpl->checkAttr(attr);
369 948 : myImpl->checkBuilder<SumoXMLAttr, arrow::StringBuilder>(attr, arrow::utf8);
370 948 : myImpl->myValues.push_back(nullptr);
371 948 : }
372 :
373 :
374 : void
375 432 : ParquetFormatter::writeNullAttr(const std::string& attr) {
376 : assert(!myImpl->myCheckColumns);
377 432 : myImpl->checkBuilder<std::string, arrow::StringBuilder>(attr, arrow::utf8);
378 432 : myImpl->myValues.push_back(nullptr);
379 432 : }
380 :
381 :
382 : void
383 1156 : ParquetFormatter::writeTime(std::ostream& /* into */, const SumoXMLAttr attr, const SUMOTime val) {
384 1156 : if (!gHumanReadableTime) {
385 : // always float64 for machine-readable time, regardless of stream precision
386 1156 : myImpl->checkBuilder<SumoXMLAttr, arrow::DoubleBuilder>(attr, arrow::float64);
387 1156 : myImpl->myValues.push_back(std::make_shared<arrow::DoubleScalar>(STEPS2TIME(val)));
388 1156 : return;
389 : }
390 0 : writeStringAttr(attr, time2string(val));
391 : }
392 :
393 :
394 : bool
395 0 : ParquetFormatter::wroteHeader() const {
396 0 : return myImpl->myWroteHeader;
397 : }
398 :
399 :
400 : void
401 58 : ParquetFormatter::setExpectedAttributes(const SumoXMLAttrMask& expected, const int depth) {
402 58 : myImpl->myExpectedAttrs = expected;
403 58 : myImpl->myMaxDepth = depth;
404 58 : myImpl->myCheckColumns = expected.any();
405 58 : }
406 :
407 :
408 : /****************************************************************************/
|