
11 — OLAP and Columnar Storage

Technical Overview

Online Analytical Processing (OLAP) databases are designed for aggregate queries that scan millions of rows but touch only a few columns. Unlike OLTP (few rows, many columns), OLAP workloads are read-heavy, batch-oriented, and require sustained high-throughput sequential I/O. These access patterns pose a fundamentally different problem from OLTP. They motivate a different storage layout (columnar), a different execution model (vectorized), and a different architecture (warehouse-centric or in-process analytical engine).

The columnar storage format was pioneered by research systems such as MonetDB and C-Store (Stonebraker et al., 2005) and commercialized by Vertica, Snowflake, and ClickHouse. It has become the dominant approach for analytical workloads. Apache Parquet and Apache Arrow have standardized the on-disk and in-memory columnar formats, respectively, enabling an ecosystem of interoperable analytical tools. DuckDB, ClickHouse, and Snowflake represent three different deployment models (in-process, self-hosted, and cloud-native), all built on columnar storage.

Prerequisites

  • Understanding of row-store storage (heap pages, B+ trees)
  • Familiarity with compression algorithms (RLE, dictionary coding)
  • Basic knowledge of SIMD instructions and CPU cache behavior
  • Understanding of query execution operators (scan, filter, aggregate, join)

Core Content

OLAP vs OLTP Access Patterns

OLTP Workload:
  Query: SELECT name, email FROM users WHERE user_id = 42;
  Access pattern:
    - 1-10 rows
    - All columns for selected rows
    - Random access by primary key
    - High QPS (thousands per second)

  Optimal layout: ROW STORE
  +-----+---------+-------+-----+--------+
  | id=1| name=Al | email | age | salary |
  +-----+---------+-------+-----+--------+
  | id=2| name=Bo | email | age | salary |
  +-----+---------+-------+-----+--------+
  (fetch one row = one page fetch, all columns available)

OLAP Workload:
  Query: SELECT country, AVG(revenue) FROM sales WHERE year=2023 GROUP BY country;
  Access pattern:
    - 100M rows scanned
    - 2 columns out of 50 (country, revenue)
    - Sequential scan
    - Low QPS (tens per hour), high scan volume

  Optimal layout: COLUMN STORE
  country col: [US][US][UK][DE][US][UK][DE]... (100M values, contiguous)
  revenue col: [100][200][150][75][300]...    (100M values, contiguous)
  (scan only 2 columns, skip 48 columns entirely)

I/O reduction: A 50-column table with 100-byte rows occupies 5GB per 50M rows. Suppose the query needs 2 columns totaling 10 bytes per row. The column store then reads only 500MB, while the row store reads all 5GB. That is a 10x difference in I/O, which becomes a roughly 10x speedup when the scan is I/O-bound.
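The arithmetic can be reproduced with a tiny model. The helper `scan_volume` is hypothetical. It deliberately ignores compression, page headers, and partial-page reads, so it captures only the ratio that comes from column selection.

```python
from __future__ import annotations


def scan_volume(rows: int, row_width: int, needed_width: int) -> tuple[int, int]:
    """Return (row_store_bytes, column_store_bytes) for a full-table scan.

    Simplified model: no compression, no page headers, no partial-page reads.
    """
    row_store = rows * row_width        # a row store reads every byte of every row
    column_store = rows * needed_width  # a column store reads only the needed columns
    return row_store, column_store


ROWS = 50_000_000
ROW_WIDTH = 100    # bytes per row across all 50 columns
NEEDED_WIDTH = 10  # bytes per row for the 2 queried columns

row_bytes, col_bytes = scan_volume(ROWS, ROW_WIDTH, NEEDED_WIDTH)
print(f"row store:    {row_bytes / 1e9:.1f} GB")   # 5.0 GB
print(f"column store: {col_bytes / 1e6:.0f} MB")   # 500 MB
print(f"I/O ratio:    {row_bytes // col_bytes}x")  # 10x
```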

Column vs Row Layout in Memory/Storage

Row Store (PostgreSQL heap page):
+----------------------------------------------+
| row1: [id][name][email][age][salary][country] |
| row2: [id][name][email][age][salary][country] |
| row3: [id][name][email][age][salary][country] |
+----------------------------------------------+
Sequential scan reads all column data even if only one column is needed.

Column Store (Parquet row group):
+------------------+
| id chunk:        |  [1][2][3][4][5]...
| name chunk:      |  [Al][Bo][Ca]...
| email chunk:     |  [a@b][b@c][c@d]...
| age chunk:       |  [30][25][35]...
| salary chunk:    |  [50K][60K][70K]...
| country chunk:   |  [US][UK][DE]...
+------------------+
Query on age+country: only reads age and country chunks.

Column Compression

Homogeneous column data compresses dramatically better than mixed row data:

Run-Length Encoding (RLE):

Raw: [US][US][US][US][UK][UK][DE][DE][DE][DE][DE]
RLE: [(US,4), (UK,2), (DE,5)]
Compression: 11 values → 3 (value, count) pairs (about 3.7x fewer entries; longer runs compress further)

Ideal for sorted, low-cardinality columns (country, status, category). ClickHouse's MergeTree stores each part sorted by the table's sorting key (ORDER BY), so the leading sort columns form long runs that compress extremely well.
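Here is a minimal run-length codec. It is a sketch of the idea, not any engine's on-disk format: Parquet, for example, mixes RLE runs with bit-packed runs in a hybrid encoding. The second example shows why sort order matters so much for RLE.

```python
from itertools import groupby


def rle_encode(values):
    """Collapse consecutive equal values into (value, run_length) pairs."""
    return [(value, sum(1 for _ in run)) for value, run in groupby(values)]


def rle_decode(pairs):
    """Expand (value, run_length) pairs back into the original sequence."""
    return [value for value, count in pairs for _ in range(count)]


column = ["US"] * 4 + ["UK"] * 2 + ["DE"] * 5
encoded = rle_encode(column)
print(encoded)  # [('US', 4), ('UK', 2), ('DE', 5)]
assert rle_decode(encoded) == column

# Sorting matters: the same 12 values give 12 runs interleaved but only 3 runs sorted.
interleaved = ["US", "UK", "DE"] * 4
print(len(rle_encode(interleaved)), len(rle_encode(sorted(interleaved))))  # 12 3
```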

Dictionary Encoding:

Raw: ["United States"]["United Kingdom"]["Germany"]["United States"]...
Dict: {0:"United States", 1:"United Kingdom", 2:"Germany"}
Encoded: [0][1][2][0]...

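Each value shrinks from the full string to a fixed-width code. With an 8-bit dictionary (at most 256 distinct values), every code is 1 byte. Column storage therefore drops by roughly the average string length, minus the one-time cost of storing the dictionary. Parquet writers use dictionary encoding by default, and ClickHouse's LowCardinality type is built on it.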
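The following sketch assumes 8-bit codes (at most 256 distinct values) and ignores string length prefixes. Real writers also bit-pack the codes to the minimum width and may fall back to plain encoding when the dictionary grows too large. The byte counts compare raw UTF-8 payloads with 1-byte codes plus one copy of each distinct string.

```python
from array import array


def dict_encode(values):
    """Replace each string with an 8-bit code; return (dictionary, codes)."""
    code_of = {}        # string -> code, assigned in first-seen order
    codes = array("B")  # unsigned 8-bit codes, so at most 256 distinct values
    for v in values:
        code = code_of.setdefault(v, len(code_of))
        codes.append(code)  # raises OverflowError once a code exceeds 255
    return list(code_of), codes


def dict_decode(dictionary, codes):
    return [dictionary[c] for c in codes]


column = ["United States", "United Kingdom", "Germany", "United States"] * 1000
dictionary, codes = dict_encode(column)
assert dict_decode(dictionary, codes) == column

raw_bytes = sum(len(s.encode("utf-8")) for s in column)
encoded_bytes = len(codes) * codes.itemsize + sum(len(s.encode("utf-8")) for s in dictionary)
print(dictionary)                # ['United States', 'United Kingdom', 'Germany']
print(list(codes[:4]))           # [0, 1, 2, 0]
print(raw_bytes, encoded_bytes)  # 47000 4034
```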

Bit-Packing:

Values: [0, 1, 2, 3, 4, 5, 6, 7] (max=7, needs 3 bits)
Normal:  [8 int32 values × 4 bytes = 32 bytes]
Packed:  [3 bits * 8 values = 24 bits = 3 bytes]

Useful for low-range integer columns (e.g., star ratings 1-5 need 3 bits, not 32). Apache Parquet and DuckDB use bit-packing extensively.
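This is a pure-Python sketch of fixed-width bit-packing. It packs values least-significant-bit first into one Python integer. That keeps the code simple, but the cost grows quadratically with input size. Production encoders instead work on fixed-size groups of values with unrolled bit operations.

```python
def bit_width(max_value: int) -> int:
    """Bits needed for values in [0, max_value] (0 bits if every value is 0)."""
    return max_value.bit_length()


def pack(values, width):
    """Pack non-negative ints into bytes, `width` bits each, least significant bits first."""
    acc = 0
    for i, v in enumerate(values):
        if v < 0 or v >> width:
            raise ValueError(f"{v} does not fit in {width} bits")
        acc |= v << (i * width)
    nbytes = (len(values) * width + 7) // 8  # round up to whole bytes
    return acc.to_bytes(nbytes, "little")


def unpack(data, width, count):
    acc = int.from_bytes(data, "little")
    mask = (1 << width) - 1
    return [(acc >> (i * width)) & mask for i in range(count)]


values = [0, 1, 2, 3, 4, 5, 6, 7]
w = bit_width(max(values))
packed = pack(values, w)
print(w, len(packed))  # 3 3
assert unpack(packed, w, len(values)) == values

ratings = [5, 3, 4, 1, 2]  # star ratings 1-5
print(bit_width(max(ratings)))  # 3
```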

Delta Encoding:

Timestamps: [1700000000, 1700000001, 1700000002, 1700000003]
Deltas:     [1700000000, +1, +1, +1]

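Excellent for monotonically increasing timestamps. The deltas are then bit-packed. Parquet's DELTA_BINARY_PACKED encoding first subtracts the block's minimum delta. Because every delta here is 1, the remaining values are all 0 and need 0 bits each after the first value. Combined with RLE, the column collapses to [(1700000000, delta=1, count=4)].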
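The sketch below works on a single block, in the spirit of Parquet's DELTA_BINARY_PACKED: it stores a first value, a minimum delta, and then the residuals that would be bit-packed. The real encoding splits data into blocks and miniblocks with per-miniblock bit widths and zigzag/varint headers. All of that is omitted here.

```python
def delta_encode(values):
    """Return (first_value, min_delta, residuals) for a non-empty sequence.

    Residuals are the deltas minus their minimum, so they are non-negative
    and as small as possible before bit-packing.
    """
    deltas = [b - a for a, b in zip(values, values[1:])]
    min_delta = min(deltas, default=0)
    return values[0], min_delta, [d - min_delta for d in deltas]


def delta_decode(first, min_delta, residuals):
    out = [first]
    for r in residuals:
        out.append(out[-1] + r + min_delta)
    return out


ts = [1700000000, 1700000001, 1700000002, 1700000003]
first, min_delta, residuals = delta_encode(ts)
width = max(residuals, default=0).bit_length()  # bits per packed residual
print(first, min_delta, residuals, width)  # 1700000000 1 [0, 0, 0] 0
assert delta_decode(first, min_delta, residuals) == ts

irregular = [100, 103, 104, 110]
print(delta_encode(irregular))  # (100, 1, [2, 0, 5])
```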

Cascading compression: Parquet and ClickHouse use multiple encodings in sequence. A timestamp column might be delta-encoded, then bit-packed, then Snappy/Zstd compressed. Total compression ratios of 10-50x are common for analytical workloads.
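Here is a stdlib-only sketch of a three-stage cascade on a synthetic timestamp column:

1. Delta encoding.
2. Narrowing the residuals. They are stored byte-aligned here rather than truly bit-packed, to keep the code short.
3. A general-purpose codec. zlib stands in for Snappy/Zstd, which are not in the Python standard library.

Exact sizes depend on the random data, so none are claimed.

```python
import random
import struct
import zlib

random.seed(42)
# Synthetic timestamp column: increasing, 1 to 3 seconds apart.
ts = [1_700_000_000]
for _ in range(99_999):
    ts.append(ts[-1] + random.randint(1, 3))

# Baseline: plain little-endian int64 values, then general-purpose compression.
raw = struct.pack(f"<{len(ts)}q", *ts)
baseline = zlib.compress(raw, 6)

# Stage 1: delta encoding (the first value is kept separately).
deltas = [b - a for a, b in zip(ts, ts[1:])]
min_delta = min(deltas)
# Stage 2: subtract the minimum delta; residuals are 0-2 and would fit in 2 bits,
# but this sketch stores them as one byte each for simplicity.
residuals = bytes(d - min_delta for d in deltas)
# Stage 3: general-purpose compression (zlib stands in for Snappy/Zstd).
header = struct.pack("<qq", ts[0], min_delta)
cascaded = header + zlib.compress(residuals, 6)

print(f"plain int64:      {len(raw):>8} bytes")
print(f"zlib only:        {len(baseline):>8} bytes")
print(f"delta+pack+zlib:  {len(cascaded):>8} bytes")
```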

Vectorized Query Execution

Traditional tuple-at-a-time execution (the Volcano model) produces one row per getNext() call. For analytical queries scanning millions of rows, the per-tuple (often virtual) function-call overhead and poor CPU cache utilization become significant.

Vectorized execution processes a batch (vector) of 1024-8192 rows per operator call:

Vectorized Filter:
  Input vector:     [50000, 30000, 70000, 45000, 25000, 60000, 35000, 80000, ...]  (1024 salary values)
  Predicate:        salary > 40000
  Result bitmask:   [1, 0, 1, 1, 0, 1, 0, 1, ...]
  Selection vector: [0, 2, 3, 5, 7, ...]  (positions of qualifying rows)

  SIMD implementation (AVX2, 256-bit registers):
    Load 8 int32 values into ymm0
    Compare ymm0 with [40000 x 8] (vpcmpgtd) -> ymm1 (all-ones or all-zeros per lane)
    Extract an 8-bit mask from ymm1 (vmovmskps) and store it
  8 comparisons per SIMD compare instruction vs 8 getNext() calls in the Volcano model.

DuckDB uses vectorized execution throughout. ClickHouse is also vectorized: rather than working one row at a time, it processes data in blocks, each holding a chunk of every column. Block size is controlled by the max_block_size setting. The key insight is that operating on homogeneous arrays of values lets SIMD (Single Instruction, Multiple Data) CPU instructions process 4-16 32-bit values per instruction (SSE through AVX-512).

Late materialization: In columnar execution, first read only the columns needed to evaluate the filters. Fetch the remaining projected columns afterwards, and only for rows that pass. This reduces the volume of data processed by downstream operators.
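This NumPy sketch contrasts a per-row loop with batch-at-a-time filtering, followed by late materialization. NumPy's comparison kernels may use SIMD internally, but this code does not control that. The batching mirrors the vectorized model rather than reproducing DuckDB's implementation. The data is randomly generated for illustration.

```python
import time

import numpy as np

VECTOR_SIZE = 2048  # rows per batch (illustrative; matches DuckDB's vector size)


def filter_scalar(salary, threshold):
    """Tuple-at-a-time: one Python-level comparison per row."""
    return [i for i in range(len(salary)) if salary[i] > threshold]


def filter_vectorized(salary, threshold):
    """Batch-at-a-time: one NumPy comparison per vector, producing a selection vector."""
    parts = []
    for start in range(0, len(salary), VECTOR_SIZE):
        batch = salary[start:start + VECTOR_SIZE]
        mask = batch > threshold                    # boolean mask for the whole batch
        parts.append(np.flatnonzero(mask) + start)  # positions of qualifying rows
    return np.concatenate(parts) if parts else np.empty(0, dtype=np.int64)


rng = np.random.default_rng(0)
n = 1_000_000
salary = rng.integers(20_000, 120_000, size=n, dtype=np.int32)
country = rng.choice(np.array(["US", "UK", "DE"]), size=n)

t0 = time.perf_counter()
scalar_sel = filter_scalar(salary, 40_000)
t1 = time.perf_counter()
sel = filter_vectorized(salary, 40_000)
t2 = time.perf_counter()
assert sel.tolist() == scalar_sel

# Late materialization: the country column is touched only for qualifying rows.
selected_country = country[sel]
print(f"scalar {t1 - t0:.3f}s, vectorized {t2 - t1:.3f}s, {len(sel)} rows selected")
```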

Apache Parquet Format

Parquet is the dominant columnar on-disk format for analytical workloads (Hadoop, Spark, Presto, Snowflake, BigQuery, DuckDB all read/write Parquet).

Parquet File Structure:
+------------------------------------------+
| Magic: "PAR1"                            |
+------------------------------------------+
| Row Group 0 (e.g., 128MB)                |
|   Column Chunk (id):                     |
|     Dictionary page (optional)           |
|     Data page 0 (~1MB)                   |
|     Data page 1 (~1MB)                   |
|     ...                                  |
|   Column Chunk (name):                   |
|     Dictionary page                      |
|     Data page 0 ...                      |
|   Column Chunk (salary):                 |
|     ...                                  |
+------------------------------------------+
| Row Group 1 (e.g., 128MB)                |
|   ...                                    |
+------------------------------------------+
| Footer (schema, row group offsets,       |
|   column chunk statistics,               |
|   key-value metadata)                    |
+------------------------------------------+
| Footer Length (4 bytes)                  |
| Magic: "PAR1"                            |
+------------------------------------------+

Column statistics in footer: For each column chunk, the footer records min/max values and a null count. This enables predicate pushdown. For WHERE salary > 100000, a reader can skip an entire row group whose salary max is <= 100000, since no row in it can qualify.
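The following sketches min/max row-group pruning for WHERE salary > 100000. The ChunkStats class is hypothetical. With pyarrow, the real values come from pyarrow.parquet.ParquetFile(path).metadata.row_group(i).column(j).statistics (min, max, null_count). Note that a group with missing statistics can never be skipped.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ChunkStats:
    """Hypothetical stand-in for one column chunk's footer statistics."""
    min_value: int
    max_value: int
    null_count: int


def row_groups_to_read(stats: list[Optional[ChunkStats]], lower_bound: int) -> list[int]:
    """Row groups that may hold rows with value > lower_bound.

    A group is skipped only if its max <= lower_bound; a group without
    statistics (None) must always be read.
    """
    return [i for i, s in enumerate(stats) if s is None or s.max_value > lower_bound]


salary_stats = [
    ChunkStats(min_value=30_000, max_value=95_000, null_count=0),
    ChunkStats(min_value=40_000, max_value=100_000, null_count=2),  # max == bound: skip
    ChunkStats(min_value=80_000, max_value=250_000, null_count=0),
    None,  # writer did not record statistics for this row group
]
print(row_groups_to_read(salary_stats, 100_000))  # [2, 3]
```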

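Row group size: A common default is 128MB (the default of the Java parquet-mr writer). Some writers, such as pyarrow, size row groups by row count instead. Larger row groups improve compression, because each column chunk gives the dictionary more data to deduplicate, but they require more memory when writing.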

Apache Arrow (In-Memory Columnar)

Arrow is the in-memory counterpart of Parquet — a standard in-memory columnar layout for efficient data exchange between analytical systems.

Arrow Record Batch (in-memory):
  Schema: {id: int32, name: utf8, salary: int32}
  Buffers:
    id.data:        [1, 2, 3, 4, 5, ...]     int32 array, 4 bytes/value
    name.offsets:   [0, 5, 8, 13, ...]        offsets into values buffer
    name.data:      "Alice""Bob""Carol"...    variable-length string data
    salary.data:    [50000, 75000, 90000, ...]
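This pure-Python sketch shows how the name column's two buffers relate. Real Arrow stores the offsets as a contiguous int32 buffer (int64 for large_utf8). It also adds an optional validity bitmap for nulls and pads buffers for alignment. This sketch uses plain Python lists and bytes.

```python
def build_utf8_column(strings):
    """Build Arrow-style offsets and data buffers for a variable-length string column."""
    offsets = [0]
    data = bytearray()
    for s in strings:
        data += s.encode("utf-8")
        offsets.append(len(data))  # end of value i == start of value i + 1
    return offsets, bytes(data)


def get_value(offsets, data, i):
    """O(1) random access: value i occupies data[offsets[i]:offsets[i + 1]]."""
    return data[offsets[i]:offsets[i + 1]].decode("utf-8")


offsets, data = build_utf8_column(["Alice", "Bob", "Carol"])
print(offsets)                      # [0, 5, 8, 13]
print(data)                         # b'AliceBobCarol'
print(get_value(offsets, data, 1))  # Bob
```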

Arrow enables zero-copy data exchange: since Arrow defines a standard memory layout, data can be shared between processes (via shared memory), languages (Python↔Java↔C++), and systems (Pandas↔Spark↔DuckDB) without serialization/deserialization. This is the key advantage over Parquet (which requires encoding/decoding) for in-memory inter-process communication.

The Arrow Flight protocol carries Arrow record batches over gRPC for high-throughput transfer between processes and services. It serves as a bulk-data alternative to row-oriented JDBC/ODBC.

DuckDB Architecture

DuckDB is an in-process analytical SQL database (similar positioning to SQLite, but for OLAP). No separate server process; the database is a library linked into the application.

DuckDB Integration:
  Python application
    |
    import duckdb
    |
    duckdb.execute("SELECT country, SUM(revenue) FROM 'data.parquet' GROUP BY 1")
    |
    -> DuckDB reads Parquet directly from filesystem
    -> Executes with vectorized operators in-process
    -> Returns Arrow Table or Python list
    No network, no serialization, no server process.

DuckDB's execution engine uses:

  • Columnar vectorized execution: operations on 2048-row batches (vectors)
  • Push-based pipelining: operators push batches to their consumers (vs. Volcano's pull)
  • Parallel execution: morsel-driven parallelism (Leis et al., 2014), where work is divided into "morsels" that are dynamically assigned to threads
  • Out-of-core hash aggregation: the hash aggregate can spill to disk when its hash table exceeds the memory limit
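Below is a Python sketch of morsel-driven scheduling for a GROUP BY SUM:

- The input is cut into fixed-size morsels on a shared queue.
- Each worker thread repeatedly claims the next morsel and aggregates it into a thread-local table.
- The partial tables are merged at the end.

Morsel size and all names are illustrative. Because of CPython's global interpreter lock, the threads will not actually speed this up. Treat it as a model of the scheduling, not of DuckDB's performance.

```python
import queue
from collections import Counter
from concurrent.futures import ThreadPoolExecutor

MORSEL_ROWS = 10_000  # rows per morsel (illustrative value)


def morsel_group_sum(keys, values, num_threads=4):
    """SELECT key, SUM(value) GROUP BY key with morsel-driven scheduling."""
    morsels = queue.Queue()
    for start in range(0, len(keys), MORSEL_ROWS):
        morsels.put((start, min(start + MORSEL_ROWS, len(keys))))

    def worker():
        partial = Counter()  # thread-local partial aggregate, no locking needed
        while True:
            try:
                start, end = morsels.get_nowait()  # claim the next unprocessed morsel
            except queue.Empty:
                return partial
            for k, v in zip(keys[start:end], values[start:end]):
                partial[k] += v

    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        futures = [pool.submit(worker) for _ in range(num_threads)]
        total = Counter()
        for f in futures:
            total.update(f.result())  # merge phase: combine per-thread partials
    return dict(total)


keys = ["US", "UK", "DE"] * 10_000
values = list(range(len(keys)))
result = morsel_group_sum(keys, values)
```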

DuckDB reads Parquet, CSV, JSON, and Arrow data directly, and it can query a live PostgreSQL database through its postgres extension. It can run SQL directly on Parquet files without first loading them into the database.

ClickHouse Architecture

ClickHouse is a columnar DBMS, run self-hosted or as a cloud service, and designed for high-throughput INSERT and query workloads. It is used for real-time analytics, observability pipelines, and ad-tech.

MergeTree family: ClickHouse's primary family of table engines. Each INSERT writes one or more new immutable "data parts". A part is a set of columnar files whose rows are sorted by the table's ORDER BY key.

MergeTree Storage Structure:
  INSERT batch 1 -> Part 1:  [sorted rows, column files, primary index]
  INSERT batch 2 -> Part 2:  [sorted rows, column files, primary index]
  INSERT batch 3 -> Part 3:  [sorted rows, column files, primary index]
  ...
  Background merge: Part 1 + Part 2 -> Part 1_2 (merged, sorted)
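Every part is already sorted by the ORDER BY key, so a background merge is essentially a k-way merge of sorted runs. This row-based sketch uses heapq.merge. ClickHouse actually merges column files. For engines such as ReplacingMergeTree or SummingMergeTree, it also deduplicates or sums rows during the merge.

```python
import heapq


def order_by_key(row):
    """Sorting key for ORDER BY (date, user_id)."""
    return row[0], row[1]


def merge_parts(*parts):
    """Merge already-sorted parts into one sorted part in a single streaming pass."""
    return list(heapq.merge(*parts, key=order_by_key))


part_1 = [("2024-01-01", 100, "click"), ("2024-01-02", 50, "view")]
part_2 = [("2024-01-01", 7, "view"), ("2024-01-03", 200, "click")]
part_1_2 = merge_parts(part_1, part_2)
for row in part_1_2:
    print(row)
# ('2024-01-01', 7, 'view')
# ('2024-01-01', 100, 'click')
# ('2024-01-02', 50, 'view')
# ('2024-01-03', 200, 'click')
```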

Data Part Directory:
  part_1/
    id.bin          <- id column data (compressed)
    id.mrk          <- id column marks (sparse index granule pointers)
    name.bin
    name.mrk
    salary.bin
    salary.mrk
    primary.idx     <- primary key sparse index
    count.txt       <- row count in part

Sparse Primary Index: ClickHouse stores one primary key value per "granule" (8192 rows by default) in the primary index file. This provides a lightweight, approximate index — the query must scan entire granules, but can skip large ranges of granules.

Primary Index (ORDER BY date, user_id):
  Row 0:       date=2024-01-01, user_id=100
  Row 8192:    date=2024-01-02, user_id=50
  Row 16384:   date=2024-01-03, user_id=200
  ...

Query: WHERE date = '2024-01-02'
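  Binary search in primary index: granules starting at rows 0 and 8192 may match
    (granule 0 may end with date=2024-01-02 rows whose user_id < 50)
  Scan only rows 0-16383. The granule at 16384 starts at 2024-01-03, so it and all later granules are skipped.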
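This sketch selects granules for an equality predicate on the leading ORDER BY column. It includes the granule before the first matching mark, because that granule may end with matching rows. It is a simplification: ClickHouse's index analysis handles ranges over all key columns. The marks reuse the example above plus one hypothetical extra mark.

```python
from bisect import bisect_left, bisect_right

GRANULE_ROWS = 8192  # ClickHouse's default index_granularity

# One mark per granule: the (date, user_id) key of the granule's first row.
marks = [
    ("2024-01-01", 100),
    ("2024-01-02", 50),
    ("2024-01-03", 200),
    ("2024-01-04", 10),  # hypothetical extra mark
]


def granules_for_date(marks, date):
    """Indices of granules that may contain rows with this leading-key value."""
    dates = [m[0] for m in marks]
    # The granule just before the first mark equal to `date` may end with matching rows.
    first = max(bisect_left(dates, date) - 1, 0)
    # Any granule whose first row has a date <= `date` may still contain it.
    last = bisect_right(dates, date) - 1
    return list(range(first, last + 1))


for g in granules_for_date(marks, "2024-01-02"):
    print(f"granule {g}: rows {g * GRANULE_ROWS}-{(g + 1) * GRANULE_ROWS - 1}")
# granule 0: rows 0-8191
# granule 1: rows 8192-16383
```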

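Data Skipping Indexes (ClickHouse extension): secondary indexes that store one summary per block of GRANULARITY primary-index granules. With GRANULARITY 4 in the example below, that is one entry per 4 × 8192 rows. Types:

  • minmax: stores min/max per block (like PostgreSQL's BRIN)
  • set(N): stores the set of distinct values per block; if a block has more than N distinct values, nothing is stored and the block cannot be skipped
  • bloom_filter: stores a bloom filter per block for approximate lookup
  • ngrambf_v1: bloom filter on n-grams for text search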

CREATE TABLE events (
    date Date,
    user_id UInt64,
    event_type String,
    INDEX idx_event_type event_type TYPE set(100) GRANULARITY 4
) ENGINE = MergeTree()
ORDER BY (date, user_id);
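This is a Python model of a set(N) skipping index. It shows the structure only, not ClickHouse's implementation, and its names are hypothetical. Here max_size plays the role of N. ClickHouse also treats N = 0 as unlimited, which this sketch does not model.

```python
def build_set_index(column, max_size, block_rows):
    """One entry per block: its distinct values, or None if there are more than max_size."""
    index = []
    for start in range(0, len(column), block_rows):
        distinct = set(column[start:start + block_rows])
        # Too many distinct values: store nothing, so this block can never be skipped.
        index.append(distinct if len(distinct) <= max_size else None)
    return index


def blocks_to_read(index, value):
    """Blocks that may contain `value` for WHERE col = value."""
    return [i for i, s in enumerate(index) if s is None or value in s]


# With GRANULARITY 4 and 8192-row granules, a real block holds 32768 rows;
# tiny 4-row blocks keep the example readable.
events = ["view", "view", "click", "view",
          "view", "purchase", "view", "view",
          "a", "b", "c", "d"]
idx = build_set_index(events, max_size=2, block_rows=4)
print(blocks_to_read(idx, "purchase"))  # [1, 2]
```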

Snowflake Architecture

Snowflake is a cloud-native OLAP data warehouse with complete separation of storage, compute, and metadata.

Snowflake Architecture:

  Virtual Warehouses (compute):
    +-------+  +-------+  +-------+
    | VW-S  |  | VW-M  |  | VW-L  |  <- Multiple independent compute clusters,
    +-------+  +-------+  +-------+     T-shirt sized, each running different queries
         |          |          |
         v          v          v
    Cloud Storage (S3/GCS/Azure Blob)
    +--------------------------------------------------+
    | Micro-partitions (columnar, compressed,          |
    |   50-500MB of uncompressed data each)            |
    | Immutable, created automatically on load         |
    +--------------------------------------------------+
         ^
         |
    Cloud Services Layer (metadata, query planning, access control)
    +--------------------------------------------------+
    | Global metadata catalog                          |
    | Optimizer (cost-based, statistics from metadata) |
    | Security, auth, governance                       |
    +--------------------------------------------------+

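Micro-partitions: Snowflake stores data in cloud object storage as immutable columnar files called micro-partitions. Each holds 50-500MB of uncompressed data, which takes less space on disk because storage is compressed. Each micro-partition contains ~100K-1M rows, and the metadata service keeps per-column min/max statistics for it. For WHERE date > '2024-01-01', any micro-partition whose max date is <= '2024-01-01' is pruned without being read.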

Virtual Warehouses: Each VW is an independent cluster of nodes with its own local SSD cache (the "local disk cache"). VWs do not share cache state: each caches the micro-partitions it has read. VWs can be suspended (no compute cost while suspended) and resumed, typically within seconds.

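Result cache: Snowflake retains query results for 24 hours, and each reuse extends that retention, up to 31 days from the first execution. A repeated query with the same text against unchanged data is answered from the result cache without running a virtual warehouse. It therefore consumes no warehouse compute credits.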

Push-Based vs Pull-Based Execution

Pull (Volcano Model):                Push (DuckDB / HyPer):
  Root calls getNext()                Source pushes batches downstream
  |                                   |
  Operator pulls from child           Operator transforms + passes on
  |                                   |
  Scan fetches next tuple             Sink materializes results

Pull pros: Simple, lazy evaluation    Push pros: Better pipelining, no
Pull cons: Per-tuple function call      back-pressure complexity,
           overhead, poor cache use    SIMD-friendly tight loops

HyPer (Neumann, 2011) popularized push-based execution with LLVM code generation: each pipeline is compiled into a single tight loop with no virtual function calls. DuckDB instead uses interpreted, vectorized push execution. It does no JIT compilation, but each operator call processes a vector of 2048 rows.
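This toy Python example contrasts the two control flows for SELECT country FROM sales WHERE revenue > 100. The operator names are hypothetical, and neither half is compiled or vectorized the way HyPer or DuckDB are. In the pull version, the consumer drives execution through generators. In the push version, the scan drives execution, and each operator is a callback that receives a batch.

```python
# Pull (Volcano): each operator asks its child for the next tuple.
def scan_pull(rows):
    yield from rows


def filter_pull(child, pred):
    for row in child:  # one next() call per tuple, per operator
        if pred(row):
            yield row


def project_pull(child, col):
    for row in child:
        yield row[col]


# Push: the source drives the loop and hands whole batches to its consumer.
def scan_push(rows, batch_size, consumer):
    for start in range(0, len(rows), batch_size):
        consumer(rows[start:start + batch_size])


def filter_push(pred, consumer):
    def on_batch(batch):
        kept = [row for row in batch if pred(row)]  # tight loop over one batch
        if kept:
            consumer(kept)
    return on_batch


def project_push(col, consumer):
    def on_batch(batch):
        consumer([row[col] for row in batch])
    return on_batch


def is_big(row):
    return row["revenue"] > 100


rows = [{"country": c, "revenue": r}
        for c, r in [("US", 100), ("UK", 200), ("DE", 150), ("US", 75), ("UK", 300)]]

pulled = list(project_pull(filter_pull(scan_pull(rows), is_big), "country"))

pushed = []  # the sink
scan_push(rows, 2, filter_push(is_big, project_push("country", pushed.extend)))

print(pulled, pushed)  # ['UK', 'DE', 'UK'] ['UK', 'DE', 'UK']
```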

Historical Context

Column stores trace their origins to TAXIR (1969), a biological information-retrieval system, and Cantor (1987). The modern columnar revolution began with MonetDB (Peter Boncz, CWI Amsterdam, 1994) and the C-Store paper (Stonebraker et al., 2005). C-Store introduced projections (sorted, pre-materialized groups of columns) and column-level compression. It also split storage into a "Writeable Store" and a "Read-Optimized Store" connected by a tuple mover, an arrangement loosely analogous to an LSM tree for columnar data.

Vertica (the commercial C-Store successor), Redshift (Amazon, 2012), and Snowflake (2012) brought columnar warehouses to the cloud era. Apache Parquet (2013, Cloudera + Twitter) standardized the on-disk columnar format. DuckDB (CWI Amsterdam, 2019) brought the analytical engine in-process.

Production Examples

DuckDB: src/execution/operator/ contains vectorized operators. PhysicalHashAggregate and PhysicalHashJoin are the core operators. DuckDB is used by Hugging Face for dataset analytics, Cloudflare for observability queries, and as the SQL engine in many analytics tools (Rill, Evidence, Observable).

ClickHouse: src/Storages/MergeTree/ contains the MergeTree engine. Used by Cloudflare (DNS analytics, 30+ trillion rows), Uber (operational analytics), ByteDance (TikTok analytics), and Yandex (its original developer, which open-sourced it in 2016).

Snowflake: snowflake-connector-python and the /api/v2/ SQL REST API expose query execution. Table storage usage is reported in INFORMATION_SCHEMA.TABLE_STORAGE_METRICS. Micro-partition clustering statistics (overlap and depth) are reported by SYSTEM$CLUSTERING_INFORMATION().

Debugging Notes

  • ClickHouse query profiling: SET send_logs_level='trace'; SELECT ... dumps detailed execution logs. system.query_log table contains per-query statistics including rows read, bytes read, and CPU time.
  • DuckDB EXPLAIN: EXPLAIN ANALYZE SELECT ... in DuckDB prints the physical plan as an operator tree, annotated with per-operator timing and row counts.
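  • Parquet statistics: The command-line tool from the Apache parquet-java project (parquet-cli) prints row group statistics with parquet meta file.parquet. From Python, pyarrow.parquet.read_metadata(path) exposes the same per-row-group metadata. Verify that min/max statistics are present, or predicate pushdown cannot work.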
  • Snowflake query history: SELECT * FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY ORDER BY execution_time DESC LIMIT 10; shows queries with bytes scanned, percentage scanned from cache.
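  • ClickHouse part merges: SELECT * FROM system.merges; shows active merges. A steadily growing number of active parts per partition (SELECT partition, count() FROM system.parts WHERE active GROUP BY partition) indicates that merges are falling behind. Increasing background_pool_size gives merges more threads.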

Security Implications

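  • Columnar files in object storage (Snowflake/Redshift): Warehouse-managed storage (Snowflake micro-partitions, Redshift managed storage) is encrypted by the service. Data staged in or unloaded to customer-owned S3 buckets (external stages, UNLOAD or COPY INTO <location> targets, data-lake Parquet files) is only as private as those buckets' permissions. Misconfigured S3 bucket ACLs have caused data warehouse exposures.
  • Parquet/Arrow injection: Malicious Parquet files with crafted statistics can cause incorrect predicate pushdown, producing false negatives (rows that should be returned are not). Schema validation alone does not catch forged statistics. When reading untrusted Parquet files, validate the schema and consider disabling statistics-based pruning or recomputing the statistics.
  • Snowflake virtual warehouse isolation: Different VWs share storage (and the cloud services layer) but not compute or cache. Queries on one VW therefore do not compete for CPU, memory, or local cache with queries on another, which provides workload isolation.
  • Column-level access control: Snowflake supports dynamic data masking (masking policies on columns) and column-level security. These complement row-level security (e.g., Snowflake row access policies or PostgreSQL RLS) by restricting which values a role can see within the rows it is allowed to read.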

Performance Implications

  • Row group / micro-partition size: Too small → per-unit metadata overhead and many small reads (or many files to open); too large → poor predicate pushdown granularity. Parquet's common 128MB row group size is a good starting point. ClickHouse's 8192-row granules provide fine-grained skipping within each data part.
  • Sort key selection (ClickHouse ORDER BY): The sort key determines the physical sort order and thus the effectiveness of the sparse primary index. Choose the sort key based on the most common query filter columns. For time-series: ORDER BY (date, user_id).
  • Partition pruning: ClickHouse's PARTITION BY clause enables partition pruning, and Snowflake's clustering improves micro-partition pruning. A query with WHERE year = 2024 against a table partitioned by year touches only the 2024 partition.
  • ClickHouse compression: LZ4 (the default) is fastest. Zstd at a low level (e.g., 3) gives noticeably better compression with fast decompression. Higher levels (Zstd 9+) shrink data further, but they mainly cost more CPU at insert and merge time, since Zstd decompression speed is largely independent of level. This pays off when storage I/O, not CPU, is the bottleneck.
  • Columnar vs row join: Joining a columnar OLAP table with an OLTP row-store table is a central challenge in HTAP. TiFlash (TiDB) and SQL Server columnstore indexes provide columnar copies of OLTP data for analytical joins.

Failure Modes

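  1. ClickHouse too-many-parts: If INSERTs arrive too frequently (more than about one per second) or in tiny batches, ClickHouse creates small parts faster than background merges can combine them. The "Too many parts" error appears when the number of active parts in a partition exceeds parts_to_throw_insert, or when the table total exceeds max_parts_in_total. Mitigation: send larger batches, at most about once per second, or enable async_insert so the server buffers small inserts.
  2. Snowflake over-clustering: On a table with frequent DML, a clustering key makes Snowflake's Automatic Clustering service recluster micro-partitions repeatedly. This is a form of write amplification, billed in credits. Define clustering keys only when query patterns justify the cost.
  3. Parquet predicate pushdown failure: Column statistics may be missing or incorrect, e.g., when a writer is configured not to write them, as with pyarrow.parquet.write_table(..., write_statistics=False). Predicate pushdown then fails silently and full scans occur. Verify with parquet-cli or pyarrow's file metadata.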
  4. DuckDB out-of-memory: DuckDB's vectorized hash join buffers intermediate results in memory. For very large joins exceeding memory, DuckDB spills to disk (since DuckDB 0.7). Configure SET memory_limit='16GB' to prevent OOM.

Modern Usage

DuckDB has fundamentally changed the OLAP tooling landscape. By providing an in-process analytical engine that reads Parquet natively, it enables the "local-first analytics" pattern: run analytical queries on Parquet files from a Python notebook without a separate data warehouse.

Open table formats underpin the modern "open lakehouse" architecture: Delta Lake (Parquet-based with ACID, originally from Databricks), Apache Iceberg, and Apache Hudi. Alongside managed engines such as MotherDuck (cloud DuckDB), they let you store data as Parquet/ORC files in object storage and query it with DuckDB, Spark, Trino, or Flink. The table format's metadata provides consistent ACID semantics.

Future Directions

  • Liquid clustering (Databricks Delta Lake 3.0): Dynamic re-clustering of Parquet files based on query patterns, without upfront partitioning decisions.
  • Serverless OLAP (ClickHouse Cloud, Snowflake Serverless): Scale compute to zero between queries and pay only for the compute actually consumed. This is enabled by separating storage (object store) from compute.
  • GPU-accelerated columnar analytics: NVIDIA RAPIDS cuDF provides GPU-based columnar operations at GPU memory bandwidth (~1.5 TB/s vs CPU's ~100 GB/s). Integrated with Spark (Spark-RAPIDS) and Dask.
  • Compact representation of ML features: As ML features (embeddings, time-series signals) are stored in data warehouses, columnar formats are extended to handle fixed-length vector columns efficiently (Parquet's FIXED_LEN_BYTE_ARRAY for embeddings).

Exercises

  1. Create a Parquet file with 10M rows using Python (pyarrow). Compare file size with a CSV of the same data. Query it with DuckDB and observe execution time.
  2. In DuckDB, run EXPLAIN ANALYZE SELECT country, SUM(revenue) FROM data GROUP BY country on a 10M-row Parquet file. Identify which operators are vectorized and how many batches are processed.
  3. Set up ClickHouse locally. Create a MergeTree table with ORDER BY (date, user_id). Insert 100M rows in 1M-row batches. Observe the parts being created and merged via system.parts.
  4. Compare compression ratios: take a 1GB CSV dataset, write it as Parquet with dictionary encoding + Snappy, Parquet with Zstd, and ClickHouse MergeTree. Measure file sizes and query times for a GROUP BY query.
  5. Implement a minimal vectorized filter in C or Python using NumPy: given an array of 1M integers, select those > 50000 using a loop (scalar), NumPy comparison (vectorized), and compare performance.

References

  • Abadi, D. J., et al. (2008). Column-Stores vs. Row-Stores: How Different Are They Really? SIGMOD 2008.
  • Stonebraker, M., et al. (2005). C-Store: A Column-oriented DBMS. VLDB 2005.
  • Boncz, P., Zukowski, M., & Nes, N. (2005). MonetDB/X100: Hyper-Pipelining Query Execution. CIDR 2005.
  • Neumann, T. (2011). Efficiently Compiling Efficient Query Plans for Modern Hardware. PVLDB 2011.
  • Raasveldt, M., & Mühleisen, H. (2019). DuckDB: an Embeddable Analytical Database. SIGMOD 2019.
  • Apache Parquet format specification: https://parquet.apache.org/docs/file-format/
  • Apache Arrow specification: https://arrow.apache.org/docs/format/Columnar.html
  • ClickHouse MergeTree documentation: https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/mergetree