
Details you need to know about Apache Parquet

Parquet is a columnar file format that supports nested data. Many data systems support it because of its strong performance advantages.

File format

[Figure: Parquet file format]

The first thing to know is that Apache Parquet is a binary encoding, like Apache Thrift and Protocol Buffers, and is not human-readable. This makes it very different from textual formats such as JSON, XML and CSV.

To mark the beginning and end of a Parquet file, a 4-byte magic number (the ASCII bytes PAR1) is used as a separator. After the first magic number come several row groups and then the footer. FileMetaData is placed in the footer because the metadata is written after the data. The row groups hold the actual data.
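The layout described above can be checked with a few lines of Python. This is only a minimal sketch: it assumes the file ends with the serialized FileMetaData, followed by its length as a 4-byte little-endian integer, followed by the trailing magic number. The name locate_footer and the synthetic bytes are made up for illustration, and the footer here is not real Thrift data.

```python
import struct
from typing import Tuple

MAGIC = b"PAR1"  # the 4-byte magic number at both ends of a Parquet file


def locate_footer(data: bytes) -> Tuple[int, int]:
    """Return (offset, length) of the serialized FileMetaData inside `data`."""
    if len(data) < 12 or data[:4] != MAGIC or data[-4:] != MAGIC:
        raise ValueError("not a Parquet file: magic number missing")
    # The 4 bytes before the trailing magic hold the footer length (little-endian).
    (footer_len,) = struct.unpack("<I", data[-8:-4])
    footer_start = len(data) - 8 - footer_len
    if footer_start < 4:
        raise ValueError("footer length is larger than the file")
    return footer_start, footer_len


# Synthetic example: fake row-group bytes and a fake 5-byte footer.
fake_file = MAGIC + b"row-groups" + b"FOOTR" + struct.pack("<I", 5) + MAGIC
offset, length = locate_footer(fake_file)
print(fake_file[offset:offset + length])  # b'FOOTR'
```

Because the footer is located from the end of the file, a reader can fetch the metadata first and then decide which row groups it actually needs.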

There are three types of metadata: file metadata, column (chunk) metadata and page header metadata. All thrift structures are serialized using the TCompactProtocol.

TCompactProtocol (the Thrift compact protocol) is another binary encoding, from the Apache Thrift project.

Some important concepts are listed below.
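To give a feel for why the compact protocol is compact, here is a small sketch of the zigzag and variable-length integer encoding it uses for integer fields. It covers only that one building block, not field headers, structs or lists, and the function names are my own.

```python
def zigzag(n: int) -> int:
    """Map a signed 64-bit integer to an unsigned one so small magnitudes stay small."""
    return (n << 1) ^ (n >> 63)


def varint(n: int) -> bytes:
    """Encode a non-negative integer 7 bits at a time, low bits first."""
    out = bytearray()
    while True:
        byte = n & 0x7F
        n >>= 7
        if n:
            out.append(byte | 0x80)  # high bit set: more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def encode_int(n: int) -> bytes:
    """Zigzag first, then varint, so small negative numbers are short too."""
    return varint(zigzag(n))


print(encode_int(1).hex())    # 02
print(encode_int(-1).hex())   # 01
print(encode_int(150).hex())  # ac02
```

A value such as num_rows = 5000 therefore takes two bytes in the footer instead of the eight a fixed-width i64 would need.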

Block (hdfs block): This means a block in hdfs and the meaning is unchanged for describing this file format. The file format is designed to work well on top of hdfs.

File: A hdfs file that must include the metadata for the file. It does not need to actually contain the data.

Row group: A logical horizontal partitioning of the data into rows. There is no physical structure that is guaranteed for a row group. A row group consists of a column chunk for each column in the dataset.

Column chunk: A chunk of the data for a particular column. These live in a particular row group and are guaranteed to be contiguous in the file.

Page: Column chunks are divided up into pages. A page is conceptually an indivisible unit (in terms of compression and encoding). There can be multiple page types which are interleaved in a column chunk.

Hierarchically, a file consists of one or more row groups. A row group contains exactly one column chunk per column. Column chunks contain one or more pages.
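The hierarchy can be written down as a few nested data classes. This is a toy model for illustration only: the class and field names are invented, and it stores no real data, just enough to check the "exactly one column chunk per column" and "one or more pages" rules.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Page:
    kind: str        # "dictionary" or "data" in this sketch
    num_values: int


@dataclass
class ColumnChunk:
    column: str
    pages: List[Page] = field(default_factory=list)


@dataclass
class RowGroup:
    chunks: List[ColumnChunk] = field(default_factory=list)


@dataclass
class ParquetFile:
    columns: List[str]
    row_groups: List[RowGroup] = field(default_factory=list)

    def is_well_formed(self) -> bool:
        """Every row group has exactly one chunk per column,
        and every chunk has at least one page."""
        for group in self.row_groups:
            if sorted(c.column for c in group.chunks) != sorted(self.columns):
                return False
            if any(not c.pages for c in group.chunks):
                return False
        return True


f = ParquetFile(
    columns=["v", "str"],
    row_groups=[RowGroup([
        ColumnChunk("v", [Page("data", 5000)]),
        ColumnChunk("str", [Page("dictionary", 12), Page("data", 5000)]),
    ])],
)
print(f.is_well_formed())  # True
```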

Metadata

Let’s dive into the metadata.

[Figure: Parquet metadata]

The definition file: Parquet thrift definition

FileMetaData

Here is the definition:

/**
* Description for file metadata
*/
struct FileMetaData {
/** Version of this file **/
1: required i32 version

/** Parquet schema for this file. This schema contains metadata for all the columns.
* The schema is represented as a tree with a single root. The nodes of the tree
* are flattened to a list by doing a depth-first traversal.
* The column metadata contains the path in the schema for that column which can be
* used to map columns to nodes in the schema.
* The first element is the root **/
2: required list<SchemaElement> schema;

/** Number of rows in this file **/
3: required i64 num_rows

/** Row groups in this file **/
4: required list<RowGroup> row_groups

/** Optional key/value metadata **/
5: optional list<KeyValue> key_value_metadata

/** String for application that wrote this file. This should be in the format
* <Application> version <App Version> (build <App Build Hash>).
* e.g. impala version 1.0 (build 6cf94d29b2b7115df4de2c06e2ab4326d721eb55)
**/
6: optional string created_by

/**
* Sort order used for the min_value and max_value fields of each column in
* this file. Each sort order corresponds to one column, determined by its
* position in the list, matching the position of the column in the schema.
*
* Without column_orders, the meaning of the min_value and max_value fields is
* undefined. To ensure well-defined behaviour, if min_value and max_value are
* written to a Parquet file, column_orders must be written as well.
*
* The obsolete min and max fields are always sorted by signed comparison
* regardless of column_orders.
*/
7: optional list<ColumnOrder> column_orders;
}

The ‘num_rows’ field is very useful when a reader needs to count rows. For instance, when SparkSQL runs a count on a partitioned table, Spark just sums the ‘num_rows’ of each Parquet file belonging to the selected partitions.

The ‘schema’ is the most important part of this metadata. It is defined as a list of SchemaElement, which means each field is represented by one SchemaElement.
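The counting idea is simple enough to show with plain Python. The paths and row counts below are hypothetical; in a real job the numbers would come from the ‘num_rows’ field of each file’s footer, and no data pages would be read.

```python
from typing import Dict

# Hypothetical footers: file path -> num_rows taken from that file's FileMetaData.
footers: Dict[str, int] = {
    "dt=2020-01-01/part-0.parquet": 5000,
    "dt=2020-01-01/part-1.parquet": 4200,
    "dt=2020-01-02/part-0.parquet": 6100,
}


def count_rows(footers: Dict[str, int], partition: str) -> int:
    """Sum num_rows over the files that belong to one partition directory."""
    return sum(n for path, n in footers.items() if path.startswith(partition + "/"))


print(count_rows(footers, "dt=2020-01-01"))  # 9200
```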

SchemaElement

Here is the structure of SchemaElement.

/**
* Represents an element inside a schema definition.
* - if it is a group (inner node) then type is undefined and num_children is defined
* - if it is a primitive type (leaf) then type is defined and num_children is undefined
* the nodes are listed in depth first traversal order.
*/
struct SchemaElement {
/** Data type for this field. Not set if the current element is a non-leaf node */
1: optional Type type;

/** If type is FIXED_LEN_BYTE_ARRAY, this is the byte length of the values.
* Otherwise, if specified, this is the maximum bit length to store any of the values.
* (e.g. a low cardinality INT col could have this set to 3). Note that this is
* in the schema, and therefore fixed for the entire file.
*/
2: optional i32 type_length;

/** repetition of the field. The root of the schema does not have a repetition_type.
* All other nodes must have one */
3: optional FieldRepetitionType repetition_type;

/** Name of the field in the schema */
4: required string name;

/** Nested fields. Since thrift does not support nested fields,
* the nesting is flattened to a single list by a depth-first traversal.
* The children count is used to construct the nested relationship.
* This field is not set when the element is a primitive type
*/
5: optional i32 num_children;

/** When the schema is the result of a conversion from another model
* Used to record the original type to help with cross conversion.
*/
6: optional ConvertedType converted_type;

/** Used when this column contains decimal data.
* See the DECIMAL converted type for more details.
*/
7: optional i32 scale
8: optional i32 precision

/** When the original schema supports field ids, this will save the
* original field id in the parquet schema
*/
9: optional i32 field_id;

/**
* The logical type of this SchemaElement
*
* LogicalType replaces ConvertedType, but ConvertedType is still required
* for some logical types to ensure forward-compatibility in format v1.
*/
10: optional LogicalType logicalType
}
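The comments on ‘num_children’ describe how a nested schema is flattened into a list by a depth-first traversal. A rough sketch of the reverse step, rebuilding the tree from that list, could look like this. The tuple layout and the function name are assumptions made for the example; a real reader works on SchemaElement structs.

```python
from typing import Dict, List, Optional, Tuple

# (name, num_children) pairs in depth-first order; num_children is None for leaves.
Flat = List[Tuple[str, Optional[int]]]


def build_tree(flat: Flat) -> Dict:
    """Rebuild the nested schema from its flattened depth-first listing."""
    pos = 0

    def read_node() -> Dict:
        nonlocal pos
        name, num_children = flat[pos]
        pos += 1
        # A group consumes exactly num_children following subtrees.
        children = [read_node() for _ in range(num_children or 0)]
        return {"name": name, "children": children}

    root = read_node()
    if pos != len(flat):
        raise ValueError("schema list has trailing elements")
    return root


flat_schema: Flat = [
    ("spark_schema", 2),  # the root is always the first element
    ("v", None),
    ("user", 2),
    ("id", None),
    ("name", None),
]
tree = build_tree(flat_schema)
print([child["name"] for child in tree["children"]])                 # ['v', 'user']
print([child["name"] for child in tree["children"][1]["children"]])  # ['id', 'name']
```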

The data type of each column is determined by both the ‘type’ and ‘logicalType’ fields.

The ‘type’ field supports several primitive types:

BOOLEAN: 1 bit boolean
INT32: 32 bit signed ints
INT64: 64 bit signed ints
INT96: 96 bit signed ints
FLOAT: IEEE 32-bit floating point values
DOUBLE: IEEE 64-bit floating point values
BYTE_ARRAY: arbitrarily long byte arrays
FIXED_LEN_BYTE_ARRAY: fixed length byte arrays

Logical types are types built on top of the primitive types. The ‘logicalType’ field tells the reader which LogicalType the primitive data should be interpreted as.

Logical types are used to extend the types that parquet can be used to store, by specifying how the primitive types should be interpreted. This keeps the set of primitive types to a minimum and reuses parquet’s efficient encodings. For example, strings are stored as byte arrays (binary) with a UTF8 annotation.
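A few lines of Python make the idea concrete. The sketch assumes three common annotations: STRING on a byte array (UTF-8 text), DATE on an INT32 (days since the Unix epoch) and DECIMAL on an integer (an unscaled value plus a scale). The helper names are mine, not part of any Parquet library.

```python
from datetime import date, timedelta
from decimal import Decimal


def as_string(raw: bytes) -> str:
    """BYTE_ARRAY annotated as STRING: the bytes are UTF-8 text."""
    return raw.decode("utf-8")


def as_date(days: int) -> date:
    """INT32 annotated as DATE: number of days since 1970-01-01."""
    return date(1970, 1, 1) + timedelta(days=days)


def as_decimal(unscaled: int, scale: int) -> Decimal:
    """Integer annotated as DECIMAL: value = unscaled * 10^(-scale)."""
    return Decimal(unscaled).scaleb(-scale)


print(as_string(b"parquet"))  # parquet
print(as_date(18262))         # 2020-01-01
print(as_decimal(12345, 2))   # 123.45
```

The stored bytes stay the same in every case; only the interpretation changes, which is why the encodings for primitive types can be reused.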

The ‘logicalType’ field supports many types, such as:

STRING, ENUM, UUID
DATE, TIME, TIMESTAMP, INTERVAL
INT, DECIMAL
JSON, BSON
LIST, MAP
NULL

The INT annotation can be used to specify the maximum number of bits in the stored value. The annotation has two parameters, bit width and sign, for example: INT(8, true), INT(16, true), INT(32, true), INT(64, true).

For more details about LogicalType, see Parquet Logical Type Definitions.
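As a quick illustration of what the two parameters mean, the sketch below computes the value range for a given bit width and sign, and the primitive type that would hold it. It assumes the 8, 16 and 32 bit variants annotate INT32 and the 64 bit variant annotates INT64; the function names are made up for the example.

```python
from typing import Tuple


def int_annotation_range(bit_width: int, signed: bool) -> Tuple[int, int]:
    """Smallest and largest value that INT(bit_width, signed) can represent."""
    if bit_width not in (8, 16, 32, 64):
        raise ValueError("bit width must be 8, 16, 32 or 64")
    if signed:
        return -(1 << (bit_width - 1)), (1 << (bit_width - 1)) - 1
    return 0, (1 << bit_width) - 1


def physical_type(bit_width: int) -> str:
    """Primitive type assumed to carry the annotated values."""
    return "INT64" if bit_width == 64 else "INT32"


print(int_annotation_range(8, True))    # (-128, 127)
print(int_annotation_range(16, False))  # (0, 65535)
print(physical_type(16))                # INT32
```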

ColumnMetaData

/**
* Description for column metadata
*/
struct ColumnMetaData {
/** Type of this column **/
1: required Type type

/** Set of all encodings used for this column. The purpose is to validate
* whether we can decode those pages. **/
2: required list<Encoding> encodings

/** Path in schema **/
3: required list<string> path_in_schema

/** Compression codec **/
4: required CompressionCodec codec

/** Number of values in this column **/
5: required i64 num_values

/** total byte size of all uncompressed pages in this column chunk (including the headers) **/
6: required i64 total_uncompressed_size

/** total byte size of all compressed pages in this column chunk (including the headers) **/
7: required i64 total_compressed_size

/** Optional key/value metadata **/
8: optional list<KeyValue> key_value_metadata

/** Byte offset from beginning of file to first data page **/
9: required i64 data_page_offset

/** Byte offset from beginning of file to root index page **/
10: optional i64 index_page_offset

/** Byte offset from the beginning of file to first (only) dictionary page **/
11: optional i64 dictionary_page_offset

/** optional statistics for this column chunk */
12: optional Statistics statistics;

/** Set of all encodings used for pages in this column chunk.
* This information can be used to determine if all data pages are
* dictionary encoded for example **/
13: optional list<PageEncodingStats> encoding_stats;
}

Each ColumnChunk has exactly one ColumnMetaData, so one ColumnMetaData describes how one column’s data is stored in one row group.

Some fields worth noting:

  • encodings: used only for validation; the encoding actually used by each page is recorded in its DataPageHeader (see below);
  • codec: the compression algorithm used, such as SNAPPY, GZIP or LZO;
  • statistics: statistics for this column within the row group. Some useful fields are shown below; they are very useful for distinct counting and filtering. With ‘max_value’ and ‘min_value’, SparkSQL can do filter push-down by skipping row groups that cannot match.
struct Statistics {
...
/** count of null value in the column */
3: optional i64 null_count;
/** count of distinct values occurring */
4: optional i64 distinct_count;
/**
* Min and max values for the column, determined by its ColumnOrder.
*
* Values are encoded using PLAIN encoding, except that variable-length byte
* arrays do not include a length prefix.
*/
5: optional binary max_value;
6: optional binary min_value;
}
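Here is a minimal sketch of how a reader might use ‘min_value’ and ‘max_value’ to skip row groups. It assumes the statistics have already been decoded into integers (in the file they are stored as binary) and handles only simple comparisons; may_match is a name invented for this example, not Spark’s implementation.

```python
from typing import List, Tuple


def may_match(min_value: int, max_value: int, op: str, literal: int) -> bool:
    """Return False only when no value in [min_value, max_value] can satisfy
    `column <op> literal`, so the row group can be skipped safely."""
    if op == "=":
        return min_value <= literal <= max_value
    if op == "<":
        return min_value < literal
    if op == "<=":
        return min_value <= literal
    if op == ">":
        return max_value > literal
    if op == ">=":
        return max_value >= literal
    return True  # unknown operator: never skip


# Hypothetical (min_value, max_value) statistics for one column in three row groups.
row_groups: List[Tuple[int, int]] = [(0, 99), (100, 199), (200, 299)]
to_read = [i for i, (lo, hi) in enumerate(row_groups) if may_match(lo, hi, ">", 150)]
print(to_read)  # [1, 2]
```

Note that the check is conservative: a row group that passes may still contain no matching rows, so the filter is applied again to the rows that are read.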

PageHeader acts somewhat like a parent class of DataPageHeader, IndexPageHeader and DictionaryPageHeader, and contains the fields they have in common.

Each data page has a data page header. Let’s look at DataPageHeaderV2, the header for the newer data page format.

DataPageHeader
/**
* New page format allowing reading levels without decompressing the data
* Repetition and definition levels are uncompressed
* The remaining section containing the data is compressed if is_compressed is true
**/
struct DataPageHeaderV2 {
/** Number of values, including NULLs, in this data page. **/
1: required i32 num_values
/** Number of NULL values, in this data page.
Number of non-null = num_values - num_nulls which is also the number of values in the data section **/
2: required i32 num_nulls
/** Number of rows in this data page. which means pages change on record boundaries (r = 0) **/
3: required i32 num_rows
/** Encoding used for data in this page **/
4: required Encoding encoding

// repetition levels and definition levels are always using RLE (without size in it)

/** length of the definition levels */
5: required i32 definition_levels_byte_length;
/** length of the repetition levels */
6: required i32 repetition_levels_byte_length;

/** whether the values are compressed.
Which means the section of the page between
definition_levels_byte_length + repetition_levels_byte_length + 1 and compressed_page_size (included)
is compressed with the compression_codec.
If missing it is considered compressed */
7: optional bool is_compressed = 1;

/** optional statistics for this column chunk */
8: optional Statistics statistics;
}
  • encoding: describes how the primitive values are laid out as bytes. For example, PLAIN stores each INT32 value as 4 little-endian bytes, while PLAIN_DICTIONARY stores indexes into a dictionary of distinct values.
    For more details, see Parquet encoding definitions.
  • statistics: statistics information for this page
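To make the two encodings mentioned above less abstract, here is a simplified sketch. PLAIN for INT32 is shown as 4 little-endian bytes per value. The dictionary version only builds the dictionary and the list of indexes; in a real file the indexes are further packed with RLE and bit-packing, which this sketch leaves out.

```python
import struct
from typing import List, Tuple


def plain_int32(values: List[int]) -> bytes:
    """PLAIN encoding of INT32: each value as 4 little-endian bytes."""
    return b"".join(struct.pack("<i", v) for v in values)


def dictionary_encode(values: List[str]) -> Tuple[List[str], List[int]]:
    """Replace each value by its index in a dictionary of distinct values."""
    dictionary: List[str] = []
    index_of = {}
    indexes: List[int] = []
    for v in values:
        if v not in index_of:
            index_of[v] = len(dictionary)
            dictionary.append(v)
        indexes.append(index_of[v])
    return dictionary, indexes


print(plain_int32([1, 2]).hex())                 # 0100000002000000
print(dictionary_encode(["a", "b", "a", "a"]))   # (['a', 'b'], [0, 1, 0, 0])
```

Dictionary encoding pays off when a column has few distinct values, because each repeated value shrinks to a small integer.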

PS: The comment on ‘statistics’ is wrong; it should be ‘optional statistics for the data in this page’. I have fixed that in my PR.

How does encoding work with compression?

First the data is encoded; then the encoded output is compressed with a generic compression algorithm, such as Snappy, specified in ColumnMetaData.

Here is another way to describe the difference.

Encoding: works more at the application level, where the representation of the data is changed. Encoding can also reduce space usage, which gives us a kind of compression.
Compression: in general, a technique to reduce the storage needed for given bytes, regardless of whether the underlying data is already encoded.
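The two steps can be sketched as a tiny write and read path. This is an illustration under stated assumptions: PLAIN INT32 stands in for the encoding, and Python’s zlib stands in for the codec because Snappy is not in the standard library. Real pages also carry headers and levels, which are omitted here.

```python
import struct
import zlib
from typing import List


def write_page(values: List[int]) -> bytes:
    encoded = b"".join(struct.pack("<i", v) for v in values)  # step 1: encode (PLAIN INT32)
    return zlib.compress(encoded)                             # step 2: compress


def read_page(page: bytes) -> List[int]:
    encoded = zlib.decompress(page)                           # undo step 2
    count = len(encoded) // 4
    return list(struct.unpack(f"<{count}i", encoded))         # undo step 1


values = [7] * 1000
page = write_page(values)
print(len(values) * 4, len(page))  # encoded size vs. compressed size
assert read_page(page) == values
```

The compressor knows nothing about integers or columns; it only sees the bytes that the encoding step produced.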

Parquet-tools

Parquet-tools is a convenient official tool for working with Parquet files.

If you use macOS and Homebrew, just install it with brew install parquet-tools.

To show metadata: parquet-tools meta yourfile.parquet

Or, with a Hadoop distribution: hadoop jar ./parquet-tools-<VERSION>.jar <command>

You’ll get something like this:

creator:     parquet-mr version 1.10.1 (build a89df8f9932b6ef6633d06069e50c9b7970bebd1)
extra: org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"v","type":"i [more]...

file schema: spark_schema
-------------------------------------------------------------------------------------------------------------------
v: REQUIRED INT32 R:0 D:0
sq: REQUIRED INT32 R:0 D:0
str: OPTIONAL BINARY O:UTF8 R:0 D:1

row group 1: RC:5000 TS:94040
-------------------------------------------------------------------------------------------------------------------
v: INT32 SNAPPY DO:0 FPO:4 SZ:20056/20050/1.00 VC:5000 ENC:PLAIN,BIT_PACKED
sq: INT32 SNAPPY DO:0 FPO:20060 SZ:20030/20050/1.00 VC:5000 ENC:PLAIN,BIT_PACKED
str: BINARY SNAPPY DO:0 FPO:40090 SZ:21960/53940/2.46 VC:5000 ENC:RLE,PLAIN,BIT_PACKED
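If you want to post-process this output, a column chunk line is easy to pick apart. The sketch below assumes that SZ holds compressed size / uncompressed size / ratio, VC is the value count and ENC lists the encodings; the numbers in the sample output are consistent with that reading (53940 / 21960 is about 2.46). The parser is my own helper, not part of parquet-tools.

```python
from typing import Dict


def parse_chunk_line(line: str) -> Dict[str, object]:
    """Split one column chunk line of the `meta` output into named fields."""
    name, rest = line.split(":", 1)
    tokens = rest.split()
    fields = dict(tok.split(":", 1) for tok in tokens if ":" in tok)
    compressed, uncompressed, _ratio = fields["SZ"].split("/")
    return {
        "column": name.strip(),
        "type": tokens[0],
        "codec": tokens[1],
        "compressed_bytes": int(compressed),
        "uncompressed_bytes": int(uncompressed),
        "value_count": int(fields["VC"]),
        "encodings": fields["ENC"].split(","),
    }


line = "str: BINARY SNAPPY DO:0 FPO:40090 SZ:21960/53940/2.46 VC:5000 ENC:RLE,PLAIN,BIT_PACKED"
info = parse_chunk_line(line)
ratio = info["uncompressed_bytes"] / info["compressed_bytes"]
print(info["column"], info["codec"], round(ratio, 2))  # str SNAPPY 2.46
```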

More details: Parquet Tools

Use Parquet with Spark

You can find a guide here: Spark SQL Guide > Parquet Files.

With Parquet files, Spark can apply some optimizations.

  • Column projection
    The idea behind this feature is simple: just read the data for columns that the query needs to process and skip the rest of the data. Column-oriented data formats like Parquet can implement this feature quite naturally.
  • Predicate push down
    Predicate push down is another feature of Spark and Parquet that can improve query performance by reducing the amount of data read from Parquet files. Predicate push down works by evaluating filtering predicates in the query against metadata stored in the Parquet files.

Is predicate push down available for nested fields?

Yes. If the field schema is user(id, name), the data is stored in two columns, user.id and user.name, so both id and name have their own statistics in the metadata.
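The mapping from a nested schema to leaf columns can be sketched in a few lines. The nested dict used to describe the schema is just a convenient stand-in for this example; in the file each leaf is identified by its path_in_schema.

```python
from typing import Dict, List, Union

Schema = Dict[str, Union[str, "Schema"]]


def leaf_columns(schema: Schema, prefix: str = "") -> List[str]:
    """List the dotted path of every leaf field; each one is stored as its own column."""
    paths: List[str] = []
    for name, sub in schema.items():
        path = f"{prefix}.{name}" if prefix else name
        if isinstance(sub, dict):
            paths.extend(leaf_columns(sub, path))  # group: descend into children
        else:
            paths.append(path)                     # leaf: one column chunk per row group
    return paths


schema: Schema = {"user": {"id": "INT64", "name": "BYTE_ARRAY"}, "ts": "INT64"}
print(leaf_columns(schema))  # ['user.id', 'user.name', 'ts']
```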

Use Parquet with HDFS

When using Parquet with HDFS, pay attention to the row group size.

Row group size: Larger row groups allow for larger column chunks which makes it possible to do larger sequential IO. Larger groups also require more buffering in the write path (or a two pass write). We recommend large row groups (512MB - 1GB). Since an entire row group might need to be read, we want it to completely fit on one HDFS block. Therefore, HDFS block sizes should also be set to be larger. An optimized read setup would be: 1GB row groups, 1GB HDFS block size, 1 HDFS block per HDFS file.

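The row group size is controlled by the parquet.block.size setting; the default value is 128MB.

You can set it in Spark as below:
// For every Parquet write in this SparkContext (value in bytes: 256MB)
sc.hadoopConfiguration.setInt("parquet.block.size", 256 * 1024 * 1024)
or
// For a single write; the option takes effect when the write runs, e.g. via .parquet(path)
df.write.option("parquet.block.size", 256 * 1024 * 1024)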
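To see why the row group size should line up with the HDFS block size, the following back-of-the-envelope sketch counts how many row groups would cross a block boundary. It assumes equally sized row groups written back to back and ignores any padding or alignment a real writer may apply, so treat the numbers as a rough guide only.

```python
MB = 1024 * 1024


def straddling_row_groups(file_size: int, row_group_size: int, block_size: int) -> int:
    """Count row groups whose bytes span more than one HDFS block."""
    count = 0
    start = 0
    while start < file_size:
        end = min(start + row_group_size, file_size)
        if start // block_size != (end - 1) // block_size:
            count += 1  # first and last byte fall in different blocks
        start = end
    return count


print(straddling_row_groups(1024 * MB, 128 * MB, 128 * MB))  # 0
print(straddling_row_groups(384 * MB, 96 * MB, 128 * MB))    # 2
```

Every row group that straddles a boundary may need a remote read for part of its data, which is what matching the two sizes avoids.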

Reference

Apache Parquet

Diving into Spark and Parquet Workloads, by Example