LCOV - code coverage report
Current view: top level - src/utils/iodevices - ParquetFormatter.h (source / functions) Coverage Total Hit
Test: lcov.info Lines: 52.6 % 78 41
Test Date: 2025-12-06 15:35:27 Functions: 10.7 % 84 9

            Line data    Source code
       1              : /****************************************************************************/
       2              : // Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
       3              : // Copyright (C) 2012-2025 German Aerospace Center (DLR) and others.
       4              : // This program and the accompanying materials are made available under the
       5              : // terms of the Eclipse Public License 2.0 which is available at
       6              : // https://www.eclipse.org/legal/epl-2.0/
       7              : // This Source Code may also be made available under the following Secondary
       8              : // Licenses when the conditions for such availability set forth in the Eclipse
       9              : // Public License 2.0 are satisfied: GNU General Public License, version 2
      10              : // or later which is available at
      11              : // https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
      12              : // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
      13              : /****************************************************************************/
      14              : /// @file    ParquetFormatter.h
      15              : /// @author  Michael Behrisch
      16              : /// @date    2025-06-17
      17              : ///
      18              : // Output formatter for Parquet output
      19              : /****************************************************************************/
      20              : #pragma once
      21              : #include <config.h>
      22              : 
      23              : #include <ostream>
      24              : 
      25              : #ifdef _MSC_VER
      26              : /* Disable warning about unmatched push / pop.
      27              :    TODO Re-enable this once it has been solved upstream, see https://github.com/apache/arrow/issues/47099 */
      28              : #pragma warning(suppress: 5032)
      29              : #pragma warning(push)
      30              : /* Disable warning about unused parameters */
      31              : #pragma warning(disable: 4100)
      32              : /* Disable warning about hidden function arrow::io::Writable::Write */
      33              : #pragma warning(disable: 4266)
      34              : /* Disable warning about padded memory layout */
      35              : #pragma warning(disable: 4324)
      36              : /* Disable warning about this in initializers */
      37              : #pragma warning(disable: 4355)
      38              : /* Disable warning about changed memory layout due to virtual base class */
      39              : #pragma warning(disable: 4435)
      40              : /* Disable warning about declaration hiding class member */
      41              : #pragma warning(disable: 4458)
      42              : /* Disable warning about implicit conversion of int to bool */
      43              : #pragma warning(disable: 4800)
      44              : #endif
      45              : #include <arrow/api.h>
      46              : #include <parquet/arrow/writer.h>
      47              : #ifdef _MSC_VER
      48              : /* Disable warning about unmatched push / pop.
      49              :    TODO Re-enable this once it has been solved upstream, see https://github.com/apache/arrow/issues/47099 */
      50              : #pragma warning(suppress: 5031)
      51              : #pragma warning(pop)
      52              : #endif
      53              : 
      54              : #include <utils/common/ToString.h>
      55              : #include "OutputFormatter.h"
      56              : 
      57              : 
      58              : // ===========================================================================
      59              : // class definitions
      60              : // ===========================================================================
      61              : /**
      62              :  * @class ParquetFormatter
      63              :  * @brief Output formatter for Parquet output
      64              :  */
      65              : class ParquetFormatter : public OutputFormatter {
      66              : public:
      67              :     /// @brief Constructor (only declared here, the definition is not part of this header)
      68              :     // for some motivation on the default batch size see https://stackoverflow.com/questions/76782018/what-is-actually-meant-when-referring-to-parquet-row-group-size
      69              :     ParquetFormatter(const std::string& columnNames, const std::string& compression = "", const int batchSize = 1000000);
      70              : 
      71              :     /// @brief Destructor (empty body, there is no explicit cleanup in this header)
      72           72 :     virtual ~ParquetFormatter() { }
      73              : 
      74              :     /** @brief Keeps track of an open XML tag by adding a new element to the stack
      75              :      *
      76              :      * @param[in] into The output stream to use (unused)
      77              :      * @param[in] xmlElement Name of element to open (unused)
      78              :      * @note Declared void, so nothing is returned to the caller
      79              :      */
      80              :     void openTag(std::ostream& into, const std::string& xmlElement);
      81              : 
      82              :     /** @brief Keeps track of an open XML tag by adding a new element to the stack (overload taking a SumoXMLTag)
      83              :      *
      84              :      * @param[in] into The output stream to use (unused)
      85              :      * @param[in] xmlElement Tag of the element to open (unused)
      86              :      */
      87              :     void openTag(std::ostream& into, const SumoXMLTag& xmlElement);
      88              : 
      89              :     /** @brief Closes the most recently opened tag
      90              :      * @param[in] into The output stream to use
      91              :      * @param[in] comment Optional comment, defaults to the empty string (its use is not visible in this header)
      92              :      * @return Whether a further element existed in the stack and could be closed
      93              :      * @todo it is not verified that the topmost element was closed
      94              :      */
      95              :     bool closeTag(std::ostream& into, const std::string& comment = "");
      96              : 
      97              :     /** @brief writes a named attribute; this generic version stores the value as string (utf8 column)
      98              :      * @param[in] into The output stream (unused, the parameter is left unnamed)
      99              :      * @param[in] attr The attribute (name)
     100              :      * @param[in] val The attribute value
     101              :      * @param[in] isNull The given value is not set
     102              :      */
     103              :     template <class T>
     104         2904 :     void writeAttr(std::ostream& /* into */, const SumoXMLAttr attr, const T& val, const bool isNull = false) {
     105         2904 :         checkAttr(attr); // may throw ProcessError, but only while column checking is active (see checkAttr)
     106         2904 :         if (!myWroteHeader) { // the schema is not complete yet, so append a column and a matching builder
     107          312 :             mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(toString(attr)), arrow::utf8())); // NOTE(review): the AddField result is dereferenced without a status check - confirm this cannot fail here
     108          156 :             myBuilders.push_back(std::make_shared<arrow::StringBuilder>());
     109              :         }
     110         5808 :         myValues.push_back(isNull ? nullptr : std::make_shared<arrow::StringScalar>(toString(val))); // a null pointer is stored when isNull is set
     111         2904 :     }
     112              : 
     113              :     template <class T> // variant taking the attribute name as string, the value is stored in a utf8 column as well
     114            0 :     void writeAttr(std::ostream& /* into */, const std::string& attr, const T& val) {
     115              :         assert(!myCheckColumns); // this overload never calls checkAttr, so column checking has to be off
     116            0 :         if (!myWroteHeader) { // the schema is not complete yet, so append a column and a matching builder
     117            0 :             mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(attr), arrow::utf8()));
     118            0 :             myBuilders.push_back(std::make_shared<arrow::StringBuilder>());
     119              :         }
     120            0 :         myValues.push_back(std::make_shared<arrow::StringScalar>(toString(val))); // there is no isNull parameter here, a value is always stored
     121            0 :     }
     122              : 
     123          756 :     void writeTime(std::ostream& into, const SumoXMLAttr attr, const SUMOTime val) { // writes a time value, as float64 unless gHumanReadableTime is set
     124          756 :         if (!gHumanReadableTime) {
     125          756 :             if (!myWroteHeader) { // NOTE(review): unlike writeAttr this branch does not call checkAttr - confirm this is intended
     126         2496 :                 mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(toString(attr)), arrow::float64()));
     127         1248 :                 myBuilders.push_back(std::make_shared<arrow::DoubleBuilder>());
     128              :             }
     129          756 :             myValues.push_back(std::make_shared<arrow::DoubleScalar>(STEPS2TIME(val))); // the value is converted with STEPS2TIME before it is stored
     130          756 :             return;
     131              :         }
     132            0 :         writeAttr(into, attr, time2string(val)); // human readable times are formatted with time2string and passed on to writeAttr
     133              :     }
     134              : 
     135            0 :     bool wroteHeader() const { // plain getter for myWroteHeader
     136            0 :         return myWroteHeader;
     137              :     }
     138              : 
     139           18 :     void setExpectedAttributes(const SumoXMLAttrMask& expected, const int depth = 2) { // configures the column check done by checkAttr
     140           18 :         myExpectedAttrs = expected;
     141           18 :         myMaxDepth = depth; // checkAttr only looks at attributes while the tag stack has exactly this size
     142           18 :         myCheckColumns = expected.any(); // checking is enabled only if expected.any() holds
     143           18 :     }
     144              : 
     145              : private:
     146          821 :     inline const std::string getAttrString(const std::string& attrString) { // builds the column name for an attribute depending on myHeaderFormat
     147          821 :         if (myHeaderFormat == "plain") { // format "plain": the attribute name is used unchanged
     148              :             return attrString;
     149              :         }
     150          821 :         if (myHeaderFormat == "auto") { // format "auto": prefix with the current tag only if the plain name is already a column
     151            0 :             for (const auto& field : mySchema->fields()) {
     152            0 :                 if (field->name() == attrString) {
     153            0 :                     return myCurrentTag + "_" + attrString;
     154              :                 }
     155              :             }
     156              :             return attrString;
     157              :         }
     158         1642 :         return myCurrentTag + "_" + attrString; // every other format: always prefix with the current tag
     159              :     }
     160              : 
     161         7258 :     inline void checkAttr(const SumoXMLAttr attr) { // marks the attribute as seen and rejects attributes which are not expected
     162         7258 :         if (myCheckColumns && myMaxDepth == (int)myXMLStack.size()) { // only active with column checking on and at the configured depth
     163         7138 :             mySeenAttrs.set(attr);
     164         7138 :             if (!myExpectedAttrs.test(attr)) {
     165            0 :                 throw ProcessError(TLF("Unexpected attribute '%', this file format does not support Parquet output yet.", toString(attr)));
     166              :             }
     167              :         }
     168         7258 :     }
     169              : 
     170              :     /// @brief the format to use for the column names (getAttrString distinguishes "plain", "auto" and everything else)
     171              :     const std::string myHeaderFormat;
     172              : 
     173              :     /// @brief the compression to use (uncompressed unless changed elsewhere)
     174              :     parquet::Compression::type myCompression = parquet::Compression::UNCOMPRESSED;
     175              : 
     176              :     /// @brief the number of rows to write per batch
     177              :     const int myBatchSize;
     178              : 
     179              :     /// @brief the currently read tag (only valid when generating the header), used as column name prefix
     180              :     std::string myCurrentTag;
     181              : 
     182              :     /// @brief the table schema, starts without fields and grows while the header is not written
     183              :     std::shared_ptr<arrow::Schema> mySchema = arrow::schema({});
     184              : 
     185              :     /// @brief the output stream writer
     186              :     std::unique_ptr<parquet::arrow::FileWriter> myParquetWriter;
     187              : 
     188              :     /// @brief the content array builders for the table (one is appended together with every schema field)
     189              :     std::vector<std::shared_ptr<arrow::ArrayBuilder> > myBuilders;
     190              : 
     191              :     /// @brief The number of attributes in the currently open XML elements
     192              :     std::vector<int> myXMLStack;
     193              : 
     194              :     /// @brief the current attribute / column values (a null pointer marks a value which is not set)
     195              :     std::vector<std::shared_ptr<arrow::Scalar> > myValues;
     196              : 
     197              :     /// @brief the maximum depth of the XML hierarchy
     198              :     int myMaxDepth = 0;
     199              : 
     200              :     /// @brief whether the schema has been constructed completely
     201              :     bool myWroteHeader = false;
     202              : 
     203              :     /// @brief whether the columns should be checked for completeness
     204              :     bool myCheckColumns = false;
     205              : 
     206              :     /// @brief the attributes which are expected for a complete row (including null values)
     207              :     SumoXMLAttrMask myExpectedAttrs;
     208              : 
     209              :     /// @brief the attributes already seen (including null values)
     210              :     SumoXMLAttrMask mySeenAttrs;
     211              : };
     212              : 
     213              : 
     214              : // ===========================================================================
     215              : // specialized template implementations
     216              : // ===========================================================================
     217              : template <> // SumoXMLAttr / double version: stores the value in a float64 or a float32 column
     218         4354 : inline void ParquetFormatter::writeAttr(std::ostream& into, const SumoXMLAttr attr, const double& val, const bool isNull) {
     219         4354 :     checkAttr(attr); // may throw ProcessError, but only while column checking is active
     220         4354 :     if (attr == SUMO_ATTR_X || attr == SUMO_ATTR_Y || into.precision() > 2) { // x, y and streams with a precision above 2 get double precision
     221         1440 :         if (!myWroteHeader) { // the schema is not complete yet, so append a float64 column and its builder
     222          144 :             mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(toString(attr)), arrow::float64()));
     223           72 :             myBuilders.push_back(std::make_shared<arrow::DoubleBuilder>());
     224              :         }
     225         2880 :         myValues.push_back(isNull ? nullptr : std::make_shared<arrow::DoubleScalar>(val)); // a null pointer is stored when isNull is set
     226              :     } else { // everything else is stored with single precision
     227         2914 :         if (!myWroteHeader) { // the schema is not complete yet, so append a float32 column and its builder
     228          332 :             mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(toString(attr)), arrow::float32()));
     229          166 :             myBuilders.push_back(std::make_shared<arrow::FloatBuilder>());
     230              :         }
     231         5828 :         myValues.push_back(isNull ? nullptr : std::make_shared<arrow::FloatScalar>((float)val)); // the value is narrowed to float by the cast
     232              :     }
     233         4354 : }
     234              : 
     235              : template <> // SumoXMLAttr / int version: stores the value in an int32 column
     236            0 : inline void ParquetFormatter::writeAttr(std::ostream& /* into */, const SumoXMLAttr attr, const int& val, const bool isNull) {
     237            0 :     checkAttr(attr); // may throw ProcessError, but only while column checking is active
     238            0 :     if (!myWroteHeader) { // the schema is not complete yet, so append an int32 column and its builder
     239            0 :         mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(toString(attr)), arrow::int32()));
     240            0 :         myBuilders.push_back(std::make_shared<arrow::Int32Builder>());
     241              :     }
     242            0 :     myValues.push_back(isNull ? nullptr : std::make_shared<arrow::Int32Scalar>(val)); // a null pointer is stored when isNull is set
     243            0 : }
     244              : 
     245              : template <> // string-named attribute / double version: stores the value in a float64 or a float32 column
     246            0 : inline void ParquetFormatter::writeAttr(std::ostream& into, const std::string& attr, const double& val) {
     247              :     assert(!myCheckColumns); // this overload never calls checkAttr, so column checking has to be off
     248            0 :     if (into.precision() > 2) { // streams with a precision above 2 get double precision
     249            0 :         if (!myWroteHeader) { // the schema is not complete yet, so append a float64 column and its builder
     250            0 :             mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(attr), arrow::float64()));
     251            0 :             myBuilders.push_back(std::make_shared<arrow::DoubleBuilder>());
     252              :         }
     253            0 :         myValues.push_back(std::make_shared<arrow::DoubleScalar>(val));
     254              :     } else { // everything else is stored with single precision
     255            0 :         if (!myWroteHeader) { // the schema is not complete yet, so append a float32 column and its builder
     256            0 :             mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(attr), arrow::float32()));
     257            0 :             myBuilders.push_back(std::make_shared<arrow::FloatBuilder>());
     258              :         }
     259            0 :         myValues.push_back(std::make_shared<arrow::FloatScalar>((float)val)); // the value is narrowed to float by the cast
     260              :     }
     261            0 : }
     262              : 
     263              : template <> // string-named attribute / int version: stores the value in an int32 column
     264            0 : inline void ParquetFormatter::writeAttr(std::ostream& /* into */, const std::string& attr, const int& val) {
     265              :     assert(!myCheckColumns); // this overload never calls checkAttr, so column checking has to be off
     266            0 :     if (!myWroteHeader) { // the schema is not complete yet, so append an int32 column and its builder
     267            0 :         mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(attr), arrow::int32()));
     268            0 :         myBuilders.push_back(std::make_shared<arrow::Int32Builder>());
     269              :     }
     270            0 :     myValues.push_back(std::make_shared<arrow::Int32Scalar>(val)); // there is no isNull parameter here, a value is always stored
     271            0 : }
        

Generated by: LCOV version 2.0-1