Eclipse SUMO - Simulation of Urban MObility
Loading...
Searching...
No Matches
ParquetFormatter.h
Go to the documentation of this file.
1/****************************************************************************/
2// Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
3// Copyright (C) 2012-2026 German Aerospace Center (DLR) and others.
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// https://www.eclipse.org/legal/epl-2.0/
7// This Source Code may also be made available under the following Secondary
8// Licenses when the conditions for such availability set forth in the Eclipse
9// Public License 2.0 are satisfied: GNU General Public License, version 2
10// or later which is available at
11// https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
12// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
13/****************************************************************************/
18// Output formatter for Parquet output
19/****************************************************************************/
20#pragma once
21#include <config.h>
22
23#include <ostream>
24
25#ifdef _MSC_VER
26/* Disable warning about unmatched push / pop.
27 TODO Re-enable this once it has been solved upstream, see https://github.com/apache/arrow/issues/47099 */
28#pragma warning(suppress: 5032)
29#pragma warning(push)
30/* Disable warning about unused parameters */
31#pragma warning(disable: 4100)
32/* Disable warning about hidden function arrow::io::Writable::Write */
33#pragma warning(disable: 4266)
34/* Disable warning about padded memory layout */
35#pragma warning(disable: 4324)
36/* Disable warning about 'this' being used in a base member initializer list */
37#pragma warning(disable: 4355)
38/* Disable warning about changed memory layout due to virtual base class */
39#pragma warning(disable: 4435)
40/* Disable warning about declaration hiding class member */
41#pragma warning(disable: 4458)
42/* Disable warning about implicit conversion of int to bool */
43#pragma warning(disable: 4800)
44#endif
45#include <arrow/api.h>
46#include <parquet/arrow/writer.h>
47#ifdef _MSC_VER
48/* Disable warning about unmatched push / pop.
49 TODO Re-enable this once it has been solved upstream, see https://github.com/apache/arrow/issues/47099 */
50#pragma warning(suppress: 5031)
51#pragma warning(pop)
52#endif
53
55#include "OutputFormatter.h"
56
57
58// ===========================================================================
59// class definitions
60// ===========================================================================
66public:
/// @brief Constructor
/// @param[in] columnNames selects how column names are built; NOTE(review): presumably stored in myHeaderFormat ("the format to use for the column names") - the definition is not visible here, confirm
/// @param[in] compression the compression to use, given as a string; defaults to the empty string
/// @param[in] batchSize defaults to 1000000; NOTE(review): presumably the number of rows to write per batch (myBatchSize) - confirm against the definition
68 // for some motivation on the default batch size see https://stackoverflow.com/questions/76782018/what-is-actually-meant-when-referring-to-parquet-row-group-size
69 ParquetFormatter(const std::string& columnNames, const std::string& compression = "", const int batchSize = 1000000);
70
72 virtual ~ParquetFormatter() { }
73
/// @brief Keeps track of an open XML tag by adding a new element to the stack.
/// @param[in] into the output stream; NOTE(review): the definition is not visible here, so it is unclear whether the stream is used
/// @param[in] xmlElement the name of the element to open
80 void openTag(std::ostream& into, const std::string& xmlElement);
81
/// @brief Keeps track of an open XML tag; overload taking the element as a SumoXMLTag instead of a string.
/// @param[in] into the output stream; NOTE(review): the definition is not visible here, so it is unclear whether the stream is used
/// @param[in] xmlElement the id of the element to open
87 void openTag(std::ostream& into, const SumoXMLTag& xmlElement);
88
/// @brief Closes the most recently opened tag.
/// @param[in] into the output stream; NOTE(review): the definition is not visible here, so it is unclear whether the stream is used
/// @param[in] comment optional comment, defaults to the empty string
/// @return NOTE(review): meaning of the boolean is not visible in this header - presumably whether a tag could be closed, confirm against the definition
95 bool closeTag(std::ostream& into, const std::string& comment = "");
96
103 template <class T>
104 void writeAttr(std::ostream& /* into */, const SumoXMLAttr attr, const T& val, const bool isNull = false) {
105 checkAttr(attr);
106 checkBuilder<SumoXMLAttr, arrow::StringBuilder>(attr, arrow::utf8);
107 myValues.push_back(isNull ? nullptr : std::make_shared<arrow::StringScalar>(toString(val)));
108 }
109
110 template <class T>
111 void writeAttr(std::ostream& /* into */, const std::string& attr, const T& val) {
112 assert(!myCheckColumns);
113 checkBuilder<std::string, arrow::StringBuilder>(attr, arrow::utf8);
114 myValues.push_back(std::make_shared<arrow::StringScalar>(toString(val)));
115 }
116
117 void writeTime(std::ostream& into, const SumoXMLAttr attr, const SUMOTime val) {
118 if (!gHumanReadableTime) {
119 checkBuilder<SumoXMLAttr, arrow::DoubleBuilder>(attr, arrow::float64);
120 myValues.push_back(std::make_shared<arrow::DoubleScalar>(STEPS2TIME(val)));
121 return;
122 }
123 writeAttr(into, attr, time2string(val));
124 }
125
126 bool wroteHeader() const {
127 return myWroteHeader;
128 }
129
130 void setExpectedAttributes(const SumoXMLAttrMask& expected, const int depth = 2) {
131 myExpectedAttrs = expected;
132 myMaxDepth = depth;
133 myCheckColumns = expected.any();
134 }
135
136private:
137 inline const std::string getAttrString(const std::string& attrString) {
138 if (myHeaderFormat == "plain") {
139 return attrString;
140 }
141 if (myHeaderFormat == "auto") {
142 for (const auto& field : mySchema->fields()) {
143 if (field->name() == attrString) {
144 return myCurrentTag + "_" + attrString;
145 }
146 }
147 return attrString;
148 }
149 return myCurrentTag + "_" + attrString;
150 }
151
152 inline void checkAttr(const SumoXMLAttr attr) {
153 if (myCheckColumns && myMaxDepth == (int)myXMLStack.size()) {
154 mySeenAttrs.set(attr);
155 if (!myExpectedAttrs.test(attr)) {
156 throw ProcessError(TLF("Unexpected attribute '%', this file format does not support Parquet output yet.", toString(attr)));
157 }
158 }
159 }
160
161 template <class ATTR_TYPE, class BUILDER>
162 inline void checkBuilder(const ATTR_TYPE& attr, const std::shared_ptr<arrow::DataType>& (*dataType)()) {
163 myNeedsWrite = true;
164 if (!myWroteHeader) {
165 const std::string fieldName = getAttrString(toString(attr));
166 for (const auto& field : mySchema->fields()) {
167 if (field->name() == fieldName) {
168 return;
169 }
170 }
171 mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(fieldName, dataType()));
172 auto builder = std::make_shared<BUILDER>();
173 if (!myBuilders.empty()) {
174 if (myBuilders.back()->length() > 0) {
175 PARQUET_THROW_NOT_OK(builder->AppendNulls(myBuilders.back()->length()));
176 }
177 while (myValues.size() < myBuilders.size()) {
178 myValues.push_back(nullptr);
179 }
180 }
181 myBuilders.push_back(builder);
182 }
183 }
184
/// @brief the format to use for the column names
186 const std::string myHeaderFormat;
187
/// @brief the compression to use
189 parquet::Compression::type myCompression = parquet::Compression::UNCOMPRESSED;
190
/// @brief the number of rows to write per batch
192 const int myBatchSize;
193
/// @brief the currently read tag (only valid when generating the header)
195 std::string myCurrentTag;
196
/// @brief the table schema
198 std::shared_ptr<arrow::Schema> mySchema = arrow::schema({});
199
/// @brief the output stream writer
201 std::unique_ptr<parquet::arrow::FileWriter> myParquetWriter;
202
/// @brief the content array builders for the table
204 std::vector<std::shared_ptr<arrow::ArrayBuilder> > myBuilders;
205
/// @brief The number of attributes in the currently open XML elements.
207 std::vector<int> myXMLStack;
208
/// @brief the current attribute / column values
210 std::vector<std::shared_ptr<arrow::Scalar> > myValues;
211
/// @brief the maximum depth of the XML hierarchy
213 int myMaxDepth = 2;
214
/// @brief whether the schema has been constructed completely
216 bool myWroteHeader = false;
217
/// @brief whether the columns should be checked for completeness
219 bool myCheckColumns = false;
220
/// @brief whether there is still unwritten data
222 bool myNeedsWrite = false;
223
226
229};
230
231
232// ===========================================================================
233// specialized template implementations
234// ===========================================================================
235template <>
236inline void ParquetFormatter::writeAttr(std::ostream& into, const SumoXMLAttr attr, const double& val, const bool isNull) {
237 checkAttr(attr);
238 if (attr == SUMO_ATTR_X || attr == SUMO_ATTR_Y || into.precision() > 2) {
239 checkBuilder<SumoXMLAttr, arrow::DoubleBuilder>(attr, arrow::float64);
240 myValues.push_back(isNull ? nullptr : std::make_shared<arrow::DoubleScalar>(val));
241 } else {
242 checkBuilder<SumoXMLAttr, arrow::FloatBuilder>(attr, arrow::float32);
243 myValues.push_back(isNull ? nullptr : std::make_shared<arrow::FloatScalar>((float)val));
244 }
245}
246
247template <>
248inline void ParquetFormatter::writeAttr(std::ostream& /* into */, const SumoXMLAttr attr, const int& val, const bool isNull) {
249 checkAttr(attr);
250 checkBuilder<SumoXMLAttr, arrow::Int32Builder>(attr, arrow::int32);
251 myValues.push_back(isNull ? nullptr : std::make_shared<arrow::Int32Scalar>(val));
252}
253
254template <>
255inline void ParquetFormatter::writeAttr(std::ostream& into, const std::string& attr, const double& val) {
256 assert(!myCheckColumns);
257 if (into.precision() > 2) {
258 checkBuilder<std::string, arrow::DoubleBuilder>(attr, arrow::float64);
259 myValues.push_back(std::make_shared<arrow::DoubleScalar>(val));
260 } else {
261 checkBuilder<std::string, arrow::FloatBuilder>(attr, arrow::float32);
262 myValues.push_back(std::make_shared<arrow::FloatScalar>((float)val));
263 }
264}
265
266template <>
267inline void ParquetFormatter::writeAttr(std::ostream& /* into */, const std::string& attr, const int& val) {
268 assert(!myCheckColumns);
269 checkBuilder<std::string, arrow::Int32Builder>(attr, arrow::int32);
270 myValues.push_back(std::make_shared<arrow::Int32Scalar>(val));
271}
long long int SUMOTime
Definition GUI.h:36
#define TLF(string,...)
Definition MsgHandler.h:306
std::string time2string(SUMOTime t, bool humanReadable)
convert SUMOTime to string (independently of global format setting)
Definition SUMOTime.cpp:91
#define STEPS2TIME(x)
Definition SUMOTime.h:58
SumoXMLTag
Numbers representing SUMO-XML - element names.
std::bitset< 96 > SumoXMLAttrMask
SumoXMLAttr
Numbers representing SUMO-XML - attributes.
@ SUMO_ATTR_Y
@ SUMO_ATTR_X
bool gHumanReadableTime
Definition StdDefs.cpp:31
std::string toString(const T &t, std::streamsize accuracy=gPrecision)
Definition ToString.h:46
Abstract base class for output formatters.
Output formatter for Parquet output.
bool myCheckColumns
whether the columns should be checked for completeness
virtual ~ParquetFormatter()
Destructor.
const std::string getAttrString(const std::string &attrString)
void setExpectedAttributes(const SumoXMLAttrMask &expected, const int depth=2)
Set the expected attributes to write. This is used for tracking which attributes are expected in tabl...
SumoXMLAttrMask myExpectedAttrs
the attributes which are expected for a complete row (including null values)
void openTag(std::ostream &into, const std::string &xmlElement)
Keeps track of an open XML tag by adding a new element to the stack.
std::shared_ptr< arrow::Schema > mySchema
the table schema
void writeTime(std::ostream &into, const SumoXMLAttr attr, const SUMOTime val)
parquet::Compression::type myCompression
the compression to use
const std::string myHeaderFormat
the format to use for the column names
bool myWroteHeader
whether the schema has been constructed completely
std::vector< int > myXMLStack
The number of attributes in the currently open XML elements.
bool wroteHeader() const
Returns whether a header has been written. Useful to detect whether a file is being used by multiple ...
SumoXMLAttrMask mySeenAttrs
the attributes already seen (including null values)
void writeAttr(std::ostream &, const std::string &attr, const T &val)
std::vector< std::shared_ptr< arrow::ArrayBuilder > > myBuilders
the content array builders for the table
void checkAttr(const SumoXMLAttr attr)
std::string myCurrentTag
the currently read tag (only valid when generating the header)
bool myNeedsWrite
whether there is still unwritten data
std::unique_ptr< parquet::arrow::FileWriter > myParquetWriter
the output stream writer
bool closeTag(std::ostream &into, const std::string &comment="")
Closes the most recently opened tag.
std::vector< std::shared_ptr< arrow::Scalar > > myValues
the current attribute / column values
void checkBuilder(const ATTR_TYPE &attr, const std::shared_ptr< arrow::DataType > &(*dataType)())
const int myBatchSize
the number of rows to write per batch
int myMaxDepth
the maximum depth of the XML hierarchy
void writeAttr(std::ostream &, const SumoXMLAttr attr, const T &val, const bool isNull=false)
writes a named attribute