Line data Source code
1 : /****************************************************************************/
2 : // Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
3 : // Copyright (C) 2012-2025 German Aerospace Center (DLR) and others.
4 : // This program and the accompanying materials are made available under the
5 : // terms of the Eclipse Public License 2.0 which is available at
6 : // https://www.eclipse.org/legal/epl-2.0/
7 : // This Source Code may also be made available under the following Secondary
8 : // Licenses when the conditions for such availability set forth in the Eclipse
9 : // Public License 2.0 are satisfied: GNU General Public License, version 2
10 : // or later which is available at
11 : // https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
12 : // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
13 : /****************************************************************************/
14 : /// @file ParquetFormatter.h
15 : /// @author Michael Behrisch
16 : /// @date 2025-06-17
17 : ///
18 : // Output formatter for Parquet output
19 : /****************************************************************************/
20 : #pragma once
21 : #include <config.h>
22 :
23 : #include <ostream>
24 :
25 : #ifdef _MSC_VER
26 : /* Disable warning about unmatched push / pop.
27 : TODO Re-enable this once it has been solved upstream, see https://github.com/apache/arrow/issues/47099 */
28 : #pragma warning(suppress: 5032)
29 : #pragma warning(push)
30 : /* Disable warning about unused parameters */
31 : #pragma warning(disable: 4100)
32 : /* Disable warning about hidden function arrow::io::Writable::Write */
33 : #pragma warning(disable: 4266)
34 : /* Disable warning about padded memory layout */
35 : #pragma warning(disable: 4324)
36 : /* Disable warning about this in initializers */
37 : #pragma warning(disable: 4355)
38 : /* Disable warning about changed memory layout due to virtual base class */
39 : #pragma warning(disable: 4435)
40 : /* Disable warning about declaration hiding class member */
41 : #pragma warning(disable: 4458)
42 : /* Disable warning about implicit conversion of int to bool */
43 : #pragma warning(disable: 4800)
44 : #endif
45 : #include <arrow/api.h>
46 : #include <parquet/arrow/writer.h>
47 : #ifdef _MSC_VER
48 : /* Disable warning about unmatched push / pop.
49 : TODO Re-enable this once it has been solved upstream, see https://github.com/apache/arrow/issues/47099 */
50 : #pragma warning(suppress: 5031)
51 : #pragma warning(pop)
52 : #endif
53 :
54 : #include <utils/common/ToString.h>
55 : #include "OutputFormatter.h"
56 :
57 :
58 : // ===========================================================================
59 : // class definitions
60 : // ===========================================================================
61 : /**
62 : * @class ParquetFormatter
63 : * @brief Output formatter for Parquet output
 *
 * Attribute values are collected as Arrow scalars in myValues. As long as the header
 * has not been written (myWroteHeader is false), every written attribute additionally
 * appends a field to the schema and a matching array builder to myBuilders.
64 : */
65 : class ParquetFormatter : public OutputFormatter {
66 : public:
67 : /// @brief Constructor
/// @param[in] columnNames presumably the column name format kept in myHeaderFormat ("plain", "auto" or anything else) - TODO confirm in the constructor definition
/// @param[in] compression name of the compression to use, defaults to an empty string
/// @param[in] batchSize number of rows per batch, defaults to 1000000
68 : // for some motivation on the default batch size see https://stackoverflow.com/questions/76782018/what-is-actually-meant-when-referring-to-parquet-row-group-size
69 : ParquetFormatter(const std::string& columnNames, const std::string& compression = "", const int batchSize = 1000000);
70 :
71 : /// @brief Destructor
72 72 : virtual ~ParquetFormatter() { }
73 :
74 : /** @brief Keeps track of an open XML tag by adding a new element to the stack
75 : *
76 : * @param[in] into The output stream to use (unused)
77 : * @param[in] xmlElement Name of element to open (unused)
78 : * @note The function is declared void and returns nothing
79 : */
80 : void openTag(std::ostream& into, const std::string& xmlElement);
81 :
82 : /** @brief Keeps track of an open XML tag by adding a new element to the stack
83 : *
84 : * @param[in] into The output stream to use (unused)
85 : * @param[in] xmlElement Tag (SumoXMLTag) of the element to open (unused)
86 : */
87 : void openTag(std::ostream& into, const SumoXMLTag& xmlElement);
88 :
89 : /** @brief Closes the most recently opened tag
90 : *
91 : * @param[in] into The output stream to use
 * @param[in] comment Optional comment, defaults to an empty string
92 : * @return Whether a further element existed in the stack and could be closed
93 : * @todo it is not verified that the topmost element was closed
94 : */
95 : bool closeTag(std::ostream& into, const std::string& comment = "");
96 :
97 : /** @brief Writes a named attribute as a string (utf8) value
98 : *
 * Generic variant: the value is converted with toString() and stored as a StringScalar.
 * While the header has not been written, a utf8 field and a StringBuilder are appended
 * for the attribute. checkAttr() is called first and may throw a ProcessError.
 *
 * The output stream parameter is unnamed and unused here.
99 : * @param[in] attr The attribute (name)
100 : * @param[in] val The attribute value
101 : * @param[in] isNull The given value is not set (a null pointer is stored instead of a scalar)
102 : */
103 : template <class T>
104 2904 : void writeAttr(std::ostream& /* into */, const SumoXMLAttr attr, const T& val, const bool isNull = false) {
105 2904 : checkAttr(attr);
// schema still under construction: register a column (field + builder) for this attribute
106 2904 : if (!myWroteHeader) {
107 312 : mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(toString(attr)), arrow::utf8()));
108 156 : myBuilders.push_back(std::make_shared<arrow::StringBuilder>());
109 : }
110 5808 : myValues.push_back(isNull ? nullptr : std::make_shared<arrow::StringScalar>(toString(val)));
111 2904 : }
112 :
/** @brief Writes an attribute identified by a plain string name as a string (utf8) value
 *
 * Asserts that column checking (myCheckColumns) is not active. This overload has no
 * isNull parameter and does not call checkAttr().
 *
 * The output stream parameter is unnamed and unused here.
 * @param[in] attr The attribute name
 * @param[in] val The attribute value (converted with toString())
 */
113 : template <class T>
114 0 : void writeAttr(std::ostream& /* into */, const std::string& attr, const T& val) {
115 : assert(!myCheckColumns);
116 0 : if (!myWroteHeader) {
117 0 : mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(attr), arrow::utf8()));
118 0 : myBuilders.push_back(std::make_shared<arrow::StringBuilder>());
119 : }
120 0 : myValues.push_back(std::make_shared<arrow::StringScalar>(toString(val)));
121 0 : }
122 :
/** @brief Writes a time value
 *
 * Unless gHumanReadableTime is set, the value is converted with STEPS2TIME and stored as
 * a DoubleScalar (adding a float64 field and a DoubleBuilder while the header has not been
 * written). Otherwise the result of time2string() is passed on to writeAttr().
 *
 * @param[in] into The output stream, only forwarded to writeAttr()
 * @param[in] attr The attribute (name)
 * @param[in] val The time value
 */
123 756 : void writeTime(std::ostream& into, const SumoXMLAttr attr, const SUMOTime val) {
124 756 : if (!gHumanReadableTime) {
// NOTE(review): this branch does not call checkAttr(), unlike the writeAttr() overloads
// taking a SumoXMLAttr - confirm that skipping the expected / seen bookkeeping is intended
125 756 : if (!myWroteHeader) {
126 2496 : mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(toString(attr)), arrow::float64()));
127 1248 : myBuilders.push_back(std::make_shared<arrow::DoubleBuilder>());
128 : }
129 756 : myValues.push_back(std::make_shared<arrow::DoubleScalar>(STEPS2TIME(val)));
130 756 : return;
131 : }
// human readable times are written as strings via the generic writeAttr()
132 0 : writeAttr(into, attr, time2string(val));
133 : }
134 :
/// @brief Returns myWroteHeader, i.e. whether the schema has been constructed completely
135 0 : bool wroteHeader() const {
136 0 : return myWroteHeader;
137 : }
138 :
/** @brief Sets the attributes expected for a complete row and the depth at which they are checked
 *
 * Column checking is switched on only if at least one attribute is set in the mask.
 *
 * @param[in] expected The mask of expected attributes
 * @param[in] depth The size of the XML stack at which checkAttr() becomes active, defaults to 2
 */
139 18 : void setExpectedAttributes(const SumoXMLAttrMask& expected, const int depth = 2) {
140 18 : myExpectedAttrs = expected;
141 18 : myMaxDepth = depth;
142 18 : myCheckColumns = expected.any();
143 18 : }
144 :
145 : private:
/** @brief Builds the column name for an attribute according to myHeaderFormat
 *
 * "plain" returns the attribute name unchanged. "auto" prefixes the name with
 * myCurrentTag and "_" only if the schema already contains a field with that name.
 * Any other format always returns the prefixed name.
 *
 * @param[in] attrString The attribute name
 * @return The column name to use in the schema
 */
146 821 : inline const std::string getAttrString(const std::string& attrString) {
147 821 : if (myHeaderFormat == "plain") {
148 : return attrString;
149 : }
150 821 : if (myHeaderFormat == "auto") {
// only disambiguate when the plain name is already taken by an existing field
151 0 : for (const auto& field : mySchema->fields()) {
152 0 : if (field->name() == attrString) {
153 0 : return myCurrentTag + "_" + attrString;
154 : }
155 : }
156 : return attrString;
157 : }
158 1642 : return myCurrentTag + "_" + attrString;
159 : }
160 :
/** @brief Records the attribute as seen and rejects attributes which are not expected
 *
 * Only active if column checking is enabled and the size of myXMLStack equals myMaxDepth.
 *
 * @param[in] attr The attribute to check
 * @throw ProcessError if the attribute is not part of myExpectedAttrs
 */
161 7258 : inline void checkAttr(const SumoXMLAttr attr) {
162 7258 : if (myCheckColumns && myMaxDepth == (int)myXMLStack.size()) {
163 7138 : mySeenAttrs.set(attr);
164 7138 : if (!myExpectedAttrs.test(attr)) {
165 0 : throw ProcessError(TLF("Unexpected attribute '%', this file format does not support Parquet output yet.", toString(attr)));
166 : }
167 : }
168 7258 : }
169 :
170 : /// @brief the format to use for the column names
171 : const std::string myHeaderFormat;
172 :
173 : /// @brief the compression to use
174 : parquet::Compression::type myCompression = parquet::Compression::UNCOMPRESSED;
175 :
176 : /// @brief the number of rows to write per batch
177 : const int myBatchSize;
178 :
179 : /// @brief the currently read tag (only valid when generating the header)
180 : std::string myCurrentTag;
181 :
182 : /// @brief the table schema
183 : std::shared_ptr<arrow::Schema> mySchema = arrow::schema({});
184 :
185 : /// @brief the output stream writer
186 : std::unique_ptr<parquet::arrow::FileWriter> myParquetWriter;
187 :
188 : /// @brief the content array builders for the table
189 : std::vector<std::shared_ptr<arrow::ArrayBuilder> > myBuilders;
190 :
191 : /// @brief The number of attributes in the currently open XML elements
192 : std::vector<int> myXMLStack;
193 :
194 : /// @brief the current attribute / column values
195 : std::vector<std::shared_ptr<arrow::Scalar> > myValues;
196 :
197 : /// @brief the maximum depth of the XML hierarchy
198 : int myMaxDepth = 0;
199 :
200 : /// @brief whether the schema has been constructed completely
201 : bool myWroteHeader = false;
202 :
203 : /// @brief whether the columns should be checked for completeness
204 : bool myCheckColumns = false;
205 :
206 : /// @brief the attributes which are expected for a complete row (including null values)
207 : SumoXMLAttrMask myExpectedAttrs;
208 :
209 : /// @brief the attributes already seen (including null values)
210 : SumoXMLAttrMask mySeenAttrs;
211 : };
212 :
213 :
214 : // ===========================================================================
215 : // specialized template implementations
216 : // ===========================================================================
/** @brief Specialization of writeAttr() for double values keyed by a SumoXMLAttr
 *
 * The value is stored as float64 if the attribute is SUMO_ATTR_X or SUMO_ATTR_Y or if the
 * precision of the given stream is larger than 2; otherwise it is cast to float and stored
 * as float32. While the header has not been written, the matching field and builder are
 * appended as well. A null pointer is stored if isNull is set.
 *
 * @param[in] into The output stream, only its precision() is read
 * @param[in] attr The attribute (name)
 * @param[in] val The attribute value
 * @param[in] isNull The given value is not set
 */
217 : template <>
218 4354 : inline void ParquetFormatter::writeAttr(std::ostream& into, const SumoXMLAttr attr, const double& val, const bool isNull) {
219 4354 : checkAttr(attr);
// NOTE(review): the float64 / float32 decision is re-evaluated on every call; presumably the
// stream precision does not change between rows of the same column - confirm
220 4354 : if (attr == SUMO_ATTR_X || attr == SUMO_ATTR_Y || into.precision() > 2) {
221 1440 : if (!myWroteHeader) {
222 144 : mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(toString(attr)), arrow::float64()));
223 72 : myBuilders.push_back(std::make_shared<arrow::DoubleBuilder>());
224 : }
225 2880 : myValues.push_back(isNull ? nullptr : std::make_shared<arrow::DoubleScalar>(val));
226 : } else {
227 2914 : if (!myWroteHeader) {
228 332 : mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(toString(attr)), arrow::float32()));
229 166 : myBuilders.push_back(std::make_shared<arrow::FloatBuilder>());
230 : }
231 5828 : myValues.push_back(isNull ? nullptr : std::make_shared<arrow::FloatScalar>((float)val));
232 : }
233 4354 : }
234 :
/** @brief Specialization of writeAttr() for int values keyed by a SumoXMLAttr
 *
 * The value is stored as an Int32Scalar; while the header has not been written, an int32
 * field and an Int32Builder are appended. A null pointer is stored if isNull is set.
 * The output stream parameter is unnamed and unused here.
 *
 * @param[in] attr The attribute (name)
 * @param[in] val The attribute value
 * @param[in] isNull The given value is not set
 */
235 : template <>
236 0 : inline void ParquetFormatter::writeAttr(std::ostream& /* into */, const SumoXMLAttr attr, const int& val, const bool isNull) {
237 0 : checkAttr(attr);
238 0 : if (!myWroteHeader) {
239 0 : mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(toString(attr)), arrow::int32()));
240 0 : myBuilders.push_back(std::make_shared<arrow::Int32Builder>());
241 : }
242 0 : myValues.push_back(isNull ? nullptr : std::make_shared<arrow::Int32Scalar>(val));
243 0 : }
244 :
/** @brief Specialization of writeAttr() for double values keyed by a plain string name
 *
 * Asserts that column checking (myCheckColumns) is not active. The value is stored as
 * float64 if the precision of the given stream is larger than 2, otherwise it is cast to
 * float and stored as float32. While the header has not been written, the matching field
 * and builder are appended as well. There is no isNull parameter in this overload.
 *
 * @param[in] into The output stream, only its precision() is read
 * @param[in] attr The attribute name
 * @param[in] val The attribute value
 */
245 : template <>
246 0 : inline void ParquetFormatter::writeAttr(std::ostream& into, const std::string& attr, const double& val) {
247 : assert(!myCheckColumns);
248 0 : if (into.precision() > 2) {
249 0 : if (!myWroteHeader) {
250 0 : mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(attr), arrow::float64()));
251 0 : myBuilders.push_back(std::make_shared<arrow::DoubleBuilder>());
252 : }
253 0 : myValues.push_back(std::make_shared<arrow::DoubleScalar>(val));
254 : } else {
255 0 : if (!myWroteHeader) {
256 0 : mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(attr), arrow::float32()));
257 0 : myBuilders.push_back(std::make_shared<arrow::FloatBuilder>());
258 : }
259 0 : myValues.push_back(std::make_shared<arrow::FloatScalar>((float)val));
260 : }
261 0 : }
262 :
/** @brief Specialization of writeAttr() for int values keyed by a plain string name
 *
 * Asserts that column checking (myCheckColumns) is not active. The value is stored as an
 * Int32Scalar; while the header has not been written, an int32 field and an Int32Builder
 * are appended. The output stream parameter is unnamed and unused here.
 *
 * @param[in] attr The attribute name
 * @param[in] val The attribute value
 */
263 : template <>
264 0 : inline void ParquetFormatter::writeAttr(std::ostream& /* into */, const std::string& attr, const int& val) {
265 : assert(!myCheckColumns);
266 0 : if (!myWroteHeader) {
267 0 : mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(attr), arrow::int32()));
268 0 : myBuilders.push_back(std::make_shared<arrow::Int32Builder>());
269 : }
270 0 : myValues.push_back(std::make_shared<arrow::Int32Scalar>(val));
271 0 : }
|