Python Bindings

The cqlite Python package provides direct access to Cassandra 5.0 SSTable files from Python, without requiring a running Cassandra cluster. It exposes a simple open/execute/stream API built on native Python types.

pip install cqlite-py

Requires Python 3.9+. Pre-built wheels are available for Linux (x86_64, ARM64), macOS (Intel and Apple Silicon), and Windows (x64).

To build from source:

# Install Rust 1.85+ and maturin first
pip install maturin
git clone https://github.com/pmcfadin/cqlite
cd cqlite/bindings/python
maturin develop

A minimal query against an SSTable directory:

import cqlite

# Open an SSTable directory with a CQL schema
with cqlite.open("path/to/sstables", schema="path/to/schema.cql") as db:
    result = db.execute("SELECT * FROM test_basic.simple_table LIMIT 5")
    print(f"Rows returned: {len(result)}")
    print(f"Execution time: {result.execution_time_ms}ms")
    for row in result:
        print(row.to_dict())

Running against the test datasets produces output like:

Rows returned: 5
Execution time: 74ms
{'id': UUID('0023ece7-7c4e-4705-9068-d1a59ec5fe19'), 'name': 'Debbie Soto',
'age': 79, 'account_balance': Decimal('69799.73'), 'active': True,
'created': datetime.datetime(2025, 10, 6, 1, 12, 5, 926000, tzinfo=datetime.timezone.utc),
...}

cqlite.open() is the main entry point. It returns a Database that supports the context manager protocol.

import cqlite

# Context manager: close() is called automatically on exit
with cqlite.open("data/sstables", schema="schema.cql") as db:
    ...

# Manual lifecycle
db = cqlite.open("data/sstables", schema="schema.cql")
try:
    ...
finally:
    db.close()  # idempotent; safe to call more than once

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| path | str or Path | Directory containing SSTable files |
| schema | str or Path (optional) | Path to a .cql schema file |
| config | dict or str (optional) | Configuration dict or preset name |
| writable | bool (default False) | Enable write support (INSERT/UPDATE/DELETE) |
| write_dir | str or Path | Directory for WAL and flushed SSTables; required when writable=True |

db.execute() runs a CQL SELECT (or DML when writable=True) and returns a QueryResult containing all rows.

# Iterate directly over the result
for row in db.execute("SELECT id, name, age FROM test_basic.simple_table LIMIT 10"):
    print(f"{row['name']}: age {row['age']}")

# Access metadata
result = db.execute("SELECT * FROM test_basic.simple_table LIMIT 100")
print(f"Rows: {len(result)}")
print(f"Time: {result.execution_time_ms}ms")
print(f"Columns: {[col.name for col in result.columns]}")

CQL types are mapped to native Python types automatically:

| CQL Type | Python Type |
| --- | --- |
| text, varchar, ascii | str |
| int, smallint, tinyint | int |
| bigint, varint | int (arbitrary precision) |
| float | float |
| double | float |
| boolean | bool |
| blob | bytes |
| timestamp | datetime.datetime (UTC-aware) |
| date | datetime.date |
| time | datetime.time |
| duration | datetime.timedelta |
| uuid, timeuuid | uuid.UUID |
| inet | ipaddress.IPv4Address or ipaddress.IPv6Address |
| decimal | decimal.Decimal |
| counter | int |
| list<T> | list |
| set<T> | frozenset |
| map<K,V> | dict |
| tuple<...> | tuple |
| frozen<T> | Unwrapped inner type |
| UDT | dict with _type and _keyspace keys |
| null | None |
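
Because rows arrive as native Python types, several of them (UUID, Decimal, datetime, frozenset, bytes, IP addresses) are not accepted by json.dumps out of the box. The sketch below shows one way to serialize a row dict using only the standard library. The to_jsonable helper and the tags, avatar, and last_ip columns are hypothetical, and the chosen encodings (hex for blobs, seconds for durations) are assumptions rather than anything cqlite prescribes.

```python
import datetime
import decimal
import ipaddress
import json
import uuid


def to_jsonable(value):
    """Fallback for json.dumps: convert types json cannot encode natively."""
    if isinstance(value, (uuid.UUID, decimal.Decimal,
                          ipaddress.IPv4Address, ipaddress.IPv6Address)):
        return str(value)               # text keeps full Decimal precision
    if isinstance(value, (datetime.datetime, datetime.date, datetime.time)):
        return value.isoformat()
    if isinstance(value, datetime.timedelta):
        return value.total_seconds()
    if isinstance(value, (bytes, bytearray)):
        return value.hex()
    if isinstance(value, (set, frozenset)):
        return sorted(value, key=str)   # sets are unordered; sort for stable output
    raise TypeError(f"Cannot serialize {type(value).__name__}")


# Hypothetical row, shaped like the output of row.to_dict()
row = {
    "id": uuid.UUID("0023ece7-7c4e-4705-9068-d1a59ec5fe19"),
    "account_balance": decimal.Decimal("69799.73"),
    "created": datetime.datetime(2025, 10, 6, 1, 12, 5, 926000,
                                 tzinfo=datetime.timezone.utc),
    "tags": frozenset({"b", "a"}),
    "avatar": b"\x00\xff",
    "last_ip": ipaddress.ip_address("10.0.0.1"),
}
encoded = json.dumps(row, default=to_jsonable, sort_keys=True)
print(encoded)
```

The default hook only handles values. A map<K,V> column whose keys are not strings or numbers would still need its keys converted before encoding.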

For tables with many rows, use db.execute_streaming() instead of db.execute(). It yields rows one at a time, keeping memory usage bounded regardless of result size.

import cqlite
from cqlite import StreamingConfig

# Default streaming (buffers up to 1024 rows, ~11 MB peak)
with cqlite.open("data/sstables", schema="schema.cql") as db:
    for row in db.execute_streaming("SELECT * FROM test_basic.simple_table"):
        process(row)  # placeholder for your own row handler

# Custom config for memory-constrained environments
config = StreamingConfig(buffer_size=256, chunk_size=2000)
with cqlite.open("data/sstables", schema="schema.cql") as db:
    iterator = db.execute_streaming(
        "SELECT * FROM test_basic.simple_table", config=config
    )
    for row in iterator:
        if iterator.rows_received % 1000 == 0:
            print(f"Processed {iterator.rows_received} rows")

StreamingConfig takes two parameters controlling memory usage:

| Parameter | Default | Description |
| --- | --- | --- |
| buffer_size | 1024 | Rows to hold in-flight (~1 MB with typical rows) |
| chunk_size | 10000 | Rows fetched per storage read (~10 MB per chunk) |

For rows with large blobs, reduce both values proportionally.
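
As a worked example of scaling proportionally: the defaults imply a typical row of roughly 1 KiB (1024 rows in about 1 MB). The sketch below derives smaller values from an estimated average row size. The scaled_sizes helper is hypothetical, and the 1 KiB baseline is an inference from the table above, not a documented constant.

```python
DEFAULT_BUFFER_SIZE = 1024      # default from the StreamingConfig table
DEFAULT_CHUNK_SIZE = 10_000     # default from the StreamingConfig table
TYPICAL_ROW_BYTES = 1024        # assumption: ~1 MB / 1024 rows is about 1 KiB per row


def scaled_sizes(avg_row_bytes: int) -> tuple[int, int]:
    """Return (buffer_size, chunk_size) scaled down for rows larger than typical."""
    if avg_row_bytes <= 0:
        raise ValueError("avg_row_bytes must be positive")
    # Never scale above the defaults; only shrink for larger rows.
    factor = min(1.0, TYPICAL_ROW_BYTES / avg_row_bytes)
    buffer_size = max(1, int(DEFAULT_BUFFER_SIZE * factor))
    chunk_size = max(1, int(DEFAULT_CHUNK_SIZE * factor))
    return buffer_size, chunk_size


print(scaled_sizes(1024))        # typical rows: (1024, 10000)
print(scaled_sizes(64 * 1024))   # rows carrying ~64 KiB blobs: (16, 156)
```

The resulting pair can be passed as StreamingConfig(buffer_size=..., chunk_size=...). Measure actual memory use before relying on these numbers, since real row sizes vary.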

You can also open a database with a built-in memory preset:

# 256 MB memory limit
db = cqlite.open("data/sstables", schema="schema.cql", config="memory_optimized")
# 4 GB memory limit
db = cqlite.open("data/sstables", schema="schema.cql", config="performance_optimized")

db.export_parquet() writes query results straight to a Parquet file using the embeddable core writer. The query streams, so arbitrarily large result sets export within bounded memory, and the GIL is released for the duration of the export.

with cqlite.open("data/sstables", schema="schema.cql") as db:
    rows = db.export_parquet(
        "SELECT * FROM test_basic.simple_table",
        "/tmp/simple_table.parquet",
        row_group_size=10000,   # rows per Parquet row group (default)
        compression="snappy",   # "snappy" (default), "zstd", or "none"
    )
    print(f"Exported {rows} rows")

The output uses the high-fidelity Arrow type mapping: typed lists, maps, structs for UDTs and tuples, Decimal128, Date32, Time64, and a UUID extension. See Output Formats for the full table.

Invalid options raise ValueError; file and encoding failures raise IOError; query failures raise the usual QueryError/ParseError.

import cqlite

try:
    with cqlite.open("data/sstables", schema="schema.cql") as db:
        for row in db.execute("SELECT * FROM test_basic.simple_table"):
            print(row.to_dict())
except cqlite.ParseError as e:
    print(f"CQL syntax error: {e}")
except cqlite.QueryError as e:
    print(f"Query execution failed: {e}")
except cqlite.SchemaError as e:
    print(f"Schema validation failed: {e}")
except IOError as e:
    print(f"File system error: {e}")
except RuntimeError as e:
    print(f"Invalid state (e.g. database already closed): {e}")

Exception hierarchy

CqliteError          # base exception for all CQLite errors
├── SchemaError      # schema parsing or validation failures
├── QueryError       # query execution failures
└── ParseError       # CQL syntax errors

Built-in exceptions also used:
├── IOError          # file system errors
├── ValueError       # invalid configuration
├── RuntimeError     # invalid state (e.g. database closed)
└── MemoryError      # memory allocation failures
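
Since SchemaError, QueryError, and ParseError all derive from CqliteError, a handler can treat one case specially and fall back to the base class for the rest. The sketch below illustrates that ordering with stand-in classes so it runs without cqlite installed. The run_guarded and failing helpers are hypothetical; in real code the exception classes would come from the cqlite module.

```python
# Stand-ins that mirror the hierarchy above so the sketch runs without
# cqlite installed. In real code, use cqlite.CqliteError and its subclasses.
class CqliteError(Exception):
    pass


class SchemaError(CqliteError):
    pass


class QueryError(CqliteError):
    pass


class ParseError(CqliteError):
    pass


def run_guarded(action):
    """Run action() and return (status, payload), most specific handler first."""
    try:
        return ("ok", action())
    except ParseError as exc:       # narrow case: the CQL text itself is wrong
        return ("syntax", str(exc))
    except CqliteError as exc:      # SchemaError, QueryError, or any other subclass
        return ("cqlite", str(exc))
    except OSError as exc:          # IOError is an alias of OSError in Python 3
        return ("io", str(exc))


def failing(exc):
    """Build a zero-argument callable that raises exc (demo helper)."""
    def action():
        raise exc
    return action


print(run_guarded(lambda: 42))
print(run_guarded(failing(ParseError("unexpected token"))))
print(run_guarded(failing(QueryError("no such table"))))
```

The subclass handler has to come before the base-class handler. Reversing the order would route every ParseError to the generic branch.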

Open the database with writable=True and a write_dir to enable INSERT, UPDATE, and DELETE statements.

import cqlite

with cqlite.open(
    "path/to/sstables",
    schema="schema.cql",
    writable=True,
    write_dir="/tmp/my-writes",
) as db:
    db.execute(
        "INSERT INTO test_basic.simple_table (id, name, age) "
        "VALUES (11111111-1111-1111-1111-111111111111, 'Alice', 30)"
    )

    # Flush in-memory writes to an SSTable on disk
    path = db.flush_run()
    print(f"Flushed to: {path}")

    # Run incremental compaction within a time budget
    report = db.maintenance_step(budget_ms=100)
    print(f"Merged {report.rows_merged} rows in {report.time_spent_ms:.1f} ms")
    if report.pending_compaction:
        print("More compaction work available")

    # Inspect write engine state
    stats = db.write_stats
    print(f"Memtable: {stats.memtable_size} bytes, {stats.memtable_rows} rows")

Known write limitations

  • Counter columns cannot be written; execute() raises CqliteError for counter mutations.
  • Concurrent queries on the same handle require a warm-up query first (see issue #311).

Database handles are thread-safe via Arc<Database>. All blocking operations (open, execute, execute_streaming, close) release the Python GIL so other threads can run during I/O.

Each thread should create its own StreamingIterator via execute_streaming(). Do not share a single iterator across threads.

| Method / property | Description |
| --- | --- |
| cqlite.open(path, ...) | Open a database; returns Database |
| db.execute(query) | Run a CQL query; returns QueryResult |
| db.execute_streaming(query, config?) | Memory-bounded row iteration; returns StreamingIterator |
| db.export_parquet(query, path, *, row_group_size?, compression?) | Stream query results to a Parquet file; returns row count |
| db.prepare(query) | Parse and plan a query; returns PreparedStatement |
| db.stats() | Storage and memory metrics; returns DatabaseStats |
| db.flush_run() | Flush memtable to SSTable; returns Data.db path |
| db.maintenance_step(budget_ms) | Incremental compaction; returns MaintenanceReport |
| db.write_stats | Write engine snapshot; returns WriteStats |
| db.close() | Release resources (idempotent) |
| db.is_closed | True if the connection is closed |

Full type stubs are in bindings/python/python/cqlite/__init__.pyi.