LCOV - code coverage report
Current view: top level - src/utils/iodevices - ParquetFormatter.cpp (source / functions) Coverage Total Hit
Test: lcov.info Lines: 77.8 % 81 63
Test Date: 2025-12-06 15:35:27 Functions: 66.7 % 9 6

            Line data    Source code
       1              : /****************************************************************************/
       2              : // Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
       3              : // Copyright (C) 2012-2025 German Aerospace Center (DLR) and others.
       4              : // This program and the accompanying materials are made available under the
       5              : // terms of the Eclipse Public License 2.0 which is available at
       6              : // https://www.eclipse.org/legal/epl-2.0/
       7              : // This Source Code may also be made available under the following Secondary
       8              : // Licenses when the conditions for such availability set forth in the Eclipse
       9              : // Public License 2.0 are satisfied: GNU General Public License, version 2
      10              : // or later which is available at
      11              : // https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
      12              : // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
      13              : /****************************************************************************/
      14              : /// @file    ParquetFormatter.cpp
      15              : /// @author  Michael Behrisch
      16              : /// @date    2025-06-17
      17              : ///
      18              : // An output formatter for Parquet files
      19              : /****************************************************************************/
      20              : #include <config.h>
      21              : 
      22              : #ifdef _MSC_VER
      23              : #pragma warning(push)
      24              : /* Disable warning about changed memory layout due to virtual base class */
      25              : #pragma warning(disable: 4435)
      26              : #endif
      27              : #include <arrow/io/api.h>
      28              : #ifdef _MSC_VER
      29              : #pragma warning(pop)
      30              : #endif
      31              : 
      32              : #include <utils/common/MsgHandler.h>
      33              : #include <utils/common/ToString.h>
      34              : #include "ParquetFormatter.h"
      35              : 
      36              : 
      37              : // ===========================================================================
      38              : // helper class definitions
      39              : // ===========================================================================
      40              : class ArrowOStreamWrapper : public arrow::io::OutputStream {
      41              : public:
      42              :     ArrowOStreamWrapper(std::ostream& out)
      43           24 :         : myOStream(out), myAmOpen(true) {}
      44              : 
      45            0 :     arrow::Status Close() override {
      46            0 :         myAmOpen = false;
      47            0 :         return arrow::Status::OK();
      48              :     }
      49              : 
      50            0 :     arrow::Status Flush() override {
      51            0 :         myOStream.flush();
      52            0 :         return arrow::Status::OK();
      53              :     }
      54              : 
      55         1177 :     arrow::Result<int64_t> Tell() const override {
      56         1177 :         return myOStream.tellp();
      57              :     }
      58              : 
      59            0 :     bool closed() const override {
      60            0 :         return !myAmOpen;
      61              :     }
      62              : 
      63          759 :     arrow::Status Write(const void* data, int64_t nbytes) override {
      64          759 :         if (!myAmOpen) {
      65              :             return arrow::Status::IOError("Write on closed stream");
      66              :         }
      67          759 :         myOStream.write(reinterpret_cast<const char*>(data), nbytes);
      68          759 :         if (!myOStream) {
      69              :             return arrow::Status::IOError("Failed to write to ostream");
      70              :         }
      71              :         return arrow::Status::OK();
      72              :     }
      73              : 
      74              : private:
      75              :     std::ostream& myOStream;
      76              :     bool myAmOpen;
      77              : };
      78              : 
      79              : 
      80              : // ===========================================================================
      81              : // member method definitions
      82              : // ===========================================================================
      83           24 : ParquetFormatter::ParquetFormatter(const std::string& columnNames, const std::string& compression, const int batchSize)
      84           48 :     : OutputFormatter(OutputFormatterType::PARQUET), myHeaderFormat(columnNames), myBatchSize(batchSize) {
      85           24 :     if (compression == "snappy") {
      86            0 :         myCompression = parquet::Compression::SNAPPY;
      87           24 :     } else if (compression == "gzip") {
      88            0 :         myCompression = parquet::Compression::GZIP;
      89           24 :     } else if (compression == "brotli") {
      90            0 :         myCompression = parquet::Compression::BROTLI;
      91           24 :     } else if (compression == "zstd") {
      92            0 :         myCompression = parquet::Compression::ZSTD;
      93           24 :     } else if (compression == "lz4") {
      94            0 :         myCompression = parquet::Compression::LZ4;
      95           24 :     } else if (compression == "bz2") {
      96            0 :         myCompression = parquet::Compression::BZ2;
      97           24 :     } else if (compression != "" && compression != "uncompressed") {
      98            0 :         WRITE_ERRORF("Unknown compression: %", compression);
      99              :     }
     100           24 :     if (!arrow::util::Codec::IsAvailable(myCompression)) {
     101            0 :         WRITE_WARNINGF("Compression '%' not available, falling back to uncompressed.", compression);
     102            0 :         myCompression = parquet::Compression::UNCOMPRESSED;
     103              :     }
     104           24 : }
     105              : 
     106              : void
     107          756 : ParquetFormatter::openTag(std::ostream& /* into */, const std::string& xmlElement) {
     108          756 :     myXMLStack.push_back((int)myValues.size());
     109          756 :     if (!myWroteHeader) {
     110          624 :         myCurrentTag = xmlElement;
     111              :     }
     112          756 :     if (myMaxDepth == (int)myXMLStack.size() && myWroteHeader && myCurrentTag != xmlElement) {
     113            0 :         WRITE_WARNINGF("Encountered mismatch in XML tags (expected % but got %). Column names may be incorrect.", myCurrentTag, xmlElement);
     114              :     }
     115          756 : }
     116              : 
     117              : 
     118              : void
     119          744 : ParquetFormatter::openTag(std::ostream& /* into */, const SumoXMLTag& xmlElement) {
     120          744 :     myXMLStack.push_back((int)myValues.size());
     121          744 :     if (!myWroteHeader) {
     122           48 :         myCurrentTag = toString(xmlElement);
     123              :     }
     124         1464 :     if (myMaxDepth == (int)myXMLStack.size() && myWroteHeader && myCurrentTag != toString(xmlElement)) {
     125           36 :         WRITE_WARNINGF("Encountered mismatch in XML tags (expected % but got %). Column names may be incorrect.", myCurrentTag, toString(xmlElement));
     126              :     }
     127          744 : }
     128              : 
     129              : 
     130              : bool
     131         1524 : ParquetFormatter::closeTag(std::ostream& into, const std::string& /* comment */) {
     132         1524 :     if (myMaxDepth == 0) {
     133            6 :         myMaxDepth = (int)myXMLStack.size();
     134              :     }
     135         1524 :     if (myMaxDepth == (int)myXMLStack.size() && !myWroteHeader) {
     136           24 :         if (!myCheckColumns) {
     137           12 :             WRITE_WARNING("Column based formats are still experimental. Autodetection only works for homogeneous output.");
     138              :         }
     139              :         auto arrow_stream = std::make_shared<ArrowOStreamWrapper>(into);
     140           24 :         std::shared_ptr<parquet::WriterProperties> props = parquet::WriterProperties::Builder().compression(myCompression)->build();
     141           96 :         myParquetWriter = *parquet::arrow::FileWriter::Open(*mySchema, arrow::default_memory_pool(), arrow_stream, props);
     142           24 :         myWroteHeader = true;
     143              :     }
     144              :     bool writeBatch = false;
     145         1524 :     if ((int)myXMLStack.size() == myMaxDepth) {
     146         1476 :         if (myCheckColumns && myExpectedAttrs != mySeenAttrs) {
     147         1552 :             for (int i = 0; i < (int)myExpectedAttrs.size(); ++i) {
     148         1536 :                 if (myExpectedAttrs.test(i) && !mySeenAttrs.test(i)) {
     149           36 :                     WRITE_ERRORF("Incomplete attribute set, '%' is missing. This file format does not support Parquet output yet.",
     150              :                                  toString((SumoXMLAttr)i));
     151           36 :                     myValues.push_back(nullptr);
     152              :                 }
     153              :             }
     154              :         }
     155              :         int index = 0;
     156         8756 :         for (auto& builder : myBuilders) {
     157         8012 :             const auto val = myValues[index++];
     158        15294 :             PARQUET_THROW_NOT_OK(val == nullptr ? builder->AppendNull() : builder->AppendScalar(*val));
     159              :         }
     160          744 :         writeBatch = myBuilders.back()->length() == myBatchSize;
     161              :         mySeenAttrs.reset();
     162              :     }
     163         1524 :     if (writeBatch || myXMLStack.empty()) {
     164              :         std::vector<std::shared_ptr<arrow::Array> > data;
     165          245 :         for (auto& builder : myBuilders) {
     166          221 :             std::shared_ptr<arrow::Array> column;
     167          221 :             PARQUET_THROW_NOT_OK(builder->Finish(&column));
     168          221 :             data.push_back(column);
     169              :             // builder.reset();
     170              :         }
     171           72 :         auto batch = arrow::RecordBatch::Make(mySchema, data.back()->length(), data);
     172           24 :         PARQUET_THROW_NOT_OK(myParquetWriter->WriteRecordBatch(*batch));
     173           24 :     }
     174         1524 :     if (!myXMLStack.empty()) {
     175         9532 :         while ((int)myValues.size() > myXMLStack.back()) {
     176         8032 :             if (!myWroteHeader) {
     177         1200 :                 mySchema = *mySchema->RemoveField(mySchema->num_fields() - 1);
     178              :                 myBuilders.pop_back();
     179              :             }
     180              :             myValues.pop_back();
     181              :         }
     182              :         myXMLStack.pop_back();
     183              :     }
     184         1524 :     return false;
     185              : }
     186              : 
     187              : 
     188              : /****************************************************************************/
        

Generated by: LCOV version 2.0-1