Eclipse SUMO - Simulation of Urban MObility
Loading...
Searching...
No Matches
ParquetFormatter.cpp
Go to the documentation of this file.
/****************************************************************************/
// Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
// Copyright (C) 2012-2025 German Aerospace Center (DLR) and others.
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0/
// This Source Code may also be made available under the following Secondary
// Licenses when the conditions for such availability set forth in the Eclipse
// Public License 2.0 are satisfied: GNU General Public License, version 2
// or later which is available at
// https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
/****************************************************************************/
// An output formatter for Parquet files
/****************************************************************************/
20#include <config.h>
21
22#ifdef _MSC_VER
23#pragma warning(push)
24/* Disable warning about changed memory layout due to virtual base class */
25#pragma warning(disable: 4435)
26#endif
27#include <arrow/io/api.h>
28#ifdef _MSC_VER
29#pragma warning(pop)
30#endif
31
34#include "ParquetFormatter.h"
35
36
// ===========================================================================
// helper class definitions
// ===========================================================================
40class ArrowOStreamWrapper : public arrow::io::OutputStream {
41public:
42 ArrowOStreamWrapper(std::ostream& out)
43 : myOStream(out), myAmOpen(true) {}
44
45 arrow::Status Close() override {
46 myAmOpen = false;
47 return arrow::Status::OK();
48 }
49
50 arrow::Status Flush() override {
51 myOStream.flush();
52 return arrow::Status::OK();
53 }
54
55 arrow::Result<int64_t> Tell() const override {
56 return myOStream.tellp();
57 }
58
59 bool closed() const override {
60 return !myAmOpen;
61 }
62
63 arrow::Status Write(const void* data, int64_t nbytes) override {
64 if (!myAmOpen) {
65 return arrow::Status::IOError("Write on closed stream");
66 }
67 myOStream.write(reinterpret_cast<const char*>(data), nbytes);
68 if (!myOStream) {
69 return arrow::Status::IOError("Failed to write to ostream");
70 }
71 return arrow::Status::OK();
72 }
73
74private:
75 std::ostream& myOStream;
77};
78
79
// ===========================================================================
// member method definitions
// ===========================================================================
83ParquetFormatter::ParquetFormatter(const std::string& columnNames, const std::string& compression, const int batchSize)
84 : OutputFormatter(OutputFormatterType::PARQUET), myHeaderFormat(columnNames), myBatchSize(batchSize) {
85 if (compression == "snappy") {
86 myCompression = parquet::Compression::SNAPPY;
87 } else if (compression == "gzip") {
88 myCompression = parquet::Compression::GZIP;
89 } else if (compression == "brotli") {
90 myCompression = parquet::Compression::BROTLI;
91 } else if (compression == "zstd") {
92 myCompression = parquet::Compression::ZSTD;
93 } else if (compression == "lz4") {
94 myCompression = parquet::Compression::LZ4;
95 } else if (compression == "bz2") {
96 myCompression = parquet::Compression::BZ2;
97 } else if (compression != "" && compression != "uncompressed") {
98 WRITE_ERRORF("Unknown compression: %", compression);
99 }
100 if (!arrow::util::Codec::IsAvailable(myCompression)) {
101 WRITE_WARNINGF("Compression '%' not available, falling back to uncompressed.", compression);
102 myCompression = parquet::Compression::UNCOMPRESSED;
103 }
104}
105
106void
107ParquetFormatter::openTag(std::ostream& /* into */, const std::string& xmlElement) {
108 myXMLStack.push_back((int)myValues.size());
109 if (!myWroteHeader) {
110 myCurrentTag = xmlElement;
111 }
112 if (myMaxDepth == (int)myXMLStack.size() && myWroteHeader && myCurrentTag != xmlElement) {
113 WRITE_WARNINGF("Encountered mismatch in XML tags (expected % but got %). Column names may be incorrect.", myCurrentTag, xmlElement);
114 }
115}
116
117
118void
119ParquetFormatter::openTag(std::ostream& /* into */, const SumoXMLTag& xmlElement) {
120 myXMLStack.push_back((int)myValues.size());
121 if (!myWroteHeader) {
122 myCurrentTag = toString(xmlElement);
123 }
124 if (myMaxDepth == (int)myXMLStack.size() && myWroteHeader && myCurrentTag != toString(xmlElement)) {
125 WRITE_WARNINGF("Encountered mismatch in XML tags (expected % but got %). Column names may be incorrect.", myCurrentTag, toString(xmlElement));
126 }
127}
128
129
// Closes the most recently opened element.
// - On the first call (myMaxDepth == 0) the current nesting depth is taken as the depth
//   at which every element forms one table row.
// - When closing at that depth for the first time, the Parquet file writer is created on
//   `into` (via ArrowOStreamWrapper) using mySchema and myCompression.
// - At that depth the collected values are appended as one row to the column builders.
//   A record batch is written when myBatchSize rows are buffered or the stack is empty.
// - Finally the values added by the closed element are discarded and its stack entry popped.
// Always returns false. NOTE(review): the meaning of the return value comes from the
// OutputFormatter contract, which is not visible here — confirm.
// NOTE(review): this rendering is missing original lines 146 and 150 (the numbering jumps
// 145->147 and 149->151). As shown, the braces below are unbalanced and the WRITE_ERRORF
// call lacks its last argument.
130bool
131ParquetFormatter::closeTag(std::ostream& into, const std::string& /* comment */) {
    // first close ever: the current depth becomes the row depth
132 if (myMaxDepth == 0) {
133 myMaxDepth = (int)myXMLStack.size();
134 }
    // first row complete: the schema is now known, so the file writer can be opened
135 if (myMaxDepth == (int)myXMLStack.size() && !myWroteHeader) {
136 if (!myCheckColumns) {
137 WRITE_WARNING("Column based formats are still experimental. Autodetection only works for homogeneous output.");
138 }
139 auto arrow_stream = std::make_shared<ArrowOStreamWrapper>(into);
140 std::shared_ptr<parquet::WriterProperties> props = parquet::WriterProperties::Builder().compression(myCompression)->build();
    // NOTE(review): the arrow::Result returned by Open is dereferenced without checking its
    // status. Consider PARQUET_ASSIGN_OR_THROW so that a failure surfaces as an exception.
141 myParquetWriter = *parquet::arrow::FileWriter::Open(*mySchema, arrow::default_memory_pool(), arrow_stream, props);
142 myWroteHeader = true;
143 }
144 bool writeBatch = false;
    // closing an element at row depth: emit one row
145 if ((int)myXMLStack.size() == myMaxDepth) {
    // report expected attributes that were not seen in this row and pad with null values
    // NOTE(review): the nulls are appended at the end of myValues, so they only line up with
    // the right column if the missing attributes come last — an error is reported anyway.
147 for (int i = 0; i < (int)myExpectedAttrs.size(); ++i) {
148 if (myExpectedAttrs.test(i) && !mySeenAttrs.test(i)) {
149 WRITE_ERRORF("Incomplete attribute set, '%' is missing. This file format does not support Parquet output yet.",
151 myValues.push_back(nullptr);
152 }
153 }
154 }
    // values and builders are matched by position; a nullptr value becomes a null cell
    // NOTE(review): this indexes myValues without checking that it holds at least
    // myBuilders.size() entries.
155 int index = 0;
156 for (auto& builder : myBuilders) {
157 const auto val = myValues[index++];
158 PARQUET_THROW_NOT_OK(val == nullptr ? builder->AppendNull() : builder->AppendScalar(*val));
159 }
    // NOTE(review): back() is undefined behaviour if myBuilders is empty (row without columns)
160 writeBatch = myBuilders.back()->length() == myBatchSize;
161 mySeenAttrs.reset();
162 }
    // flush buffered rows when the batch is full or the outermost element was closed
163 if (writeBatch || myXMLStack.empty()) {
164 std::vector<std::shared_ptr<arrow::Array> > data;
165 for (auto& builder : myBuilders) {
166 std::shared_ptr<arrow::Array> column;
    // per the Arrow documentation, Finish also resets the builder — TODO confirm for the Arrow version in use
167 PARQUET_THROW_NOT_OK(builder->Finish(&column));
168 data.push_back(column);
169 // builder.reset();
170 }
    // NOTE(review): data.back() is undefined behaviour if there are no columns
171 auto batch = arrow::RecordBatch::Make(mySchema, data.back()->length(), data);
172 PARQUET_THROW_NOT_OK(myParquetWriter->WriteRecordBatch(*batch));
173 }
    // drop the values (and, while the schema is still open, the columns) added by the
    // element being closed, then pop its stack entry
    // NOTE(review): in this function myWroteHeader becomes true on the first call, so the
    // schema-pruning branch only matters if myMaxDepth/myWroteHeader are set elsewhere — confirm.
174 if (!myXMLStack.empty()) {
175 while ((int)myValues.size() > myXMLStack.back()) {
176 if (!myWroteHeader) {
177 mySchema = *mySchema->RemoveField(mySchema->num_fields() - 1);
178 myBuilders.pop_back();
179 }
180 myValues.pop_back();
181 }
182 myXMLStack.pop_back();
183 }
184 return false;
185}
186
187
/****************************************************************************/
#define WRITE_WARNINGF(...)
Definition MsgHandler.h:287
#define WRITE_ERRORF(...)
Definition MsgHandler.h:296
#define WRITE_WARNING(msg)
Definition MsgHandler.h:286
OutputFormatterType
SumoXMLTag
Numbers representing SUMO-XML - element names.
SumoXMLAttr
Numbers representing SUMO-XML - attributes.
std::string toString(const T &t, std::streamsize accuracy=gPrecision)
Definition ToString.h:46
bool closed() const override
arrow::Status Close() override
arrow::Status Flush() override
arrow::Status Write(const void *data, int64_t nbytes) override
ArrowOStreamWrapper(std::ostream &out)
arrow::Result< int64_t > Tell() const override
Abstract base class for output formatters.
bool myCheckColumns
whether the columns should be checked for completeness
ParquetFormatter(const std::string &columnNames, const std::string &compression="", const int batchSize=1000000)
Constructor.
SumoXMLAttrMask myExpectedAttrs
the attributes which are expected for a complete row (including null values)
void openTag(std::ostream &into, const std::string &xmlElement)
Keeps track of an open XML tag by adding a new element to the stack.
std::shared_ptr< arrow::Schema > mySchema
the table schema
parquet::Compression::type myCompression
the compression to use
bool myWroteHeader
whether the schema has been constructed completely
std::vector< int > myXMLStack
The number of attributes in the currently open XML elements.
SumoXMLAttrMask mySeenAttrs
the attributes already seen (including null values)
std::vector< std::shared_ptr< arrow::ArrayBuilder > > myBuilders
the content array builders for the table
std::string myCurrentTag
the currently read tag (only valid when generating the header)
std::unique_ptr< parquet::arrow::FileWriter > myParquetWriter
the output stream writer
bool closeTag(std::ostream &into, const std::string &comment="")
Closes the most recently opened tag.
std::vector< std::shared_ptr< arrow::Scalar > > myValues
the current attribute / column values
const int myBatchSize
the number of rows to write per batch
int myMaxDepth
the maximum depth of the XML hierarchy