Eclipse SUMO - Simulation of Urban MObility
Loading...
Searching...
No Matches
ParquetFormatter.cpp
Go to the documentation of this file.
1/****************************************************************************/
2// Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
3// Copyright (C) 2012-2025 German Aerospace Center (DLR) and others.
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// https://www.eclipse.org/legal/epl-2.0/
7// This Source Code may also be made available under the following Secondary
8// Licenses when the conditions for such availability set forth in the Eclipse
9// Public License 2.0 are satisfied: GNU General Public License, version 2
10// or later which is available at
11// https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
12// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
13/****************************************************************************/
18// An output formatter for Parquet files
19/****************************************************************************/
20#include <config.h>
21
22#include <arrow/io/api.h>
23
26#include "ParquetFormatter.h"
27
28
29// ===========================================================================
30// helper class definitions
31// ===========================================================================
32class ArrowOStreamWrapper : public arrow::io::OutputStream {
33public:
34 ArrowOStreamWrapper(std::ostream& out)
35 : myOStream(out), myAmOpen(true) {}
36
37 arrow::Status Close() override {
38 myAmOpen = false;
39 return arrow::Status::OK();
40 }
41
42 arrow::Status Flush() override {
43 myOStream.flush();
44 return arrow::Status::OK();
45 }
46
47 arrow::Result<int64_t> Tell() const override {
48 return myOStream.tellp();
49 }
50
51 bool closed() const override {
52 return !myAmOpen;
53 }
54
55 arrow::Status Write(const void* data, int64_t nbytes) override {
56 if (!myAmOpen) {
57 return arrow::Status::IOError("Write on closed stream");
58 }
59 myOStream.write(reinterpret_cast<const char*>(data), nbytes);
60 if (!myOStream) {
61 return arrow::Status::IOError("Failed to write to ostream");
62 }
63 return arrow::Status::OK();
64 }
65
66private:
67 std::ostream& myOStream;
69};
70
71
72// ===========================================================================
73// member method definitions
74// ===========================================================================
75ParquetFormatter::ParquetFormatter(const std::string& columnNames, const std::string& compression, const int batchSize)
76 : OutputFormatter(OutputFormatterType::PARQUET), myHeaderFormat(columnNames), myBatchSize(batchSize) {
77 if (compression == "snappy") {
78 myCompression = parquet::Compression::SNAPPY;
79 } else if (compression == "gzip") {
80 myCompression = parquet::Compression::GZIP;
81 } else if (compression == "brotli") {
82 myCompression = parquet::Compression::BROTLI;
83 } else if (compression == "zstd") {
84 myCompression = parquet::Compression::ZSTD;
85 } else if (compression == "lz4") {
86 myCompression = parquet::Compression::LZ4;
87 } else if (compression == "bz2") {
88 myCompression = parquet::Compression::BZ2;
89 } else if (compression != "" && compression != "uncompressed") {
90 WRITE_ERRORF("Unknown compression: %", compression);
91 }
92 if (!arrow::util::Codec::IsAvailable(myCompression)) {
93 WRITE_WARNINGF("Compression '%' not available, falling back to uncompressed.", compression);
94 myCompression = parquet::Compression::UNCOMPRESSED;
95 }
96}
97
98void
99ParquetFormatter::openTag(std::ostream& /* into */, const std::string& xmlElement) {
100 myXMLStack.push_back((int)myValues.size());
101 if (!myWroteHeader) {
102 myCurrentTag = xmlElement;
103 }
104 if (myMaxDepth == (int)myXMLStack.size() && myWroteHeader && myCurrentTag != xmlElement) {
105 WRITE_WARNINGF("Encountered mismatch in XML tags (expected % but got %). Column names may be incorrect.", myCurrentTag, xmlElement);
106 }
107}
108
109
110void
111ParquetFormatter::openTag(std::ostream& /* into */, const SumoXMLTag& xmlElement) {
112 myXMLStack.push_back((int)myValues.size());
113 if (!myWroteHeader) {
114 myCurrentTag = toString(xmlElement);
115 }
116 if (myMaxDepth == (int)myXMLStack.size() && myWroteHeader && myCurrentTag != toString(xmlElement)) {
117 WRITE_WARNINGF("Encountered mismatch in XML tags (expected % but got %). Column names may be incorrect.", myCurrentTag, toString(xmlElement));
118 }
119}
120
121
// Closes the innermost open XML element and, where applicable, emits table data.
// - The first call stores the current stack depth in myMaxDepth; closing an element at that
//   depth completes one table row.
// - When the first row is completed, a parquet::arrow::FileWriter is opened on `into` (wrapped
//   in an ArrowOStreamWrapper) with mySchema and myCompression, and myWroteHeader is set.
// - For each completed row the collected myValues are appended to the column builders
//   (a nullptr value becomes a null entry) and mySeenAttrs is cleared.
// - When the last builder holds exactly myBatchSize rows, or when closeTag is called with no
//   open element, all builders are finished and written as one record batch.
// - Finally the values of the closed element are dropped (while the header is not written yet,
//   together with their schema fields and builders) and the element is popped.
// @param into the stream the Parquet data is written to
// @return always false
// NOTE(review): source lines 138 and 142 are absent from this listing. Line 138 must open the
//   block that the otherwise unmatched `}` on line 145 closes, and line 142 holds the rest of
//   the WRITE_ERRORF call started on line 141. Check the original source before editing.
122bool
123ParquetFormatter::closeTag(std::ostream& into, const std::string& /* comment */) {
124 if (myMaxDepth == 0) {
125 myMaxDepth = (int)myXMLStack.size();
126 }
127 if (myMaxDepth == (int)myXMLStack.size() && !myWroteHeader) {
128 if (!myCheckColumns) {
129 WRITE_WARNING("Column based formats are still experimental. Autodetection only works for homogeneous output.");
130 }
131 auto arrow_stream = std::make_shared<ArrowOStreamWrapper>(into);
132 std::shared_ptr<parquet::WriterProperties> props = parquet::WriterProperties::Builder().compression(myCompression)->build();
// NOTE(review): the arrow::Result returned by FileWriter::Open is dereferenced without checking
//   ok(), so a failed Open is not turned into a catchable error the way PARQUET_THROW_NOT_OK
//   does below; consider PARQUET_ASSIGN_OR_THROW.
133 myParquetWriter = *parquet::arrow::FileWriter::Open(*mySchema, arrow::default_memory_pool(), arrow_stream, props);
134 myWroteHeader = true;
135 }
136 bool writeBatch = false;
// Row completion: report expected attributes that were never set for this row (under the
// condition on the missing line 138), then append the row's values to the builders.
137 if ((int)myXMLStack.size() == myMaxDepth) {
139 for (int i = 0; i < (int)myExpectedAttrs.size(); ++i) {
140 if (myExpectedAttrs.test(i) && !mySeenAttrs.test(i)) {
141 WRITE_ERRORF("Incomplete attribute set, '%' is missing. This file format does not support Parquet output yet.",
143 }
144 }
145 }
146 int index = 0;
147 for (auto& builder : myBuilders) {
148 const auto val = myValues[index++];
149 PARQUET_THROW_NOT_OK(val == nullptr ? builder->AppendNull() : builder->AppendScalar(*val));
150 }
// NOTE(review): assumes myBuilders is non-empty; back() on an empty vector is undefined behaviour.
151 writeBatch = myBuilders.back()->length() == myBatchSize;
152 mySeenAttrs.reset();
153 }
// Write the buffered rows once a batch is full, and also whenever closeTag is called with no
// open element.
154 if (writeBatch || myXMLStack.empty()) {
155 std::vector<std::shared_ptr<arrow::Array> > data;
156 for (auto& builder : myBuilders) {
157 std::shared_ptr<arrow::Array> column;
158 PARQUET_THROW_NOT_OK(builder->Finish(&column));
159 data.push_back(column);
// Presumably unnecessary: Arrow documents ArrayBuilder::Finish as resetting the builder.
160 // builder.reset();
161 }
// NOTE(review): data.back() and myParquetWriter are assumed valid here. If this is reached
//   before any row was completed (no builders, or no writer opened yet), they may be
//   empty/null; verify what the callers guarantee.
162 auto batch = arrow::RecordBatch::Make(mySchema, data.back()->length(), data);
163 PARQUET_THROW_NOT_OK(myParquetWriter->WriteRecordBatch(*batch));
164 }
// Drop the closed element's values. While the header is not written yet, also drop the schema
// fields and builders that were added for them.
165 if (!myXMLStack.empty()) {
166 while ((int)myValues.size() > myXMLStack.back()) {
167 if (!myWroteHeader) {
168 mySchema = *mySchema->RemoveField(mySchema->num_fields() - 1);
169 myBuilders.pop_back();
170 }
171 myValues.pop_back();
172 }
173 myXMLStack.pop_back();
174 }
175 return false;
176}
177
178
179/****************************************************************************/
#define WRITE_WARNINGF(...)
Definition MsgHandler.h:288
#define WRITE_ERRORF(...)
Definition MsgHandler.h:297
#define WRITE_WARNING(msg)
Definition MsgHandler.h:287
OutputFormatterType
SumoXMLTag
Numbers representing SUMO-XML - element names.
SumoXMLAttr
Numbers representing SUMO-XML - attributes.
std::string toString(const T &t, std::streamsize accuracy=gPrecision)
Definition ToString.h:46
bool closed() const override
arrow::Status Close() override
arrow::Status Flush() override
arrow::Status Write(const void *data, int64_t nbytes) override
ArrowOStreamWrapper(std::ostream &out)
arrow::Result< int64_t > Tell() const override
Abstract base class for output formatters.
bool myCheckColumns
whether the columns should be checked for completeness
ParquetFormatter(const std::string &columnNames, const std::string &compression="", const int batchSize=1000000)
Constructor.
SumoXMLAttrMask myExpectedAttrs
the attributes which are expected for a complete row (including null values)
void openTag(std::ostream &into, const std::string &xmlElement)
Keeps track of an open XML tag by adding a new element to the stack.
std::shared_ptr< arrow::Schema > mySchema
the table schema
parquet::Compression::type myCompression
the compression to use
bool myWroteHeader
whether the schema has been constructed completely
std::vector< int > myXMLStack
For each currently open XML element, the number of attribute values already collected when it was opened.
SumoXMLAttrMask mySeenAttrs
the attributes already seen (including null values)
std::vector< std::shared_ptr< arrow::ArrayBuilder > > myBuilders
the content array builders for the table
std::string myCurrentTag
the tag recorded while the header is generated; afterwards used to detect mismatching row tags
std::unique_ptr< parquet::arrow::FileWriter > myParquetWriter
the Parquet file writer that writes record batches to the output stream
bool closeTag(std::ostream &into, const std::string &comment="")
Closes the most recently opened tag.
std::vector< std::shared_ptr< arrow::Scalar > > myValues
the current attribute / column values
const int myBatchSize
the number of rows to write per batch
int myMaxDepth
the maximum depth of the XML hierarchy