Line data Source code
1 : /****************************************************************************/
2 : // Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
3 : // Copyright (C) 2012-2025 German Aerospace Center (DLR) and others.
4 : // This program and the accompanying materials are made available under the
5 : // terms of the Eclipse Public License 2.0 which is available at
6 : // https://www.eclipse.org/legal/epl-2.0/
7 : // This Source Code may also be made available under the following Secondary
8 : // Licenses when the conditions for such availability set forth in the Eclipse
9 : // Public License 2.0 are satisfied: GNU General Public License, version 2
10 : // or later which is available at
11 : // https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
12 : // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
13 : /****************************************************************************/
14 : /// @file ParquetFormatter.cpp
15 : /// @author Michael Behrisch
16 : /// @date 2025-06-17
17 : ///
18 : // An output formatter for Parquet files
19 : /****************************************************************************/
20 : #include <config.h>
21 :
22 : #ifdef _MSC_VER
23 : #pragma warning(push)
24 : /* Disable warning about changed memory layout due to virtual base class */
25 : #pragma warning(disable: 4435)
26 : #endif
27 : #include <arrow/io/api.h>
28 : #ifdef _MSC_VER
29 : #pragma warning(pop)
30 : #endif
31 :
32 : #include <utils/common/MsgHandler.h>
33 : #include <utils/common/ToString.h>
34 : #include "ParquetFormatter.h"
35 :
36 :
37 : // ===========================================================================
38 : // helper class definitions
39 : // ===========================================================================
40 : class ArrowOStreamWrapper : public arrow::io::OutputStream {
41 : public:
42 : ArrowOStreamWrapper(std::ostream& out)
43 24 : : myOStream(out), myAmOpen(true) {}
44 :
45 0 : arrow::Status Close() override {
46 0 : myAmOpen = false;
47 0 : return arrow::Status::OK();
48 : }
49 :
50 0 : arrow::Status Flush() override {
51 0 : myOStream.flush();
52 0 : return arrow::Status::OK();
53 : }
54 :
55 1177 : arrow::Result<int64_t> Tell() const override {
56 1177 : return myOStream.tellp();
57 : }
58 :
59 0 : bool closed() const override {
60 0 : return !myAmOpen;
61 : }
62 :
63 759 : arrow::Status Write(const void* data, int64_t nbytes) override {
64 759 : if (!myAmOpen) {
65 : return arrow::Status::IOError("Write on closed stream");
66 : }
67 759 : myOStream.write(reinterpret_cast<const char*>(data), nbytes);
68 759 : if (!myOStream) {
69 : return arrow::Status::IOError("Failed to write to ostream");
70 : }
71 : return arrow::Status::OK();
72 : }
73 :
74 : private:
75 : std::ostream& myOStream;
76 : bool myAmOpen;
77 : };
78 :
79 :
80 : // ===========================================================================
81 : // member method definitions
82 : // ===========================================================================
83 24 : ParquetFormatter::ParquetFormatter(const std::string& columnNames, const std::string& compression, const int batchSize)
84 48 : : OutputFormatter(OutputFormatterType::PARQUET), myHeaderFormat(columnNames), myBatchSize(batchSize) {
85 24 : if (compression == "snappy") {
86 0 : myCompression = parquet::Compression::SNAPPY;
87 24 : } else if (compression == "gzip") {
88 0 : myCompression = parquet::Compression::GZIP;
89 24 : } else if (compression == "brotli") {
90 0 : myCompression = parquet::Compression::BROTLI;
91 24 : } else if (compression == "zstd") {
92 0 : myCompression = parquet::Compression::ZSTD;
93 24 : } else if (compression == "lz4") {
94 0 : myCompression = parquet::Compression::LZ4;
95 24 : } else if (compression == "bz2") {
96 0 : myCompression = parquet::Compression::BZ2;
97 24 : } else if (compression != "" && compression != "uncompressed") {
98 0 : WRITE_ERRORF("Unknown compression: %", compression);
99 : }
100 24 : if (!arrow::util::Codec::IsAvailable(myCompression)) {
101 0 : WRITE_WARNINGF("Compression '%' not available, falling back to uncompressed.", compression);
102 0 : myCompression = parquet::Compression::UNCOMPRESSED;
103 : }
104 24 : }
105 :
106 : void
107 756 : ParquetFormatter::openTag(std::ostream& /* into */, const std::string& xmlElement) {
108 756 : myXMLStack.push_back((int)myValues.size());
109 756 : if (!myWroteHeader) {
110 624 : myCurrentTag = xmlElement;
111 : }
112 756 : if (myMaxDepth == (int)myXMLStack.size() && myWroteHeader && myCurrentTag != xmlElement) {
113 0 : WRITE_WARNINGF("Encountered mismatch in XML tags (expected % but got %). Column names may be incorrect.", myCurrentTag, xmlElement);
114 : }
115 756 : }
116 :
117 :
118 : void
119 744 : ParquetFormatter::openTag(std::ostream& /* into */, const SumoXMLTag& xmlElement) {
120 744 : myXMLStack.push_back((int)myValues.size());
121 744 : if (!myWroteHeader) {
122 48 : myCurrentTag = toString(xmlElement);
123 : }
124 1464 : if (myMaxDepth == (int)myXMLStack.size() && myWroteHeader && myCurrentTag != toString(xmlElement)) {
125 36 : WRITE_WARNINGF("Encountered mismatch in XML tags (expected % but got %). Column names may be incorrect.", myCurrentTag, toString(xmlElement));
126 : }
127 744 : }
128 :
129 :
130 : bool
131 1524 : ParquetFormatter::closeTag(std::ostream& into, const std::string& /* comment */) {
132 1524 : if (myMaxDepth == 0) {
133 6 : myMaxDepth = (int)myXMLStack.size();
134 : }
135 1524 : if (myMaxDepth == (int)myXMLStack.size() && !myWroteHeader) {
136 24 : if (!myCheckColumns) {
137 12 : WRITE_WARNING("Column based formats are still experimental. Autodetection only works for homogeneous output.");
138 : }
139 : auto arrow_stream = std::make_shared<ArrowOStreamWrapper>(into);
140 24 : std::shared_ptr<parquet::WriterProperties> props = parquet::WriterProperties::Builder().compression(myCompression)->build();
141 96 : myParquetWriter = *parquet::arrow::FileWriter::Open(*mySchema, arrow::default_memory_pool(), arrow_stream, props);
142 24 : myWroteHeader = true;
143 : }
144 : bool writeBatch = false;
145 1524 : if ((int)myXMLStack.size() == myMaxDepth) {
146 1476 : if (myCheckColumns && myExpectedAttrs != mySeenAttrs) {
147 1552 : for (int i = 0; i < (int)myExpectedAttrs.size(); ++i) {
148 1536 : if (myExpectedAttrs.test(i) && !mySeenAttrs.test(i)) {
149 36 : WRITE_ERRORF("Incomplete attribute set, '%' is missing. This file format does not support Parquet output yet.",
150 : toString((SumoXMLAttr)i));
151 36 : myValues.push_back(nullptr);
152 : }
153 : }
154 : }
155 : int index = 0;
156 8756 : for (auto& builder : myBuilders) {
157 8012 : const auto val = myValues[index++];
158 15294 : PARQUET_THROW_NOT_OK(val == nullptr ? builder->AppendNull() : builder->AppendScalar(*val));
159 : }
160 744 : writeBatch = myBuilders.back()->length() == myBatchSize;
161 : mySeenAttrs.reset();
162 : }
163 1524 : if (writeBatch || myXMLStack.empty()) {
164 : std::vector<std::shared_ptr<arrow::Array> > data;
165 245 : for (auto& builder : myBuilders) {
166 221 : std::shared_ptr<arrow::Array> column;
167 221 : PARQUET_THROW_NOT_OK(builder->Finish(&column));
168 221 : data.push_back(column);
169 : // builder.reset();
170 : }
171 72 : auto batch = arrow::RecordBatch::Make(mySchema, data.back()->length(), data);
172 24 : PARQUET_THROW_NOT_OK(myParquetWriter->WriteRecordBatch(*batch));
173 24 : }
174 1524 : if (!myXMLStack.empty()) {
175 9532 : while ((int)myValues.size() > myXMLStack.back()) {
176 8032 : if (!myWroteHeader) {
177 1200 : mySchema = *mySchema->RemoveField(mySchema->num_fields() - 1);
178 : myBuilders.pop_back();
179 : }
180 : myValues.pop_back();
181 : }
182 : myXMLStack.pop_back();
183 : }
184 1524 : return false;
185 : }
186 :
187 :
188 : /****************************************************************************/
|