LCOV - code coverage report
Current view: top level - src/utils/iodevices - ParquetFormatter.cpp (source / functions) Coverage Total Hit
Test: lcov.info Lines: 77.5 % 80 62
Test Date: 2026-03-02 16:00:03 Functions: 66.7 % 9 6

            Line data    Source code
       1              : /****************************************************************************/
       2              : // Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
       3              : // Copyright (C) 2012-2026 German Aerospace Center (DLR) and others.
       4              : // This program and the accompanying materials are made available under the
       5              : // terms of the Eclipse Public License 2.0 which is available at
       6              : // https://www.eclipse.org/legal/epl-2.0/
       7              : // This Source Code may also be made available under the following Secondary
       8              : // Licenses when the conditions for such availability set forth in the Eclipse
       9              : // Public License 2.0 are satisfied: GNU General Public License, version 2
      10              : // or later which is available at
      11              : // https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
      12              : // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
      13              : /****************************************************************************/
      14              : /// @file    ParquetFormatter.cpp
      15              : /// @author  Michael Behrisch
      16              : /// @date    2025-06-17
      17              : ///
      18              : // An output formatter for Parquet files
      19              : /****************************************************************************/
      20              : #include <config.h>
      21              : 
      22              : #ifdef _MSC_VER
      23              : #pragma warning(push)
      24              : /* Disable warning about changed memory layout due to virtual base class */
      25              : #pragma warning(disable: 4435)
      26              : #endif
      27              : #include <arrow/io/api.h>
      28              : #ifdef _MSC_VER
      29              : #pragma warning(pop)
      30              : #endif
      31              : 
      32              : #include <utils/common/MsgHandler.h>
      33              : #include <utils/common/ToString.h>
      34              : #include "ParquetFormatter.h"
      35              : 
      36              : 
      37              : // ===========================================================================
      38              : // helper class definitions
      39              : // ===========================================================================
      40              : class ArrowOStreamWrapper : public arrow::io::OutputStream {
      41              : public:
      42              :     ArrowOStreamWrapper(std::ostream& out)
      43           24 :         : myOStream(out), myAmOpen(true) {}
      44              : 
      45            0 :     arrow::Status Close() override {
      46            0 :         myAmOpen = false;
      47            0 :         return arrow::Status::OK();
      48              :     }
      49              : 
      50            0 :     arrow::Status Flush() override {
      51            0 :         myOStream.flush();
      52            0 :         return arrow::Status::OK();
      53              :     }
      54              : 
      55         1177 :     arrow::Result<int64_t> Tell() const override {
      56         1177 :         return myOStream.tellp();
      57              :     }
      58              : 
      59            0 :     bool closed() const override {
      60            0 :         return !myAmOpen;
      61              :     }
      62              : 
      63          759 :     arrow::Status Write(const void* data, int64_t nbytes) override {
      64          759 :         if (!myAmOpen) {
      65              :             return arrow::Status::IOError("Write on closed stream");
      66              :         }
      67          759 :         myOStream.write(reinterpret_cast<const char*>(data), nbytes);
      68          759 :         if (!myOStream) {
      69              :             return arrow::Status::IOError("Failed to write to ostream");
      70              :         }
      71              :         return arrow::Status::OK();
      72              :     }
      73              : 
      74              : private:
      75              :     std::ostream& myOStream;
      76              :     bool myAmOpen;
      77              : };
      78              : 
      79              : 
      80              : // ===========================================================================
      81              : // member method definitions
      82              : // ===========================================================================
      83           24 : ParquetFormatter::ParquetFormatter(const std::string& columnNames, const std::string& compression, const int batchSize)
      84           48 :     : OutputFormatter(OutputFormatterType::PARQUET), myHeaderFormat(columnNames), myBatchSize(batchSize) {
      85           24 :     if (compression == "snappy") {
      86            0 :         myCompression = parquet::Compression::SNAPPY;
      87           24 :     } else if (compression == "gzip") {
      88            0 :         myCompression = parquet::Compression::GZIP;
      89           24 :     } else if (compression == "brotli") {
      90            0 :         myCompression = parquet::Compression::BROTLI;
      91           24 :     } else if (compression == "zstd") {
      92            0 :         myCompression = parquet::Compression::ZSTD;
      93           24 :     } else if (compression == "lz4") {
      94            0 :         myCompression = parquet::Compression::LZ4;
      95           24 :     } else if (compression == "bz2") {
      96            0 :         myCompression = parquet::Compression::BZ2;
      97           24 :     } else if (compression != "" && compression != "uncompressed") {
      98            0 :         WRITE_ERRORF("Unknown compression: %", compression);
      99              :     }
     100           24 :     if (!arrow::util::Codec::IsAvailable(myCompression)) {
     101            0 :         WRITE_WARNINGF("Compression '%' not available, falling back to uncompressed.", compression);
     102            0 :         myCompression = parquet::Compression::UNCOMPRESSED;
     103              :     }
     104           24 : }
     105              : 
     106              : void
     107          756 : ParquetFormatter::openTag(std::ostream& /* into */, const std::string& xmlElement) {
     108          756 :     myXMLStack.push_back((int)myValues.size());
     109          756 :     if (!myWroteHeader) {
     110          624 :         myCurrentTag = xmlElement;
     111              :     }
     112          756 :     if (myMaxDepth == (int)myXMLStack.size() && myWroteHeader && myCurrentTag != xmlElement) {
     113            0 :         WRITE_WARNINGF("Encountered mismatch in XML tags (expected % but got %). Column names may be incorrect.", myCurrentTag, xmlElement);
     114              :     }
     115          756 : }
     116              : 
     117              : 
     118              : void
     119          744 : ParquetFormatter::openTag(std::ostream& /* into */, const SumoXMLTag& xmlElement) {
     120          744 :     myXMLStack.push_back((int)myValues.size());
     121          744 :     if (!myWroteHeader) {
     122           48 :         myCurrentTag = toString(xmlElement);
     123              :     }
     124         1464 :     if (myMaxDepth == (int)myXMLStack.size() && myWroteHeader && myCurrentTag != toString(xmlElement)) {
     125           36 :         WRITE_WARNINGF("Encountered mismatch in XML tags (expected % but got %). Column names may be incorrect.", myCurrentTag, toString(xmlElement));
     126              :     }
     127          744 : }
     128              : 
     129              : 
     130              : bool
     131         1524 : ParquetFormatter::closeTag(std::ostream& into, const std::string& /* comment */) {
     132         1524 :     if (myMaxDepth == 0) {
     133              :         // the auto detection case: the first closed tag determines the depth
     134            6 :         myMaxDepth = (int)myXMLStack.size();
     135              :     }
     136         1524 :     if ((myMaxDepth == (int)myXMLStack.size() || myXMLStack.empty()) && !myWroteHeader) {
     137              :         // we are at the correct depth or the document has ended (XML stack is empty)
     138              :         // so we should initialize the writer with the schema (if not done yet)
     139           24 :         if (!myCheckColumns) {
     140           12 :             WRITE_WARNING("Column based formats are still experimental. Autodetection only works for homogeneous output.");
     141              :         }
     142              :         auto arrow_stream = std::make_shared<ArrowOStreamWrapper>(into);
     143           24 :         std::shared_ptr<parquet::WriterProperties> props = parquet::WriterProperties::Builder().compression(myCompression)->build();
     144           96 :         myParquetWriter = *parquet::arrow::FileWriter::Open(*mySchema, arrow::default_memory_pool(), arrow_stream, props);
     145           24 :         myWroteHeader = true;
     146              :     }
     147              :     bool writeBatch = false;
     148         1524 :     if (myNeedsWrite) {
     149         2076 :         if (myCheckColumns && (int)myXMLStack.size() == myMaxDepth && myExpectedAttrs != mySeenAttrs) {
     150         1552 :             for (int i = 0; i < (int)myExpectedAttrs.size(); ++i) {
     151         1536 :                 if (myExpectedAttrs.test(i) && !mySeenAttrs.test(i)) {
     152           36 :                     WRITE_ERRORF("Incomplete attribute set, '%' is missing. This file format does not support Parquet output yet.",
     153              :                                  toString((SumoXMLAttr)i));
     154              :                 }
     155              :             }
     156              :         }
     157              :         int index = 0;
     158         9956 :         for (auto& builder : myBuilders) {
     159         8612 :             const auto val = index < (int)myValues.size() ? myValues[index++] : nullptr;
     160        16494 :             PARQUET_THROW_NOT_OK(val == nullptr ? builder->AppendNull() : builder->AppendScalar(*val));
     161              :         }
     162         1344 :         writeBatch = myWroteHeader && myBuilders.back()->length() >= myBatchSize;
     163              :         mySeenAttrs.reset();
     164         1344 :         myNeedsWrite = false;
     165              :     }
     166         1524 :     if (writeBatch || (myXMLStack.empty() && !myBuilders.empty())) {
     167              :         std::vector<std::shared_ptr<arrow::Array> > data;
     168          245 :         for (auto& builder : myBuilders) {
     169          221 :             std::shared_ptr<arrow::Array> column;
     170          221 :             PARQUET_THROW_NOT_OK(builder->Finish(&column));
     171          221 :             data.push_back(column);
     172              :             // builder.reset();
     173              :         }
     174           72 :         auto batch = arrow::RecordBatch::Make(mySchema, data.back()->length(), data);
     175           24 :         PARQUET_THROW_NOT_OK(myParquetWriter->WriteRecordBatch(*batch));
     176           24 :     }
     177         1524 :     if (!myXMLStack.empty()) {
     178         1500 :         if ((int)myValues.size() > myXMLStack.back()) {
     179         1500 :             myValues.resize(myXMLStack.back());
     180              :         }
     181              :         myXMLStack.pop_back();
     182              :     }
     183         1524 :     return false;
     184              : }
     185              : 
     186              : 
     187              : /****************************************************************************/
        

Generated by: LCOV version 2.0-1