# OHLCV streaming download

Most pages in this section show a SQL query that *replaces* a chunk of pandas. This one is the opposite: a small Python wrapper around a trivially simple SQL query, designed to **download large volumes of data** straight to a Parquet file.

The pandas idiom this replaces is `client.query_df(sql)` followed by `df.to_parquet(path)` — fine for thousands of rows, increasingly painful past a few hundred thousand, an outright OOM at multi-million scale.
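
For reference, the in-memory idiom looks like this — a sketch, where `client` is a `clickhouse-connect` client and `sql` is the query shown in the next section:

```python
# Everything is materialised as a DataFrame in RAM before a byte hits disk.
df = client.query_df(sql)
df.to_parquet('btc_ohlcv_2024.parquet')
```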

## Query

The SQL itself is deliberately plain — a multi-exchange BTC-USDT 1-minute OHLCV pull, no aggregation, no analytics:

```sql
SELECT start, exchange, market,
       toFloat64(open)   AS open,
       toFloat64(high)   AS high,
       toFloat64(low)    AS low,
       toFloat64(close)  AS close,
       toFloat64(volume) AS volume
FROM api.ohlcv(candle_duration_in_minutes = 1)
WHERE market = 'BTC-USDT'
  AND exchange IN ('binance', 'okx', 'kucoin', 'gateio')
  AND start >= toDateTime('2024-01-01')
  AND start <  toDateTime('2024-12-31') + INTERVAL 1 DAY
ORDER BY exchange, start
```

For the year of 2024 across the four exchanges, that's about 525,600 minutes × 4 exchanges ≈ **2.1 million rows**. `client.query_df(sql)` would peak around 500 MB – 1 GB of resident memory before writing anything.
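
The back-of-envelope arithmetic:

```python
minutes_per_year = 365 * 24 * 60   # 525,600 (the 2024 leap day adds a little more)
rows = minutes_per_year * 4        # 4 exchanges → 2,102,400 ≈ 2.1 million rows
```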

## Python streaming wrapper

`clickhouse-connect` exposes `Client.raw_stream(query, fmt='...')` — the server formats the result incrementally in a streaming pipeline and ships it via HTTP chunked transfer. Wrap it in a chunk-by-chunk write loop into a file and you get bounded RAM regardless of result size:

```python
from pathlib import Path

import clickhouse_connect
from clickhouse_connect.driver import Client

def download_to_parquet(client: Client, query: str, out_path: Path) -> int:
    """Stream `query` directly to `out_path` as Parquet, return bytes written."""
    out_path.parent.mkdir(parents=True, exist_ok=True)
    bytes_written = 0
    with (
        client.raw_stream(query, fmt="Parquet") as stream,
        out_path.open("wb") as f,
    ):
        for chunk in stream:
            f.write(chunk)
            bytes_written += len(chunk)
    return bytes_written

client = clickhouse_connect.get_client(
    host='<provided_database_url>', port=8443, secure=True,
    username='<username>', password='<password>',
    database='public_data')

sql = """
SELECT *
FROM api.ohlcv(candle_duration_in_minutes = 1)
WHERE market = 'BTC-USDT'
  AND exchange IN ('binance', 'okx', 'kucoin', 'gateio')
  AND start >= toDateTime('2024-01-01')
  AND start <  toDateTime('2024-12-31') + INTERVAL 1 DAY
ORDER BY exchange, start
"""

download_to_parquet(client, sql, Path('btc_ohlcv_2024.parquet'))
```

Three things to notice:

* **`fmt='Parquet'`** picks the [ClickHouse format](https://clickhouse.com/docs/en/interfaces/formats#data-format-parquet) — typed columns, columnar layout, very compact on disk. Other useful alternatives: `CSVWithNames`, `JSONEachRow`, `Arrow`, `TSVWithNames` (a `CSVWithNames` variant is sketched just after this list).
* **No pandas, no NumPy, no per-row Python.** The Parquet bytes leave ClickHouse and land in the file with one Python-level allocation per HTTP chunk (\~64 KB by default).
* **Order of magnitude on the koinju cluster.** The December-2024 slice of the query above (4 exchanges × 31 days × 1-minute candles ≈ 178 K rows) downloads as **5.4 MB Parquet in \~5.3 seconds**. Extrapolated linearly the full 2024 pull is \~65 MB / \~65 seconds — bounded by ClickHouse's Parquet writer and your network throughput, not by client RAM. (For comparison, the same data as `CSVWithNames` is \~17 MB / \~7 seconds — Parquet is \~3× smaller and \~30 % faster end-to-end.)
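
Swapping the format is a one-argument change. A minimal sketch of the `CSVWithNames` variant mentioned above, calling `raw_stream` directly with the same `client` and `sql`:

```python
# Same streaming pattern, CSV output instead of Parquet.
with client.raw_stream(sql, fmt="CSVWithNames") as stream, open("btc_ohlcv_2024.csv", "wb") as f:
    for chunk in stream:
        f.write(chunk)
```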

## Reading the Parquet back

```python
import pyarrow.parquet as pq

table = pq.read_table('btc_ohlcv_2024.parquet')
# table.num_rows, table.column_names, table.to_pandas(), …
```

A note on schema: ClickHouse's Parquet writer maps `DateTime` columns to Parquet `uint32` (Unix-epoch seconds) and `LowCardinality(String)` to Parquet `binary`. Both are conventional and most readers (pyarrow, DuckDB, Spark) decode them transparently when converted to typed results — but if you're parsing the Parquet at the byte level, expect those physical types rather than `timestamp[us]` / `utf8`.
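
If you want `start` back as a real timestamp column, a minimal sketch with pyarrow — assuming the `uint32` mapping described above and the column names from the query:

```python
import pyarrow as pa
import pyarrow.parquet as pq

table = pq.read_table('btc_ohlcv_2024.parquet')
print(table.schema)  # shows the physical types the writer actually produced

# Cast uint32 epoch seconds back to a proper timestamp column.
idx = table.schema.get_field_index('start')
start_ts = table['start'].cast(pa.int64()).cast(pa.timestamp('s'))
table = table.set_column(idx, 'start', start_ts)
```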

## Companion code

This page's repo companion at [`api/python_vs_sql/ohlcv_streaming/`](https://gitlab.com/koinju/connector/exporter/-/tree/master/api/python_vs_sql/ohlcv_streaming) packages the wrapper as a runnable `download.py`, plus a smoke test that exercises the pipeline on a tiny window (1 hour × 1 exchange) — verifying that the script actually writes a non-empty Parquet file with the right schema and row count, without burning quota on a multi-million-row pull every CI run.
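
The repo's test isn't reproduced here, but a sketch of the same idea — reusing `download_to_parquet` and `client` from above, with an illustrative one-hour window — might look like this:

```python
import pyarrow.parquet as pq

def test_download_smoke(tmp_path):
    # Tiny window: 1 exchange × 1 hour of 1-minute candles.
    sql = """
    SELECT start, exchange, market,
           toFloat64(open) AS open, toFloat64(high) AS high,
           toFloat64(low)  AS low,  toFloat64(close) AS close,
           toFloat64(volume) AS volume
    FROM api.ohlcv(candle_duration_in_minutes = 1)
    WHERE market = 'BTC-USDT' AND exchange = 'binance'
      AND start >= toDateTime('2024-06-01 00:00:00')
      AND start <  toDateTime('2024-06-01 01:00:00')
    """
    out = tmp_path / "smoke.parquet"
    assert download_to_parquet(client, sql, out) > 0   # file is non-empty

    table = pq.read_table(out)
    assert table.num_rows == 60                        # one candle per minute, assuming no gaps
    assert {"start", "open", "high", "low", "close", "volume"} <= set(table.column_names)
```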

## Extending it

* **Different query** — the wrapper is data-agnostic. Pass any SQL string (a multi-asset trade pull, a candle fan-out, a custom aggregate over a long window) and it streams the result. The only cost of a complex query is server-side compute time, not client memory.
* **Different format** — change `fmt='Parquet'` to `'CSVWithNames'`, `'JSONEachRow'`, `'Arrow'`, etc. CSV is friendlier to text-only tooling; Arrow IPC is the fastest in-process round-trip to pyarrow.
* **Resumable downloads** — the simple version doesn't checkpoint. For genuinely huge pulls, partition the SQL by month or by exchange and run one wrapper call per partition; failures retry one partition, not the whole pull (sketched after this list).
* **Direct upload to S3 / object storage** — replace `out_path.open('wb')` with a streaming upload (`boto3`'s `upload_fileobj` and friends accept readable file-like objects). The bytes never touch local disk (also sketched below).
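
Two sketches of those last two points, reusing `client` and `download_to_parquet` from above. The month boundaries and file names are illustrative, not part of the companion code:

```python
from datetime import date

# One wrapper call per month: a failed month is retried on its own,
# and each partition lands in its own Parquet file.
months = [(date(2024, m, 1), date(2024, m + 1, 1) if m < 12 else date(2025, 1, 1))
          for m in range(1, 13)]

for lo, hi in months:
    monthly_sql = f"""
    SELECT * FROM api.ohlcv(candle_duration_in_minutes = 1)
    WHERE market = 'BTC-USDT'
      AND exchange IN ('binance', 'okx', 'kucoin', 'gateio')
      AND start >= toDateTime('{lo}') AND start < toDateTime('{hi}')
    ORDER BY exchange, start
    """
    download_to_parquet(client, monthly_sql, Path(f"btc_ohlcv_{lo:%Y_%m}.parquet"))
```

For the S3 variant, assuming the stream object exposes `read()` (which `upload_fileobj` requires) and a hypothetical bucket name:

```python
import boto3

s3 = boto3.client("s3")
with client.raw_stream(sql, fmt="Parquet") as stream:
    # upload_fileobj reads from the stream in chunks; nothing hits local disk.
    s3.upload_fileobj(stream, "my-bucket", "btc_ohlcv_2024.parquet")
```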

