LCOV - code coverage report
Current view: top level - src/utils/iodevices - ParquetFormatter.h (source / functions) Coverage Total Hit
Test: lcov.info Lines: 61.6 % 73 45
Test Date: 2026-03-02 16:00:03 Functions: 13.0 % 92 12

            Line data    Source code
       1              : /****************************************************************************/
       2              : // Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
       3              : // Copyright (C) 2012-2026 German Aerospace Center (DLR) and others.
       4              : // This program and the accompanying materials are made available under the
       5              : // terms of the Eclipse Public License 2.0 which is available at
       6              : // https://www.eclipse.org/legal/epl-2.0/
       7              : // This Source Code may also be made available under the following Secondary
       8              : // Licenses when the conditions for such availability set forth in the Eclipse
       9              : // Public License 2.0 are satisfied: GNU General Public License, version 2
      10              : // or later which is available at
      11              : // https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
      12              : // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
      13              : /****************************************************************************/
      14              : /// @file    ParquetFormatter.h
      15              : /// @author  Michael Behrisch
      16              : /// @date    2025-06-17
      17              : ///
      18              : // Output formatter for Parquet output
      19              : /****************************************************************************/
      20              : #pragma once
      21              : #include <config.h>
      22              : 
      23              : #include <ostream>
      24              : 
      25              : #ifdef _MSC_VER
      26              : /* Disable warning about unmatched push / pop.
      27              :    TODO Re-enable this once it has been solved upstream, see https://github.com/apache/arrow/issues/47099 */
      28              : #pragma warning(suppress: 5032)
      29              : #pragma warning(push)
      30              : /* Disable warning about unused parameters */
      31              : #pragma warning(disable: 4100)
      32              : /* Disable warning about hidden function arrow::io::Writable::Write */
      33              : #pragma warning(disable: 4266)
      34              : /* Disable warning about padded memory layout */
      35              : #pragma warning(disable: 4324)
      36              : /* Disable warning about this in initializers */
      37              : #pragma warning(disable: 4355)
      38              : /* Disable warning about changed memory layout due to virtual base class */
      39              : #pragma warning(disable: 4435)
      40              : /* Disable warning about declaration hiding class member */
      41              : #pragma warning(disable: 4458)
      42              : /* Disable warning about implicit conversion of int to bool */
      43              : #pragma warning(disable: 4800)
      44              : #endif
      45              : #include <arrow/api.h>
      46              : #include <parquet/arrow/writer.h>
      47              : #ifdef _MSC_VER
      48              : /* Disable warning about unmatched push / pop.
      49              :    TODO Re-enable this once it has been solved upstream, see https://github.com/apache/arrow/issues/47099 */
      50              : #pragma warning(suppress: 5031)
      51              : #pragma warning(pop)
      52              : #endif
      53              : 
      54              : #include <utils/common/ToString.h>
      55              : #include "OutputFormatter.h"
      56              : 
      57              : 
      58              : // ===========================================================================
      59              : // class definitions
      60              : // ===========================================================================
      61              : /**
      62              :  * @class ParquetFormatter
      63              :  * @brief Output formatter for Parquet output
      64              :  */
      65              : class ParquetFormatter : public OutputFormatter {
      66              : public:
      67              :     /// @brief Constructor
      68              :     // for some motivation on the default batch size see https://stackoverflow.com/questions/76782018/what-is-actually-meant-when-referring-to-parquet-row-group-size
      69              :     ParquetFormatter(const std::string& columnNames, const std::string& compression = "", const int batchSize = 1000000);
      70              : 
      71              :     /// @brief Destructor
      72           72 :     virtual ~ParquetFormatter() { }
      73              : 
      74              :     /** @brief Keeps track of an open XML tag by adding a new element to the stack
      75              :      *
      76              :      * @param[in] into The output stream to use (unused)
      77              :      * @param[in] xmlElement Name of element to open (unused)
      78              :      * @return The OutputDevice for further processing
      79              :      */
      80              :     void openTag(std::ostream& into, const std::string& xmlElement);
      81              : 
      82              :     /** @brief Keeps track of an open XML tag by adding a new element to the stack
      83              :      *
      84              :      * @param[in] into The output stream to use (unused)
      85              :      * @param[in] xmlElement Name of element to open (unused)
      86              :      */
      87              :     void openTag(std::ostream& into, const SumoXMLTag& xmlElement);
      88              : 
      89              :     /** @brief Closes the most recently opened tag
      90              :      *
      91              :      * @param[in] into The output stream to use
      92              :      * @return Whether a further element existed in the stack and could be closed
      93              :      * @todo it is not verified that the topmost element was closed
      94              :      */
      95              :     bool closeTag(std::ostream& into, const std::string& comment = "");
      96              : 
      97              :     /** @brief writes a named attribute
      98              :      *
      99              :      * @param[in] attr The attribute (name)
     100              :      * @param[in] val The attribute value
     101              :      * @param[in] isNull The given value is not set
     102              :      */
     103              :     template <class T>
     104         2904 :     void writeAttr(std::ostream& /* into */, const SumoXMLAttr attr, const T& val, const bool isNull = false) {
     105         2904 :         checkAttr(attr);
     106         2904 :         checkBuilder<SumoXMLAttr, arrow::StringBuilder>(attr, arrow::utf8);
     107         5808 :         myValues.push_back(isNull ? nullptr : std::make_shared<arrow::StringScalar>(toString(val)));
     108         2904 :     }
     109              : 
     110              :     template <class T>
     111            0 :     void writeAttr(std::ostream& /* into */, const std::string& attr, const T& val) {
     112              :         assert(!myCheckColumns);
     113            0 :         checkBuilder<std::string, arrow::StringBuilder>(attr, arrow::utf8);
     114            0 :         myValues.push_back(std::make_shared<arrow::StringScalar>(toString(val)));
     115            0 :     }
     116              : 
     117          756 :     void writeTime(std::ostream& into, const SumoXMLAttr attr, const SUMOTime val) {
     118          756 :         if (!gHumanReadableTime) {
     119          756 :             checkBuilder<SumoXMLAttr, arrow::DoubleBuilder>(attr, arrow::float64);
     120          756 :             myValues.push_back(std::make_shared<arrow::DoubleScalar>(STEPS2TIME(val)));
     121          756 :             return;
     122              :         }
     123            0 :         writeAttr(into, attr, time2string(val));
     124              :     }
     125              : 
     126            0 :     bool wroteHeader() const {
     127            0 :         return myWroteHeader;
     128              :     }
     129              : 
     130           18 :     void setExpectedAttributes(const SumoXMLAttrMask& expected, const int depth = 2) {
     131           18 :         myExpectedAttrs = expected;
     132           18 :         myMaxDepth = depth;
     133           18 :         myCheckColumns = expected.any();
     134           18 :     }
     135              : 
     136              : private:
     137          821 :     inline const std::string getAttrString(const std::string& attrString) {
     138          821 :         if (myHeaderFormat == "plain") {
     139              :             return attrString;
     140              :         }
     141          821 :         if (myHeaderFormat == "auto") {
     142            0 :             for (const auto& field : mySchema->fields()) {
     143            0 :                 if (field->name() == attrString) {
     144            0 :                     return myCurrentTag + "_" + attrString;
     145              :                 }
     146              :             }
     147              :             return attrString;
     148              :         }
     149         1642 :         return myCurrentTag + "_" + attrString;
     150              :     }
     151              : 
     152         7258 :     inline void checkAttr(const SumoXMLAttr attr) {
     153         7258 :         if (myCheckColumns && myMaxDepth == (int)myXMLStack.size()) {
     154         7138 :             mySeenAttrs.set(attr);
     155         7138 :             if (!myExpectedAttrs.test(attr)) {
     156            0 :                 throw ProcessError(TLF("Unexpected attribute '%', this file format does not support Parquet output yet.", toString(attr)));
     157              :             }
     158              :         }
     159         7258 :     }
     160              : 
     161              :     template <class ATTR_TYPE, class BUILDER>
     162         8014 :     inline void checkBuilder(const ATTR_TYPE& attr, const std::shared_ptr<arrow::DataType>& (*dataType)()) {
     163         8014 :         myNeedsWrite = true;
     164         8014 :         if (!myWroteHeader) {
     165         1642 :             const std::string fieldName = getAttrString(toString(attr));
     166         1845 :             for (const auto& field : mySchema->fields()) {
     167         1624 :                 if (field->name() == fieldName) {
     168              :                     return;
     169              :                 }
     170              :             }
     171          884 :             mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(fieldName, dataType()));
     172              :             auto builder = std::make_shared<BUILDER>();
     173          221 :             if (!myBuilders.empty()) {
     174          197 :                 if (myBuilders.back()->length() > 0) {
     175           60 :                     PARQUET_THROW_NOT_OK(builder->AppendNulls(myBuilders.back()->length()));
     176              :                 }
     177          197 :                 while (myValues.size() < myBuilders.size()) {
     178            0 :                     myValues.push_back(nullptr);
     179              :                 }
     180              :             }
     181          442 :             myBuilders.push_back(builder);
     182              :         }
     183              :     }
     184              : 
     185              :     /// @brief the format to use for the column names
     186              :     const std::string myHeaderFormat;
     187              : 
     188              :     /// @brief the compression to use
     189              :     parquet::Compression::type myCompression = parquet::Compression::UNCOMPRESSED;
     190              : 
     191              :     /// @brief the number of rows to write per batch
     192              :     const int myBatchSize;
     193              : 
     194              :     /// @brief the currently read tag (only valid when generating the header)
     195              :     std::string myCurrentTag;
     196              : 
     197              :     /// @brief the table schema
     198              :     std::shared_ptr<arrow::Schema> mySchema = arrow::schema({});
     199              : 
     200              :     /// @brief the output stream writer
     201              :     std::unique_ptr<parquet::arrow::FileWriter> myParquetWriter;
     202              : 
     203              :     /// @brief the content array builders for the table
     204              :     std::vector<std::shared_ptr<arrow::ArrayBuilder> > myBuilders;
     205              : 
     206              :     /// @brief The number of attributes in the currently open XML elements
     207              :     std::vector<int> myXMLStack;
     208              : 
     209              :     /// @brief the current attribute / column values
     210              :     std::vector<std::shared_ptr<arrow::Scalar> > myValues;
     211              : 
     212              :     /// @brief the maximum depth of the XML hierarchy
     213              :     int myMaxDepth = 0;
     214              : 
     215              :     /// @brief whether the schema has been constructed completely
     216              :     bool myWroteHeader = false;
     217              : 
     218              :     /// @brief whether the columns should be checked for completeness
     219              :     bool myCheckColumns = false;
     220              : 
     221              :     /// @brief whether there is still unwritten data
     222              :     bool myNeedsWrite = false;
     223              : 
     224              :     /// @brief the attributes which are expected for a complete row (including null values)
     225              :     SumoXMLAttrMask myExpectedAttrs;
     226              : 
     227              :     /// @brief the attributes already seen (including null values)
     228              :     SumoXMLAttrMask mySeenAttrs;
     229              : };
     230              : 
     231              : 
     232              : // ===========================================================================
     233              : // specialized template implementations
     234              : // ===========================================================================
     235              : template <>
     236         4354 : inline void ParquetFormatter::writeAttr(std::ostream& into, const SumoXMLAttr attr, const double& val, const bool isNull) {
     237         4354 :     checkAttr(attr);
     238         4354 :     if (attr == SUMO_ATTR_X || attr == SUMO_ATTR_Y || into.precision() > 2) {
     239         1440 :         checkBuilder<SumoXMLAttr, arrow::DoubleBuilder>(attr, arrow::float64);
     240         2880 :         myValues.push_back(isNull ? nullptr : std::make_shared<arrow::DoubleScalar>(val));
     241              :     } else {
     242         2914 :         checkBuilder<SumoXMLAttr, arrow::FloatBuilder>(attr, arrow::float32);
     243         5828 :         myValues.push_back(isNull ? nullptr : std::make_shared<arrow::FloatScalar>((float)val));
     244              :     }
     245         4354 : }
     246              : 
     247              : template <>
     248            0 : inline void ParquetFormatter::writeAttr(std::ostream& /* into */, const SumoXMLAttr attr, const int& val, const bool isNull) {
     249            0 :     checkAttr(attr);
     250            0 :     checkBuilder<SumoXMLAttr, arrow::Int32Builder>(attr, arrow::int32);
     251            0 :     myValues.push_back(isNull ? nullptr : std::make_shared<arrow::Int32Scalar>(val));
     252            0 : }
     253              : 
     254              : template <>
     255            0 : inline void ParquetFormatter::writeAttr(std::ostream& into, const std::string& attr, const double& val) {
     256              :     assert(!myCheckColumns);
     257            0 :     if (into.precision() > 2) {
     258            0 :         checkBuilder<std::string, arrow::DoubleBuilder>(attr, arrow::float64);
     259            0 :         myValues.push_back(std::make_shared<arrow::DoubleScalar>(val));
     260              :     } else {
     261            0 :         checkBuilder<std::string, arrow::FloatBuilder>(attr, arrow::float32);
     262            0 :         myValues.push_back(std::make_shared<arrow::FloatScalar>((float)val));
     263              :     }
     264            0 : }
     265              : 
     266              : template <>
     267            0 : inline void ParquetFormatter::writeAttr(std::ostream& /* into */, const std::string& attr, const int& val) {
     268              :     assert(!myCheckColumns);
     269            0 :     checkBuilder<std::string, arrow::Int32Builder>(attr, arrow::int32);
     270            0 :     myValues.push_back(std::make_shared<arrow::Int32Scalar>(val));
     271            0 : }
        

Generated by: LCOV version 2.0-1