Eclipse SUMO - Simulation of Urban MObility
Loading...
Searching...
No Matches
ParquetFormatter.h
Go to the documentation of this file.
1/****************************************************************************/
2// Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
3// Copyright (C) 2012-2025 German Aerospace Center (DLR) and others.
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// https://www.eclipse.org/legal/epl-2.0/
7// This Source Code may also be made available under the following Secondary
8// Licenses when the conditions for such availability set forth in the Eclipse
9// Public License 2.0 are satisfied: GNU General Public License, version 2
10// or later which is available at
11// https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
12// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
13/****************************************************************************/
18// Output formatter for Parquet output
19/****************************************************************************/
20#pragma once
21#include <config.h>
22
23#include <ostream>
24
25#ifdef _MSC_VER
26/* Disable warning about unmatched push / pop.
27 TODO Re-enable this once it has been solved upstream, see https://github.com/apache/arrow/issues/47099 */
28#pragma warning(suppress: 5032)
29#pragma warning(push)
30/* Disable warning about unused parameters */
31#pragma warning(disable: 4100)
32/* Disable warning about hidden function arrow::io::Writable::Write */
33#pragma warning(disable: 4266)
34/* Disable warning about padded memory layout */
35#pragma warning(disable: 4324)
36/* Disable warning about this in initializers */
37#pragma warning(disable: 4355)
38/* Disable warning about changed memory layout due to virtual base class */
39#pragma warning(disable: 4435)
40/* Disable warning about declaration hiding class member */
41#pragma warning(disable: 4458)
42/* Disable warning about implicit conversion of int to bool */
43#pragma warning(disable: 4800)
44#endif
45#include <arrow/api.h>
46#include <parquet/arrow/writer.h>
47#ifdef _MSC_VER
48/* Disable warning about unmatched push / pop.
49 TODO Re-enable this once it has been solved upstream, see https://github.com/apache/arrow/issues/47099 */
50#pragma warning(suppress: 5031)
51#pragma warning(pop)
52#endif
53
55#include "OutputFormatter.h"
56
57
58// ===========================================================================
59// class definitions
60// ===========================================================================
66public:
68 // for some motivation on the default batch size see https://stackoverflow.com/questions/76782018/what-is-actually-meant-when-referring-to-parquet-row-group-size
69 ParquetFormatter(const std::string& columnNames, const std::string& compression = "", const int batchSize = 1000000);
70
72 virtual ~ParquetFormatter() { }
73
80 void openTag(std::ostream& into, const std::string& xmlElement);
81
87 void openTag(std::ostream& into, const SumoXMLTag& xmlElement);
88
95 bool closeTag(std::ostream& into, const std::string& comment = "");
96
103 template <class T>
104 void writeAttr(std::ostream& /* into */, const SumoXMLAttr attr, const T& val, const bool isNull = false) {
105 checkAttr(attr);
106 if (!myWroteHeader) {
107 mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(toString(attr)), arrow::utf8()));
108 myBuilders.push_back(std::make_shared<arrow::StringBuilder>());
109 }
110 myValues.push_back(isNull ? nullptr : std::make_shared<arrow::StringScalar>(toString(val)));
111 }
112
113 template <class T>
114 void writeAttr(std::ostream& /* into */, const std::string& attr, const T& val) {
115 assert(!myCheckColumns);
116 if (!myWroteHeader) {
117 mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(attr), arrow::utf8()));
118 myBuilders.push_back(std::make_shared<arrow::StringBuilder>());
119 }
120 myValues.push_back(std::make_shared<arrow::StringScalar>(toString(val)));
121 }
122
123 void writeTime(std::ostream& into, const SumoXMLAttr attr, const SUMOTime val) {
124 if (!gHumanReadableTime) {
125 if (!myWroteHeader) {
126 mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(toString(attr)), arrow::float64()));
127 myBuilders.push_back(std::make_shared<arrow::DoubleBuilder>());
128 }
129 myValues.push_back(std::make_shared<arrow::DoubleScalar>(STEPS2TIME(val)));
130 return;
131 }
132 writeAttr(into, attr, time2string(val));
133 }
134
135 bool wroteHeader() const {
136 return myWroteHeader;
137 }
138
139 void setExpectedAttributes(const SumoXMLAttrMask& expected, const int depth = 2) {
140 myExpectedAttrs = expected;
141 myMaxDepth = depth;
142 myCheckColumns = expected.any();
143 }
144
145private:
146 inline const std::string getAttrString(const std::string& attrString) {
147 if (myHeaderFormat == "plain") {
148 return attrString;
149 }
150 if (myHeaderFormat == "auto") {
151 for (const auto& field : mySchema->fields()) {
152 if (field->name() == attrString) {
153 return myCurrentTag + "_" + attrString;
154 }
155 }
156 return attrString;
157 }
158 return myCurrentTag + "_" + attrString;
159 }
160
161 inline void checkAttr(const SumoXMLAttr attr) {
162 if (myCheckColumns && myMaxDepth == (int)myXMLStack.size()) {
163 mySeenAttrs.set(attr);
164 if (!myExpectedAttrs.test(attr)) {
165 throw ProcessError(TLF("Unexpected attribute '%', this file format does not support Parquet output yet.", toString(attr)));
166 }
167 }
168 }
169
    /// @brief the format to use for the column names
    /// (getAttrString distinguishes "plain", "auto" and everything else)
    const std::string myHeaderFormat;

    /// @brief the compression to use
    parquet::Compression::type myCompression = parquet::Compression::UNCOMPRESSED;

    /// @brief the number of rows to write per batch
    const int myBatchSize;

    /// @brief the currently read tag (only valid when generating the header); used as column name prefix
    std::string myCurrentTag;

    /// @brief the table schema; fields are appended by the write methods while !myWroteHeader
    std::shared_ptr<arrow::Schema> mySchema = arrow::schema({});

    /// @brief the output stream writer
    std::unique_ptr<parquet::arrow::FileWriter> myParquetWriter;

    /// @brief the content array builders for the table (one is pushed per schema field added)
    std::vector<std::shared_ptr<arrow::ArrayBuilder> > myBuilders;

    /// @brief The number of attributes in the currently open XML elements.
    /// Its size is the current depth, compared with myMaxDepth in checkAttr.
    std::vector<int> myXMLStack;

    /// @brief the current attribute / column values (an empty pointer represents a null value)
    std::vector<std::shared_ptr<arrow::Scalar> > myValues;

    /// @brief the maximum depth of the XML hierarchy (set by setExpectedAttributes)
    int myMaxDepth = 0;

    /// @brief whether the schema has been constructed completely
    bool myWroteHeader = false;

    /// @brief whether the columns should be checked for completeness
    /// (set by setExpectedAttributes to whether any attribute is expected)
    bool myCheckColumns = false;
208
211};
212
213
214// ===========================================================================
215// specialized template implementations
216// ===========================================================================
217template <>
218inline void ParquetFormatter::writeAttr(std::ostream& into, const SumoXMLAttr attr, const double& val, const bool isNull) {
219 checkAttr(attr);
220 if (attr == SUMO_ATTR_X || attr == SUMO_ATTR_Y || into.precision() > 2) {
221 if (!myWroteHeader) {
222 mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(toString(attr)), arrow::float64()));
223 myBuilders.push_back(std::make_shared<arrow::DoubleBuilder>());
224 }
225 myValues.push_back(isNull ? nullptr : std::make_shared<arrow::DoubleScalar>(val));
226 } else {
227 if (!myWroteHeader) {
228 mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(toString(attr)), arrow::float32()));
229 myBuilders.push_back(std::make_shared<arrow::FloatBuilder>());
230 }
231 myValues.push_back(isNull ? nullptr : std::make_shared<arrow::FloatScalar>((float)val));
232 }
233}
234
235template <>
236inline void ParquetFormatter::writeAttr(std::ostream& /* into */, const SumoXMLAttr attr, const int& val, const bool isNull) {
237 checkAttr(attr);
238 if (!myWroteHeader) {
239 mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(toString(attr)), arrow::int32()));
240 myBuilders.push_back(std::make_shared<arrow::Int32Builder>());
241 }
242 myValues.push_back(isNull ? nullptr : std::make_shared<arrow::Int32Scalar>(val));
243}
244
245template <>
246inline void ParquetFormatter::writeAttr(std::ostream& into, const std::string& attr, const double& val) {
247 assert(!myCheckColumns);
248 if (into.precision() > 2) {
249 if (!myWroteHeader) {
250 mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(attr), arrow::float64()));
251 myBuilders.push_back(std::make_shared<arrow::DoubleBuilder>());
252 }
253 myValues.push_back(std::make_shared<arrow::DoubleScalar>(val));
254 } else {
255 if (!myWroteHeader) {
256 mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(attr), arrow::float32()));
257 myBuilders.push_back(std::make_shared<arrow::FloatBuilder>());
258 }
259 myValues.push_back(std::make_shared<arrow::FloatScalar>((float)val));
260 }
261}
262
263template <>
264inline void ParquetFormatter::writeAttr(std::ostream& /* into */, const std::string& attr, const int& val) {
265 assert(!myCheckColumns);
266 if (!myWroteHeader) {
267 mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(attr), arrow::int32()));
268 myBuilders.push_back(std::make_shared<arrow::Int32Builder>());
269 }
270 myValues.push_back(std::make_shared<arrow::Int32Scalar>(val));
271}
long long int SUMOTime
Definition GUI.h:36
#define TLF(string,...)
Definition MsgHandler.h:307
std::string time2string(SUMOTime t, bool humanReadable)
convert SUMOTime to string (independently of global format setting)
Definition SUMOTime.cpp:91
#define STEPS2TIME(x)
Definition SUMOTime.h:55
SumoXMLTag
Numbers representing SUMO-XML - element names.
std::bitset< 96 > SumoXMLAttrMask
SumoXMLAttr
Numbers representing SUMO-XML - attributes.
@ SUMO_ATTR_Y
@ SUMO_ATTR_X
bool gHumanReadableTime
Definition StdDefs.cpp:30
std::string toString(const T &t, std::streamsize accuracy=gPrecision)
Definition ToString.h:46
Abstract base class for output formatters.
Output formatter for Parquet output.
bool myCheckColumns
whether the columns should be checked for completeness
virtual ~ParquetFormatter()
Destructor.
const std::string getAttrString(const std::string &attrString)
void setExpectedAttributes(const SumoXMLAttrMask &expected, const int depth=2)
Set the expected attributes to write. This is used for tracking which attributes are expected in tabl...
SumoXMLAttrMask myExpectedAttrs
the attributes which are expected for a complete row (including null values)
void openTag(std::ostream &into, const std::string &xmlElement)
Keeps track of an open XML tag by adding a new element to the stack.
std::shared_ptr< arrow::Schema > mySchema
the table schema
void writeTime(std::ostream &into, const SumoXMLAttr attr, const SUMOTime val)
parquet::Compression::type myCompression
the compression to use
const std::string myHeaderFormat
the format to use for the column names
bool myWroteHeader
whether the schema has been constructed completely
std::vector< int > myXMLStack
The number of attributes in the currently open XML elements.
bool wroteHeader() const
Returns whether a header has been written. Useful to detect whether a file is being used by multiple ...
SumoXMLAttrMask mySeenAttrs
the attributes already seen (including null values)
void writeAttr(std::ostream &, const std::string &attr, const T &val)
std::vector< std::shared_ptr< arrow::ArrayBuilder > > myBuilders
the content array builders for the table
void checkAttr(const SumoXMLAttr attr)
std::string myCurrentTag
the currently read tag (only valid when generating the header)
std::unique_ptr< parquet::arrow::FileWriter > myParquetWriter
the output stream writer
bool closeTag(std::ostream &into, const std::string &comment="")
Closes the most recently opened tag.
std::vector< std::shared_ptr< arrow::Scalar > > myValues
the current attribute / column values
const int myBatchSize
the number of rows to write per batch
int myMaxDepth
the maximum depth of the XML hierarchy
void writeAttr(std::ostream &, const SumoXMLAttr attr, const T &val, const bool isNull=false)
writes a named attribute