LCOV - code coverage report
Current view: top level - src/utils/iodevices - ParquetFormatter.cpp (source / functions) Coverage Total Hit
Test: lcov.info Lines: 83.5 % 164 137
Test Date: 2026-06-15 15:46:12 Functions: 84.8 % 33 28

            Line data    Source code
       1              : /****************************************************************************/
       2              : // Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
       3              : // Copyright (C) 2012-2026 German Aerospace Center (DLR) and others.
       4              : // This program and the accompanying materials are made available under the
       5              : // terms of the Eclipse Public License 2.0 which is available at
       6              : // https://www.eclipse.org/legal/epl-2.0/
       7              : // This Source Code may also be made available under the following Secondary
       8              : // Licenses when the conditions for such availability set forth in the Eclipse
       9              : // Public License 2.0 are satisfied: GNU General Public License, version 2
      10              : // or later which is available at
      11              : // https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
      12              : // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
      13              : /****************************************************************************/
      14              : /// @file    ParquetFormatter.cpp
      15              : /// @author  Michael Behrisch
      16              : /// @date    2025-06-17
      17              : ///
      18              : // An output formatter for Parquet files
      19              : /****************************************************************************/
      20              : #include <config.h>
      21              : 
      22              : #ifdef _MSC_VER
      23              : #pragma warning(push)
      24              : /* Disable warning about unused parameters */
      25              : #pragma warning(disable: 4100)
      26              : /* Disable warning about hidden function arrow::io::Writable::Write */
      27              : #pragma warning(disable: 4266)
      28              : /* Disable warning about padded memory layout */
      29              : #pragma warning(disable: 4324)
      30              : /* Disable warning about this in initializers */
      31              : #pragma warning(disable: 4355)
      32              : /* Disable warning about changed memory layout due to virtual base class */
      33              : #pragma warning(disable: 4435)
      34              : /* Disable warning about declaration hiding class member */
      35              : #pragma warning(disable: 4458)
      36              : /* Disable warning about implicit conversion of int to bool */
      37              : #pragma warning(disable: 4800)
      38              : #endif
      39              : #include <arrow/api.h>
      40              : #include <arrow/io/api.h>
      41              : #include <parquet/arrow/writer.h>
      42              : #ifdef _MSC_VER
      43              : #pragma warning(pop)
      44              : #endif
      45              : 
      46              : #include <utils/common/MsgHandler.h>
      47              : #include <utils/common/ToString.h>
      48              : #include "ParquetFormatter.h"
      49              : 
      50              : 
      51              : // ===========================================================================
      52              : // helper class definitions
      53              : // ===========================================================================
      54              : class ArrowOStreamWrapper : public arrow::io::OutputStream {
      55              : public:
      56              :     ArrowOStreamWrapper(std::ostream& out)
      57           46 :         : myOStream(out), myAmOpen(true) {}
      58              : 
      59            0 :     arrow::Status Close() override {
      60            0 :         myAmOpen = false;
      61            0 :         return arrow::Status::OK();
      62              :     }
      63              : 
      64            0 :     arrow::Status Flush() override {
      65            0 :         myOStream.flush();
      66            0 :         return arrow::Status::OK();
      67              :     }
      68              : 
      69         2863 :     arrow::Result<int64_t> Tell() const override {
      70         2863 :         return myOStream.tellp();
      71              :     }
      72              : 
      73            0 :     bool closed() const override {
      74            0 :         return !myAmOpen;
      75              :     }
      76              : 
      77         1819 :     arrow::Status Write(const void* data, int64_t nbytes) override {
      78         1819 :         if (!myAmOpen) {
      79              :             return arrow::Status::IOError("Write on closed stream");
      80              :         }
      81         1819 :         myOStream.write(reinterpret_cast<const char*>(data), nbytes);
      82         1819 :         if (!myOStream) {
      83              :             return arrow::Status::IOError("Failed to write to ostream");
      84              :         }
      85              :         return arrow::Status::OK();
      86              :     }
      87              : 
      88              : private:
      89              :     std::ostream& myOStream;
      90              :     bool myAmOpen;
      91              : };
      92              : 
      93              : 
      94              : // ===========================================================================
      95              : // ParquetFormatter::Impl definition
      96              : // ===========================================================================
      97              : struct ParquetFormatter::Impl {
      98           46 :     Impl(const std::string& columnNames, const int batchSize)
      99          138 :         : myHeaderFormat(columnNames), myBatchSize(batchSize) {}
     100              : 
     101              :     /// @brief the format to use for the column names
     102              :     const std::string myHeaderFormat;
     103              : 
     104              :     /// @brief the compression to use
     105              :     parquet::Compression::type myCompression = parquet::Compression::UNCOMPRESSED;
     106              : 
     107              :     /// @brief the number of rows to write per batch
     108              :     const int myBatchSize;
     109              : 
     110              :     /// @brief the currently read tag (only valid when generating the header)
     111              :     std::string myCurrentTag;
     112              : 
     113              :     /// @brief the table schema
     114              :     std::shared_ptr<arrow::Schema> mySchema = arrow::schema({});
     115              : 
     116              :     /// @brief the output stream writer
     117              :     std::unique_ptr<parquet::arrow::FileWriter> myParquetWriter;
     118              : 
     119              :     /// @brief the content array builders for the table
     120              :     std::vector<std::shared_ptr<arrow::ArrayBuilder> > myBuilders;
     121              : 
     122              :     /// @brief The number of attributes in the currently open XML elements
     123              :     std::vector<int> myXMLStack;
     124              : 
     125              :     /// @brief the current attribute / column values
     126              :     std::vector<std::shared_ptr<arrow::Scalar> > myValues;
     127              : 
     128              :     /// @brief the maximum depth of the XML hierarchy
     129              :     int myMaxDepth = 2;
     130              : 
     131              :     /// @brief whether the schema has been constructed completely
     132              :     bool myWroteHeader = false;
     133              : 
     134              :     /// @brief whether the columns should be checked for completeness
     135              :     bool myCheckColumns = false;
     136              : 
     137              :     /// @brief whether there is still unwritten data
     138              :     bool myNeedsWrite = false;
     139              : 
     140              :     /// @brief the attributes which are expected for a complete row (including null values)
     141              :     SumoXMLAttrMask myExpectedAttrs;
     142              : 
     143              :     /// @brief the attributes already seen (including null values)
     144              :     SumoXMLAttrMask mySeenAttrs;
     145              : 
     146              :     /// @brief column-name lookup honoring the headerFormat option
     147        54246 :     std::string getAttrString(const std::string& attrString) const {
     148        54246 :         if (myHeaderFormat == "plain") {
     149              :             return attrString;
     150              :         }
     151        54246 :         if (myHeaderFormat == "auto") {
     152            0 :             for (const auto& field : mySchema->fields()) {
     153            0 :                 if (field->name() == attrString) {
     154            0 :                     return myCurrentTag + "_" + attrString;
     155              :                 }
     156              :             }
     157              :             return attrString;
     158              :         }
     159       108492 :         return myCurrentTag + "_" + attrString;
     160              :     }
     161              : 
     162        17904 :     void checkAttr(const SumoXMLAttr attr) {
     163        17904 :         if (myCheckColumns && myMaxDepth == (int)myXMLStack.size()) {
     164         8278 :             mySeenAttrs.set(attr);
     165         8278 :             if (!myExpectedAttrs.test(attr)) {
     166            0 :                 throw ProcessError(TLF("Unexpected attribute '%', this file format does not support Parquet output yet.", toString(attr)));
     167              :             }
     168              :         }
     169        17904 :     }
     170              : 
     171              :     template <class ATTR_TYPE, class BUILDER>
     172        72236 :     void checkBuilder(const ATTR_TYPE& attr, const std::shared_ptr<arrow::DataType>& (*dataType)()) {
     173        72236 :         myNeedsWrite = true;
     174        72236 :         if (!myWroteHeader) {
     175       108492 :             const std::string fieldName = getAttrString(toString(attr));
     176       386531 :             for (const auto& field : mySchema->fields()) {
     177       385986 :                 if (field->name() == fieldName) {
     178              :                     return;
     179              :                 }
     180              :             }
     181         2180 :             mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(fieldName, dataType()));
     182              :             auto builder = std::make_shared<BUILDER>();
     183          545 :             if (!myBuilders.empty()) {
     184          501 :                 if (myBuilders.back()->length() > 0) {
     185          140 :                     PARQUET_THROW_NOT_OK(builder->AppendNulls(myBuilders.back()->length()));
     186              :                 }
     187          589 :                 while (myValues.size() < myBuilders.size()) {
     188          176 :                     myValues.push_back(nullptr);
     189              :                 }
     190              :             }
     191         1090 :             myBuilders.push_back(builder);
     192              :         }
     193              :     }
     194              : };
     195              : 
     196              : 
     197              : // ===========================================================================
     198              : // member method definitions
     199              : // ===========================================================================
     200           46 : ParquetFormatter::ParquetFormatter(const std::string& columnNames, const std::string& compression, const int batchSize)
     201           46 :     : OutputFormatter(OutputFormatterType::PARQUET), myImpl(std::make_unique<Impl>(columnNames, batchSize)) {
     202           46 :     if (compression == "snappy") {
     203            0 :         myImpl->myCompression = parquet::Compression::SNAPPY;
     204           46 :     } else if (compression == "gzip") {
     205            0 :         myImpl->myCompression = parquet::Compression::GZIP;
     206           46 :     } else if (compression == "brotli") {
     207            0 :         myImpl->myCompression = parquet::Compression::BROTLI;
     208           46 :     } else if (compression == "zstd") {
     209            0 :         myImpl->myCompression = parquet::Compression::ZSTD;
     210           46 :     } else if (compression == "lz4") {
     211            0 :         myImpl->myCompression = parquet::Compression::LZ4;
     212           46 :     } else if (compression == "bz2") {
     213            0 :         myImpl->myCompression = parquet::Compression::BZ2;
     214           46 :     } else if (compression != "" && compression != "uncompressed") {
     215            0 :         WRITE_ERRORF("Unknown compression: %", compression);
     216              :     }
     217           46 :     if (!arrow::util::Codec::IsAvailable(myImpl->myCompression)) {
     218            0 :         WRITE_WARNINGF("Compression '%' not available, falling back to uncompressed.", compression);
     219            0 :         myImpl->myCompression = parquet::Compression::UNCOMPRESSED;
     220              :     }
     221           46 : }
     222              : 
     223              : 
     224           92 : ParquetFormatter::~ParquetFormatter() = default;
     225              : 
     226              : 
     227              : void
     228         6603 : ParquetFormatter::openTag(std::ostream& /* into */, const std::string& xmlElement) {
     229         6603 :     myImpl->myXMLStack.push_back((int)myImpl->myValues.size());
     230         6603 :     if (!myImpl->myWroteHeader) {
     231         5519 :         myImpl->myCurrentTag = xmlElement;
     232              :     }
     233         6603 :     if (myImpl->myMaxDepth == (int)myImpl->myXMLStack.size() && myImpl->myWroteHeader && myImpl->myCurrentTag != xmlElement) {
     234          699 :         WRITE_WARNINGF("Encountered mismatch in XML tags (expected % but got %). Column names may be incorrect.", myImpl->myCurrentTag, xmlElement);
     235              :     }
     236         6603 : }
     237              : 
     238              : 
     239              : void
     240         1660 : ParquetFormatter::openTag(std::ostream& /* into */, const SumoXMLTag& xmlElement) {
     241         1660 :     myImpl->myXMLStack.push_back((int)myImpl->myValues.size());
     242         1660 :     if (!myImpl->myWroteHeader) {
     243          552 :         myImpl->myCurrentTag = toString(xmlElement);
     244              :     }
     245         2824 :     if (myImpl->myMaxDepth == (int)myImpl->myXMLStack.size() && myImpl->myWroteHeader && myImpl->myCurrentTag != toString(xmlElement)) {
     246           69 :         WRITE_WARNINGF("Encountered mismatch in XML tags (expected % but got %). Column names may be incorrect.", myImpl->myCurrentTag, toString(xmlElement));
     247              :     }
     248         1660 : }
     249              : 
     250              : 
     251              : bool
     252         8309 : ParquetFormatter::closeTag(std::ostream& into, const std::string& /* comment */) {
     253         8309 :     if (myImpl->myMaxDepth == 0) {
     254              :         // the auto detection case: the first closed tag determines the depth
     255            0 :         myImpl->myMaxDepth = (int)myImpl->myXMLStack.size();
     256              :     }
     257         8309 :     if ((myImpl->myMaxDepth == (int)myImpl->myXMLStack.size() || myImpl->myXMLStack.empty()) && !myImpl->myWroteHeader) {
     258              :         // we are at the correct depth or the document has ended (XML stack is empty)
     259              :         // so we should initialize the writer with the schema (if not done yet)
     260           46 :         if (!myImpl->myCheckColumns) {
     261           54 :             WRITE_WARNING("Column based formats are still experimental. Autodetection only works for homogeneous output.");
     262              :         }
     263              :         auto arrow_stream = std::make_shared<ArrowOStreamWrapper>(into);
     264           46 :         std::shared_ptr<parquet::WriterProperties> props = parquet::WriterProperties::Builder().compression(myImpl->myCompression)->build();
     265          184 :         myImpl->myParquetWriter = *parquet::arrow::FileWriter::Open(*myImpl->mySchema, arrow::default_memory_pool(), arrow_stream, props);
     266           46 :         myImpl->myWroteHeader = true;
     267              :     }
     268              :     bool writeBatch = false;
     269         8309 :     if (myImpl->myNeedsWrite) {
     270         8179 :         if (myImpl->myCheckColumns && (int)myImpl->myXMLStack.size() == myImpl->myMaxDepth && myImpl->myExpectedAttrs != myImpl->mySeenAttrs) {
     271         1552 :             for (int i = 0; i < (int)myImpl->myExpectedAttrs.size(); ++i) {
     272         1536 :                 if (myImpl->myExpectedAttrs.test(i) && !myImpl->mySeenAttrs.test(i)) {
     273           36 :                     WRITE_ERRORF("Incomplete attribute set, '%' is missing. This file format does not support Parquet output yet.",
     274              :                                  toString((SumoXMLAttr)i));
     275              :                 }
     276              :             }
     277              :         }
     278              :         int index = 0;
     279        83566 :         for (auto& builder : myImpl->myBuilders) {
     280        76233 :             const auto val = index < (int)myImpl->myValues.size() ? myImpl->myValues[index++] : nullptr;
     281       148661 :             PARQUET_THROW_NOT_OK(val == nullptr ? builder->AppendNull() : builder->AppendScalar(*val));
     282              :         }
     283         7333 :         writeBatch = myImpl->myWroteHeader && myImpl->myBuilders.back()->length() >= myImpl->myBatchSize;
     284              :         myImpl->mySeenAttrs.reset();
     285         7333 :         myImpl->myNeedsWrite = false;
     286              :     }
     287         8309 :     if (writeBatch || (myImpl->myXMLStack.empty() && !myImpl->myBuilders.empty())) {
     288              :         std::vector<std::shared_ptr<arrow::Array> > data;
     289          589 :         for (auto& builder : myImpl->myBuilders) {
     290          545 :             std::shared_ptr<arrow::Array> column;
     291          545 :             PARQUET_THROW_NOT_OK(builder->Finish(&column));
     292          545 :             data.push_back(column);
     293              :             // builder.reset();
     294              :         }
     295          132 :         auto batch = arrow::RecordBatch::Make(myImpl->mySchema, data.back()->length(), data);
     296           44 :         PARQUET_THROW_NOT_OK(myImpl->myParquetWriter->WriteRecordBatch(*batch));
     297           44 :     }
     298         8309 :     if (!myImpl->myXMLStack.empty()) {
     299         8263 :         if ((int)myImpl->myValues.size() > myImpl->myXMLStack.back()) {
     300         8063 :             myImpl->myValues.resize(myImpl->myXMLStack.back());
     301              :         }
     302              :         myImpl->myXMLStack.pop_back();
     303              :     }
     304         8309 :     return false;
     305              : }
     306              : 
     307              : 
     308              : void
     309         9353 : ParquetFormatter::writeAttr(std::ostream& into, const SumoXMLAttr attr, const double& val, const bool isNull) {
     310         9353 :     myImpl->checkAttr(attr);
     311         9353 :     if (attr == SUMO_ATTR_X || attr == SUMO_ATTR_Y || into.precision() > 2) {
     312         2124 :         myImpl->checkBuilder<SumoXMLAttr, arrow::DoubleBuilder>(attr, arrow::float64);
     313         4248 :         myImpl->myValues.push_back(isNull ? nullptr : std::make_shared<arrow::DoubleScalar>(val));
     314              :     } else {
     315         7229 :         myImpl->checkBuilder<SumoXMLAttr, arrow::FloatBuilder>(attr, arrow::float32);
     316        14458 :         myImpl->myValues.push_back(isNull ? nullptr : std::make_shared<arrow::FloatScalar>((float)val));
     317              :     }
     318         9353 : }
     319              : 
     320              : 
     321              : void
     322          713 : ParquetFormatter::writeAttr(std::ostream& /* into */, const SumoXMLAttr attr, const int& val, const bool isNull) {
     323          713 :     myImpl->checkAttr(attr);
     324          713 :     myImpl->checkBuilder<SumoXMLAttr, arrow::Int32Builder>(attr, arrow::int32);
     325          713 :     myImpl->myValues.push_back(isNull ? nullptr : std::make_shared<arrow::Int32Scalar>(val));
     326          713 : }
     327              : 
     328              : 
     329              : void
     330         7105 : ParquetFormatter::writeAttr(std::ostream& into, const std::string& attr, const double& val, const bool isNull) {
     331              :     assert(!myImpl->myCheckColumns);
     332         7105 :     if (into.precision() > 2) {
     333            0 :         myImpl->checkBuilder<std::string, arrow::DoubleBuilder>(attr, arrow::float64);
     334            0 :         myImpl->myValues.push_back(isNull ? nullptr : std::make_shared<arrow::DoubleScalar>(val));
     335              :     } else {
     336         7105 :         myImpl->checkBuilder<std::string, arrow::FloatBuilder>(attr, arrow::float32);
     337        14210 :         myImpl->myValues.push_back(isNull ? nullptr : std::make_shared<arrow::FloatScalar>((float)val));
     338              :     }
     339         7105 : }
     340              : 
     341              : 
     342              : void
     343        23108 : ParquetFormatter::writeAttr(std::ostream& /* into */, const std::string& attr, const int& val, const bool isNull) {
     344              :     assert(!myImpl->myCheckColumns);
     345        23108 :     myImpl->checkBuilder<std::string, arrow::Int32Builder>(attr, arrow::int32);
     346        23108 :     myImpl->myValues.push_back(isNull ? nullptr : std::make_shared<arrow::Int32Scalar>(val));
     347        23108 : }
     348              : 
     349              : 
     350              : void
     351         6890 : ParquetFormatter::writeStringAttr(const SumoXMLAttr attr, const std::string& val) {
     352         6890 :     myImpl->checkAttr(attr);
     353         6890 :     myImpl->checkBuilder<SumoXMLAttr, arrow::StringBuilder>(attr, arrow::utf8);
     354         6890 :     myImpl->myValues.push_back(std::make_shared<arrow::StringScalar>(val));
     355         6890 : }
     356              : 
     357              : 
     358              : void
     359        22531 : ParquetFormatter::writeStringAttr(const std::string& attr, const std::string& val) {
     360              :     assert(!myImpl->myCheckColumns);
     361        22531 :     myImpl->checkBuilder<std::string, arrow::StringBuilder>(attr, arrow::utf8);
     362        22531 :     myImpl->myValues.push_back(std::make_shared<arrow::StringScalar>(val));
     363        22531 : }
     364              : 
     365              : 
     366              : void
     367          948 : ParquetFormatter::writeNullAttr(const SumoXMLAttr attr) {
     368          948 :     myImpl->checkAttr(attr);
     369          948 :     myImpl->checkBuilder<SumoXMLAttr, arrow::StringBuilder>(attr, arrow::utf8);
     370          948 :     myImpl->myValues.push_back(nullptr);
     371          948 : }
     372              : 
     373              : 
     374              : void
     375          432 : ParquetFormatter::writeNullAttr(const std::string& attr) {
     376              :     assert(!myImpl->myCheckColumns);
     377          432 :     myImpl->checkBuilder<std::string, arrow::StringBuilder>(attr, arrow::utf8);
     378          432 :     myImpl->myValues.push_back(nullptr);
     379          432 : }
     380              : 
     381              : 
     382              : void
     383         1156 : ParquetFormatter::writeTime(std::ostream& /* into */, const SumoXMLAttr attr, const SUMOTime val) {
     384         1156 :     if (!gHumanReadableTime) {
     385              :         // always float64 for machine-readable time, regardless of stream precision
     386         1156 :         myImpl->checkBuilder<SumoXMLAttr, arrow::DoubleBuilder>(attr, arrow::float64);
     387         1156 :         myImpl->myValues.push_back(std::make_shared<arrow::DoubleScalar>(STEPS2TIME(val)));
     388         1156 :         return;
     389              :     }
     390            0 :     writeStringAttr(attr, time2string(val));
     391              : }
     392              : 
     393              : 
     394              : bool
     395            0 : ParquetFormatter::wroteHeader() const {
     396            0 :     return myImpl->myWroteHeader;
     397              : }
     398              : 
     399              : 
     400              : void
     401           58 : ParquetFormatter::setExpectedAttributes(const SumoXMLAttrMask& expected, const int depth) {
     402           58 :     myImpl->myExpectedAttrs = expected;
     403           58 :     myImpl->myMaxDepth = depth;
     404           58 :     myImpl->myCheckColumns = expected.any();
     405           58 : }
     406              : 
     407              : 
     408              : /****************************************************************************/
        

Generated by: LCOV version 2.0-1