Line data Source code
1 : /****************************************************************************/
2 : // Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
3 : // Copyright (C) 2012-2026 German Aerospace Center (DLR) and others.
4 : // This program and the accompanying materials are made available under the
5 : // terms of the Eclipse Public License 2.0 which is available at
6 : // https://www.eclipse.org/legal/epl-2.0/
7 : // This Source Code may also be made available under the following Secondary
8 : // Licenses when the conditions for such availability set forth in the Eclipse
9 : // Public License 2.0 are satisfied: GNU General Public License, version 2
10 : // or later which is available at
11 : // https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
12 : // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
13 : /****************************************************************************/
14 : /// @file ParquetFormatter.cpp
15 : /// @author Michael Behrisch
16 : /// @date 2025-06-17
17 : ///
18 : // An output formatter for Parquet files
19 : /****************************************************************************/
20 : #include <config.h>
21 :
22 : #ifdef _MSC_VER
23 : #pragma warning(push)
24 : /* Disable warning about changed memory layout due to virtual base class */
25 : #pragma warning(disable: 4435)
26 : #endif
27 : #include <arrow/io/api.h>
28 : #ifdef _MSC_VER
29 : #pragma warning(pop)
30 : #endif
31 :
32 : #include <utils/common/MsgHandler.h>
33 : #include <utils/common/ToString.h>
34 : #include "ParquetFormatter.h"
35 :
36 :
37 : // ===========================================================================
38 : // helper class definitions
39 : // ===========================================================================
40 : class ArrowOStreamWrapper : public arrow::io::OutputStream {
41 : public:
42 : ArrowOStreamWrapper(std::ostream& out)
43 24 : : myOStream(out), myAmOpen(true) {}
44 :
45 0 : arrow::Status Close() override {
46 0 : myAmOpen = false;
47 0 : return arrow::Status::OK();
48 : }
49 :
50 0 : arrow::Status Flush() override {
51 0 : myOStream.flush();
52 0 : return arrow::Status::OK();
53 : }
54 :
55 1177 : arrow::Result<int64_t> Tell() const override {
56 1177 : return myOStream.tellp();
57 : }
58 :
59 0 : bool closed() const override {
60 0 : return !myAmOpen;
61 : }
62 :
63 759 : arrow::Status Write(const void* data, int64_t nbytes) override {
64 759 : if (!myAmOpen) {
65 : return arrow::Status::IOError("Write on closed stream");
66 : }
67 759 : myOStream.write(reinterpret_cast<const char*>(data), nbytes);
68 759 : if (!myOStream) {
69 : return arrow::Status::IOError("Failed to write to ostream");
70 : }
71 : return arrow::Status::OK();
72 : }
73 :
74 : private:
75 : std::ostream& myOStream;
76 : bool myAmOpen;
77 : };
78 :
79 :
80 : // ===========================================================================
81 : // member method definitions
82 : // ===========================================================================
83 24 : ParquetFormatter::ParquetFormatter(const std::string& columnNames, const std::string& compression, const int batchSize)
84 48 : : OutputFormatter(OutputFormatterType::PARQUET), myHeaderFormat(columnNames), myBatchSize(batchSize) {
85 24 : if (compression == "snappy") {
86 0 : myCompression = parquet::Compression::SNAPPY;
87 24 : } else if (compression == "gzip") {
88 0 : myCompression = parquet::Compression::GZIP;
89 24 : } else if (compression == "brotli") {
90 0 : myCompression = parquet::Compression::BROTLI;
91 24 : } else if (compression == "zstd") {
92 0 : myCompression = parquet::Compression::ZSTD;
93 24 : } else if (compression == "lz4") {
94 0 : myCompression = parquet::Compression::LZ4;
95 24 : } else if (compression == "bz2") {
96 0 : myCompression = parquet::Compression::BZ2;
97 24 : } else if (compression != "" && compression != "uncompressed") {
98 0 : WRITE_ERRORF("Unknown compression: %", compression);
99 : }
100 24 : if (!arrow::util::Codec::IsAvailable(myCompression)) {
101 0 : WRITE_WARNINGF("Compression '%' not available, falling back to uncompressed.", compression);
102 0 : myCompression = parquet::Compression::UNCOMPRESSED;
103 : }
104 24 : }
105 :
106 : void
107 756 : ParquetFormatter::openTag(std::ostream& /* into */, const std::string& xmlElement) {
108 756 : myXMLStack.push_back((int)myValues.size());
109 756 : if (!myWroteHeader) {
110 624 : myCurrentTag = xmlElement;
111 : }
112 756 : if (myMaxDepth == (int)myXMLStack.size() && myWroteHeader && myCurrentTag != xmlElement) {
113 0 : WRITE_WARNINGF("Encountered mismatch in XML tags (expected % but got %). Column names may be incorrect.", myCurrentTag, xmlElement);
114 : }
115 756 : }
116 :
117 :
118 : void
119 744 : ParquetFormatter::openTag(std::ostream& /* into */, const SumoXMLTag& xmlElement) {
120 744 : myXMLStack.push_back((int)myValues.size());
121 744 : if (!myWroteHeader) {
122 48 : myCurrentTag = toString(xmlElement);
123 : }
124 1464 : if (myMaxDepth == (int)myXMLStack.size() && myWroteHeader && myCurrentTag != toString(xmlElement)) {
125 36 : WRITE_WARNINGF("Encountered mismatch in XML tags (expected % but got %). Column names may be incorrect.", myCurrentTag, toString(xmlElement));
126 : }
127 744 : }
128 :
129 :
bool
ParquetFormatter::closeTag(std::ostream& into, const std::string& /* comment */) {
    // Closing a tag may trigger (1) lazy schema/writer initialization,
    // (2) appending the collected attribute values as one row and
    // (3) flushing a full record batch to the parquet writer.
    if (myMaxDepth == 0) {
        // the auto detection case: the first closed tag determines the depth
        myMaxDepth = (int)myXMLStack.size();
    }
    if ((myMaxDepth == (int)myXMLStack.size() || myXMLStack.empty()) && !myWroteHeader) {
        // we are at the correct depth or the document has ended (XML stack is empty)
        // so we should initialize the writer with the schema (if not done yet)
        if (!myCheckColumns) {
            WRITE_WARNING("Column based formats are still experimental. Autodetection only works for homogeneous output.");
        }
        // wrap the target ostream so parquet can write to it; the wrapper does not own the stream
        auto arrow_stream = std::make_shared<ArrowOStreamWrapper>(into);
        std::shared_ptr<parquet::WriterProperties> props = parquet::WriterProperties::Builder().compression(myCompression)->build();
        // FileWriter::Open returns an arrow::Result; dereferencing extracts the writer
        // (NOTE(review): a failed Open would be dereferenced unchecked here — presumably
        // PARQUET errors surface via exceptions elsewhere; confirm)
        myParquetWriter = *parquet::arrow::FileWriter::Open(*mySchema, arrow::default_memory_pool(), arrow_stream, props);
        myWroteHeader = true;
    }
    bool writeBatch = false;
    if (myNeedsWrite) {
        // verify that the row being finished provided every attribute the schema expects
        if (myCheckColumns && (int)myXMLStack.size() == myMaxDepth && myExpectedAttrs != mySeenAttrs) {
            for (int i = 0; i < (int)myExpectedAttrs.size(); ++i) {
                if (myExpectedAttrs.test(i) && !mySeenAttrs.test(i)) {
                    WRITE_ERRORF("Incomplete attribute set, '%' is missing. This file format does not support Parquet output yet.",
                                 toString((SumoXMLAttr)i));
                }
            }
        }
        // append one value per column builder; columns without a collected value get NULL
        int index = 0;
        for (auto& builder : myBuilders) {
            const auto val = index < (int)myValues.size() ? myValues[index++] : nullptr;
            PARQUET_THROW_NOT_OK(val == nullptr ? builder->AppendNull() : builder->AppendScalar(*val));
        }
        // flush once the accumulated row count reaches the configured batch size
        writeBatch = myWroteHeader && myBuilders.back()->length() >= myBatchSize;
        mySeenAttrs.reset();
        myNeedsWrite = false;
    }
    // write either a full batch or, at document end, whatever rows remain
    if (writeBatch || (myXMLStack.empty() && !myBuilders.empty())) {
        std::vector<std::shared_ptr<arrow::Array> > data;
        for (auto& builder : myBuilders) {
            std::shared_ptr<arrow::Array> column;
            // Finish() also leaves the builder empty for the next batch
            PARQUET_THROW_NOT_OK(builder->Finish(&column));
            data.push_back(column);
            // builder.reset();
        }
        auto batch = arrow::RecordBatch::Make(mySchema, data.back()->length(), data);
        PARQUET_THROW_NOT_OK(myParquetWriter->WriteRecordBatch(*batch));
    }
    // pop this element: discard values collected below the parent's level
    if (!myXMLStack.empty()) {
        if ((int)myValues.size() > myXMLStack.back()) {
            myValues.resize(myXMLStack.back());
        }
        myXMLStack.pop_back();
    }
    // always returns false (no textual closing tag is emitted for column formats)
    return false;
}
185 :
186 :
187 : /****************************************************************************/
|