Messaging and Failure Handling
Apache Cassandra is a masterless distributed system where every node communicates directly with every other node — there is no primary or broker mediating inter-node traffic. The messaging layer is the internal network fabric that carries every read request, write request, gossip heartbeat, repair exchange, and consensus protocol message. Understanding this layer is essential for any contributor working on distributed features, repair, streaming, or the Accord consensus protocol, because changes in one area frequently surface as behavior changes in another. This page gives a contributor-oriented tour of the classes, packages, and design patterns that make up Cassandra’s messaging and failure handling subsystems.
All packages referenced on this page are under org.apache.cassandra unless noted otherwise.
MessagingService Architecture
org.apache.cassandra.net.MessagingService is the singleton entry point for all internode communication.
It owns the outbound connection pool to each peer, registers verb handlers for inbound messages, and routes dispatched messages to the correct stage-specific thread pool.
Verb Enum
Verb is an enum that defines every message type the cluster exchanges.
Each Verb entry carries metadata: its expected Stage, serializer, expiration timeout, and handler class.
Examples of verbs:
READ_REQ -- Read request from coordinator to replica
READ_RSP -- Read response from replica to coordinator
MUTATION -- Write mutation to replica
MUTATION_RSP -- Mutation acknowledgment
COUNTER_MUTATION -- Counter write (requires special leader logic)
GOSSIP_DIGEST_SYN -- Gossip initiation message
GOSSIP_DIGEST_ACK -- Gossip acknowledgment
GOSSIP_DIGEST_ACK2 -- Gossip acknowledgment confirmation
REPAIR_REQ -- Repair session initiation
STREAM_INIT -- Streaming session setup
PAXOS_PROPOSE -- Paxos/LWT proposal
ACCORD_* -- Accord consensus protocol messages
When adding a new message type, a new Verb entry is required.
This is an intentional registry: it makes the full surface area of internode messages auditable in one place.
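To make the registry pattern concrete, here is a deliberately simplified, hypothetical sketch of a verb-style enum. The real org.apache.cassandra.net.Verb entries carry additional metadata (priority, response verbs, serializer and handler suppliers) and a different constructor; this only illustrates the shape of "one entry per message type, bundling its stage and timeout".

```java
// Hypothetical, simplified sketch of a verb-style registry. Not the actual
// org.apache.cassandra.net.Verb signature.
enum SketchStage { READ, MUTATION, GOSSIP }

enum SketchVerb
{
    READ_REQ          (SketchStage.READ,     10_000L),
    MUTATION          (SketchStage.MUTATION,  2_000L),
    GOSSIP_DIGEST_SYN (SketchStage.GOSSIP,    1_000L);

    final SketchStage stage;      // which thread pool executes the handler
    final long expirationMillis;  // after this the message is dropped as expired

    SketchVerb(SketchStage stage, long expirationMillis)
    {
        this.stage = stage;
        this.expirationMillis = expirationMillis;
    }
}
```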
Message<T> and IVerbHandler<T>
Message<T> is the typed envelope for all messages.
It carries the Verb, source endpoint, creation timestamp, payload, and optional tracing and parameter headers.
IVerbHandler<T> is the interface each verb’s handler implements — it receives the deserialized Message<T> and acts on it.
Handler registration happens in MessagingService.registerHandlers().
Every Verb has exactly one IVerbHandler implementation.
When implementing a new message type, you provide both the serializer for the payload and an IVerbHandler that processes it on the receiving end.
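A minimal sketch of that handler contract, using stand-in types so it is self-contained (the real interface is org.apache.cassandra.net.IVerbHandler, whose single doVerb method receives the actual Message<T> envelope, and replies go through MessagingService rather than the commented-out helper shown here):

```java
// Illustrative handler for a hypothetical PING verb; types are stand-ins.
import java.io.IOException;

interface SketchVerbHandler<T>
{
    void doVerb(SketchMessage<T> message) throws IOException;
}

final class SketchMessage<T>
{
    final String from;   // sending endpoint
    final T payload;     // deserialized payload
    SketchMessage(String from, T payload) { this.from = from; this.payload = payload; }
}

final class PingVerbHandler implements SketchVerbHandler<String>
{
    @Override
    public void doVerb(SketchMessage<String> message) throws IOException
    {
        // Act on the payload, then (in real code) send a response to the sender.
        System.out.printf("ping from %s: %s%n", message.from, message.payload);
        // respond(message.from, "pong");  // reply plumbing omitted in this sketch
    }
}
```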
Thread Pool Model
Inbound messages are dispatched to Stage-specific thread pools defined in the Stage enum (org.apache.cassandra.concurrent).
Key stages:
Stage.READ -- Handles read requests
Stage.MUTATION -- Handles write requests
Stage.GOSSIP -- Handles gossip messages
Stage.ANTI_ENTROPY -- Handles repair messages
Stage.INTERNAL_RESPONSE -- Handles response callbacks
Stage.ACCORD_* -- Accord consensus stages
Thread pool sizes are configurable and observable via JMX and nodetool tpstats.
When adding a new verb, assign it to the most appropriate existing stage.
Introducing a new stage requires justification because it creates a new JMX surface and a new tuning knob for operators.
Internode Protocol
Connection Establishment and Handshake
When a node first needs to send a message to a peer, OutboundConnection opens a TCP connection and performs a handshake.
The handshake negotiates:
- The messaging version to use for the session (the minimum of both nodes' supported versions)
- Compression (LZ4 is enabled for internode traffic when configured)
- Whether the connection is encrypted (TLS, when server_encryption_options is configured)
Protocol Versioning for Mixed-Version Clusters
Cassandra supports rolling upgrades, which means a cluster may temporarily contain nodes running different versions.
MessagingService.current_version tracks the local node’s messaging version.
When sending to a peer, the outbound connection uses the negotiated version for that peer.
Every message payload serializer must be versioned accordingly via IVersionedSerializer<T>.
Changes to serialized types must either use version-conditional logic or increment the messaging version.
Skipping this step causes silent data corruption or deserialization errors during rolling upgrades.
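A hedged sketch of the version-conditional pattern, using plain java.io streams and a made-up version constant in place of Cassandra's DataOutputPlus/DataInputPlus and MessagingService version constants:

```java
// Illustrative version-conditional serializer for a hypothetical payload that
// gained a new field in a newer messaging version.
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

final class SketchPayload
{
    final long token;
    final int flags;      // field added in the newer version
    SketchPayload(long token, int flags) { this.token = token; this.flags = flags; }
}

final class SketchPayloadSerializer
{
    static final int VERSION_WITH_FLAGS = 2;  // hypothetical version constant

    void serialize(SketchPayload p, DataOutput out, int version) throws IOException
    {
        out.writeLong(p.token);
        if (version >= VERSION_WITH_FLAGS)
            out.writeInt(p.flags);            // only newer peers understand this field
    }

    SketchPayload deserialize(DataInput in, int version) throws IOException
    {
        long token = in.readLong();
        int flags = version >= VERSION_WITH_FLAGS ? in.readInt() : 0; // default for older peers
        return new SketchPayload(token, flags);
    }
}
```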
Frame Encoding
Each frame on the wire consists of:
- A fixed-size header containing: verb ID, message flags, expiration timestamp, and serialized payload size
- The serialized payload (produced by the verb's IVersionedSerializer)
Compression (LZ4) wraps the frame when enabled, not individual payloads.
Connection Types
OutboundConnections manages three logical channels to each peer, kept as separate TCP connections:
SMALL_MESSAGES -- For messages below a configurable size threshold; uses non-blocking I/O.
LARGE_MESSAGES -- For large payloads (streaming, repair data); uses blocking I/O to avoid head-of-line blocking on the small channel.
URGENT -- For gossip and failure detection messages; always gets through even under write backpressure.
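Conceptually, the channel choice is a size-and-priority check. The threshold and names below are illustrative stand-ins, not the actual configuration values:

```java
// Illustrative channel selection; Cassandra reads the real small/large cutoff
// from its configuration.
enum SketchConnectionType { URGENT, SMALL_MESSAGES, LARGE_MESSAGES }

final class SketchChannelRouter
{
    static final int LARGE_MESSAGE_THRESHOLD_BYTES = 64 * 1024;  // hypothetical cutoff

    SketchConnectionType channelFor(boolean urgent, int serializedSizeBytes)
    {
        if (urgent)
            return SketchConnectionType.URGENT;                   // gossip, failure detection
        return serializedSizeBytes > LARGE_MESSAGE_THRESHOLD_BYTES
             ? SketchConnectionType.LARGE_MESSAGES                // streaming, repair data
             : SketchConnectionType.SMALL_MESSAGES;               // normal request/response traffic
    }
}
```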
Backpressure and Overload Protection
Each outbound connection has a bounded queue.
When the queue fills (because the peer is slow or unavailable), MessagingService drops non-urgent messages and increments the dropped_messages metric.
Contributors adding high-volume messages must consider whether the message type can be dropped safely under load.
Gossip Protocol
org.apache.cassandra.gms.Gossiper implements Cassandra’s peer-to-peer membership protocol.
It runs once per second on every node, independently of the request path.
Gossip Rounds: SYN → ACK → ACK2
Each gossip round is a three-way exchange:
- The initiating node picks a random peer and sends a GOSSIP_DIGEST_SYN containing a digest of its known state for all endpoints.
- The receiver compares the digest with its own view and responds with a GOSSIP_DIGEST_ACK containing state the initiator appeared to be missing plus a request for state the receiver lacks.
- The initiator completes the round with a GOSSIP_DIGEST_ACK2 sending the requested state.
Each round, a node also probabilistically contacts an unreachable node and always contacts a seed node if no seed was contacted in the random selection.
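The reconciliation step behind the ACK can be sketched as a per-endpoint version comparison. Plain integers stand in for the real (generation, version) pairs and VersionedValue maps:

```java
// Illustrative digest reconciliation on the ACK side: given the versions the
// initiator advertised, decide what to send back and what to ask for.
import java.util.HashMap;
import java.util.Map;

final class SketchDigestReconciler
{
    static final class AckPlan
    {
        final Map<String, Integer> send = new HashMap<>();     // endpoints where we are newer
        final Map<String, Integer> request = new HashMap<>();  // endpoints where the peer is newer
    }

    AckPlan reconcile(Map<String, Integer> localVersions, Map<String, Integer> remoteDigest)
    {
        AckPlan plan = new AckPlan();
        for (Map.Entry<String, Integer> e : remoteDigest.entrySet())
        {
            int local = localVersions.getOrDefault(e.getKey(), -1);
            if (local > e.getValue())
                plan.send.put(e.getKey(), local);           // we have fresher state: include in ACK
            else if (local < e.getValue())
                plan.request.put(e.getKey(), e.getValue()); // peer has fresher state: ask for it
        }
        // Endpoints one side has never heard of are also exchanged;
        // that bookkeeping is omitted here.
        return plan;
    }
}
```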
EndpointState
EndpointState is the data structure each node advertises for itself and relays for its peers.
It contains:
- A HeartBeatState: a (generation, version) pair, where the generation is set when the node starts and the version increments roughly every second
- A map of ApplicationState → VersionedValue entries
ApplicationState Enum
ApplicationState keys the state a node advertises about itself:
STATUS -- Node lifecycle (NORMAL, LEAVING, LEFT, MOVING, BOOT, etc.)
LOAD -- Approximate load (used for display, not routing)
SCHEMA -- Schema version UUID (used to detect schema mismatches)
DC -- Datacenter name
RACK -- Rack name
HOST_ID -- Stable UUID identifying this physical node
TOKENS -- Token assignments for this node
NET_VERSION -- Messaging protocol version
SEVERITY -- Backpressure hint (affects speculative execution)
VersionedValue
VersionedValue wraps each application state value with a logical version number.
Gossip uses the version to discard stale updates: a value with a lower version than the currently known value is ignored.
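A small sketch of that stale-update check, with stand-in types for ApplicationState and VersionedValue:

```java
// Illustrative: apply a gossiped value only if its version is higher than the
// one already known for that key.
import java.util.HashMap;
import java.util.Map;

final class SketchEndpointState
{
    record Versioned(String value, int version) {}

    private final Map<String, Versioned> applicationState = new HashMap<>();

    /** Returns true if the incoming value was newer and therefore applied. */
    boolean maybeApply(String key, Versioned incoming)
    {
        Versioned current = applicationState.get(key);
        if (current != null && current.version() >= incoming.version())
            return false;                        // stale or duplicate: discard
        applicationState.put(key, incoming);     // newer: accept the update
        return true;
    }
}
```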
Gossip Convergence
Gossip is eventually consistent. State changes — including node join, token changes, and schema version updates — propagate on gossip timescales: typically a few seconds in small clusters, longer in large or geographically distributed clusters. Contributors must never assume gossip state is immediately visible to all nodes after a change.
In Cassandra 6, ClusterMetadata (TCM) supersedes gossip for schema and topology state distribution.
Gossip continues to carry liveness state (STATUS, heartbeats, DC, RACK) but is no longer the authoritative source for schema version or token assignments.
Failure Detection
org.apache.cassandra.gms.FailureDetector implements the phi-accrual failure detector.
Every node independently decides whether each peer is UP or DOWN without coordination.
Phi (φ) Calculation
The failure detector maintains a sliding window of inter-arrival times for heartbeat messages from each peer. It models the distribution of those inter-arrival times (the original phi-accrual paper uses a normal distribution; Cassandra's implementation approximates the tail with an exponential distribution derived from the mean inter-arrival time) and computes:
φ = -log10(P(heartbeat_delay > observed_delay))
A higher φ indicates lower probability that the peer is alive.
When φ exceeds phi_convict_threshold (default: 8, configurable in cassandra.yaml), the node is convicted as DOWN.
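A simplified sketch of the calculation, assuming an exponential model of heartbeat inter-arrival times (the real FailureDetector keeps a bounded per-endpoint arrival window and applies additional scaling and clamping):

```java
// Sketch of phi-accrual: track heartbeat inter-arrival times, then score how
// unlikely the current silence is under the observed distribution.
import java.util.ArrayDeque;
import java.util.Deque;

final class SketchPhiAccrual
{
    private static final int WINDOW = 1000;               // bounded history of intervals
    private final Deque<Long> intervalsMillis = new ArrayDeque<>();
    private long lastHeartbeatMillis = -1;

    void reportHeartbeat(long nowMillis)
    {
        if (lastHeartbeatMillis >= 0)
        {
            intervalsMillis.addLast(nowMillis - lastHeartbeatMillis);
            if (intervalsMillis.size() > WINDOW)
                intervalsMillis.removeFirst();
        }
        lastHeartbeatMillis = nowMillis;
    }

    double phi(long nowMillis)
    {
        if (intervalsMillis.isEmpty())
            return 0.0;
        double mean = intervalsMillis.stream().mapToLong(Long::longValue).average().orElse(1.0);
        double sinceLast = nowMillis - lastHeartbeatMillis;
        // For an exponential model, P(delay > t) = exp(-t / mean), so
        // phi = -log10(P) = (t / mean) / ln(10).
        return (sinceLast / mean) / Math.log(10.0);
    }
}

// Usage sketch: if (detector.phi(now) > phiConvictThreshold) markDown(endpoint);
```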
phi_convict_threshold
phi_convict_threshold controls the sensitivity of failure detection:
- Lower values convict faster but increase false positives (transient GC pauses, network hiccups).
- Higher values reduce false positives but increase detection latency.
Operators on cloud environments with variable network latency often raise this value.
DOWN vs Removed
DOWN means the failure detector has convicted the node — it is not responding to heartbeats.
The node is still present in gossip state; Cassandra will attempt to restore communication if heartbeats resume.
LEFT (or "removed") means an operator explicitly decommissioned or removed the node.
Cassandra removes it from the ring, redistributes its token ranges, and stops routing to it.
Cassandra never autonomously removes a DOWN node from gossip. This prevents unnecessary data rebalancing during transient failures and avoids the data loss risk of simultaneous range movements.
Failure Detection and Availability
StorageProxy consults the failure detector via FailureDetector.instance.isAlive(endpoint) before routing requests.
A DOWN node is excluded from replica selection for reads; writes are typically directed to hints instead.
IFailureDetector is the interface for FailureDetector, allowing test code to substitute a controlled implementation.
Cluster State and Topology
TokenMetadata
TokenMetadata (org.apache.cassandra.locator) is the in-memory ring view.
It maps tokens to endpoints, endpoints to host IDs, and tracks pending operations (bootstrapping nodes, moving nodes).
It is what AbstractReplicationStrategy implementations consult to compute which replicas own a given token.
TokenMetadata is updated as gossip delivers TOKENS and STATUS application state changes.
IEndpointSnitch
IEndpointSnitch (org.apache.cassandra.locator) provides rack and datacenter awareness.
Implementations translate a node’s IP address into a (datacenter, rack) tuple.
NetworkTopologyStrategy uses the snitch to enforce replica placement across failure domains.
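Conceptually, a snitch is a lookup from endpoint to failure domain. The static table below is purely illustrative; real snitches such as GossipingPropertyFileSnitch derive this mapping from configuration or cloud instance metadata:

```java
// Illustrative snitch-style lookup: translate an endpoint address into a
// (datacenter, rack) pair from a static table.
import java.util.Map;

final class SketchSnitch
{
    record Location(String datacenter, String rack) {}

    private final Map<String, Location> topology = Map.of(
        "10.0.1.11", new Location("dc1", "rack1"),
        "10.0.1.12", new Location("dc1", "rack2"),
        "10.0.2.11", new Location("dc2", "rack1"));

    Location locate(String endpoint)
    {
        // Unknown endpoints fall back to a default location in this sketch.
        return topology.getOrDefault(endpoint, new Location("dc1", "rack1"));
    }
}
```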
Topology Changes
When a node joins, leaves, or moves:
- The node broadcasts its new STATUS and TOKENS via gossip.
- Other nodes receive the gossip update and update their TokenMetadata.
- StorageService reacts to the IEndpointStateChangeSubscriber notification by adjusting routing and initiating streaming if needed.
TCM in Cassandra 6
In Cassandra 6, Transactional Cluster Metadata (TCM) moves authoritative cluster topology state out of gossip and into a linearizable, log-based metadata service.
ClusterMetadata (org.apache.cassandra.tcm) is the single authoritative metadata object for token assignments, keyspace configurations, and node lifecycle state.
ClusterMetadataService manages the distributed log and ensures all nodes converge to the same metadata version.
Gossip remains for liveness detection, but schema and topology sources of truth are now ClusterMetadata entries rather than gossip ApplicationState values.
See the tcm/ directory in this workspace for TCM-specific documentation drafts.
Streaming
org.apache.cassandra.streaming.StreamSession manages a bidirectional data transfer connection between two nodes.
When Streaming Occurs
Streaming is triggered by:
- Bootstrap: A new node receiving its token ranges from existing owners
- Decommission: A leaving node handing off its ranges to remaining owners
- Repair: Transferring ranges identified as out-of-sync by Merkle tree comparison
- Rebuild: Replacing local data from remote replicas (e.g., after replacing a failed node)
- Move: A node changing its token position
Stream Plan Construction
StreamPlan describes the full set of transfers for an operation: which token ranges to send, which tables, and in which direction.
StreamResultFuture tracks the overall outcome of all sessions in a plan.
SSTable Transfer
StreamTransferTask handles sending SSTables.
There are two transfer modes:
File-level transfer -- Sends entire SSTable components; used when the entire SSTable falls within the requested range.
Range-level transfer -- Iterates over the SSTable and extracts only the partitions within the requested token range; used when an SSTable spans multiple ranges.
Streaming Integrity
During streaming, SSTables being transferred must remain consistent.
LifecycleTransaction ensures SSTables are not compacted away mid-transfer.
Streaming failure causes the session to abort and triggers cleanup of partially transferred files.
Contributors changing streaming must handle partial transfer and ensure cleanup runs on all failure paths.
For SSTable integrity during streaming, see Compaction and Tombstone Lifecycle.
Repair
The org.apache.cassandra.repair package implements anti-entropy repair, the mechanism that detects and reconciles replica inconsistencies.
Merkle Tree Construction
Repair works by building hierarchical hash trees (Merkle trees) over token ranges.
Each leaf in the tree represents a hash of the data in a sub-range.
A MerkleTrees instance is constructed per table, per range, during a repair session.
Two replicas exchange their trees and identify ranges where the hashes differ.
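The comparison itself can be sketched as a recursive descent that stops at matching subtrees and records mismatching leaves (the real MerkleTrees tracks explicit token ranges and tree depth; the labels and plain binary tree here are stand-ins):

```java
// Illustrative Merkle-style diff: walk two hash trees of the same shape and
// collect the leaf ranges whose hashes differ.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SketchMerkleDiff
{
    static final class Node
    {
        final String range;   // label of the token sub-range this node covers
        final byte[] hash;    // hash of the data (or of the children) in that range
        final Node left, right;
        Node(String range, byte[] hash, Node left, Node right)
        { this.range = range; this.hash = hash; this.left = left; this.right = right; }
        boolean isLeaf() { return left == null && right == null; }
    }

    /** Collects leaf ranges whose hashes differ between the two replicas' trees. */
    List<String> mismatchedRanges(Node local, Node remote)
    {
        List<String> out = new ArrayList<>();
        diff(local, remote, out);
        return out;
    }

    private void diff(Node local, Node remote, List<String> out)
    {
        if (Arrays.equals(local.hash, remote.hash))
            return;                                  // subtrees match: nothing to repair here
        if (local.isLeaf() || remote.isLeaf())
        {
            out.add(local.range);                    // finest resolution reached: stream this range
            return;
        }
        diff(local.left, remote.left, out);          // descend to localize the mismatch
        diff(local.right, remote.right, out);
    }
}
```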
Full Repair vs Incremental Repair
Full repair hashes the entire dataset for the repaired range. All data, both previously repaired and new, is included in the comparison.
Incremental repair limits comparison to SSTables that have not been marked as repaired.
After a successful incremental repair, SSTables are marked as repaired via anti-compaction, and subsequent repairs skip them.
This reduces the volume of data compared and transferred during repair.
Range-Based vs Partition-Based Comparison
By default, repair uses range-based Merkle tree comparison, which is efficient for large datasets. Sub-range repair increases tree resolution, allowing finer-grained identification of mismatched ranges (at the cost of more tree construction work).
Repair Coordination
RepairRunnable is the entry point, triggered by nodetool repair.
It constructs a RepairSession per token range, exchanges MerkleTrees with each replica pair, and spawns SyncTask instances for ranges that differ.
Sync tasks invoke the streaming layer to transfer the out-of-sync data.
Repair and Compaction
Incremental repair interacts closely with compaction.
After repair, AntiCompactionTask splits SSTables into repaired and unrepaired halves.
Compaction strategies keep repaired and unrepaired SSTables in separate buckets to avoid mixing them.
Mixing repaired and unrepaired SSTables during compaction would invalidate the repaired status and force re-repair of the merged SSTable.
Error Handling Patterns
Timeouts
Timeout thresholds are configured in cassandra.yaml and read at runtime via DatabaseDescriptor:
DatabaseDescriptor.getReadRpcTimeout()
DatabaseDescriptor.getWriteRpcTimeout()
DatabaseDescriptor.getCounterWriteRpcTimeout()
DatabaseDescriptor.getRangeRpcTimeout()
RequestCallback starts a timeout task when a request is dispatched.
If the callback has not received enough responses before the timeout fires, a ReadTimeoutException or WriteTimeoutException is returned to the client.
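A blocking sketch of that contract (the real RequestCallback machinery is asynchronous and raises the typed exceptions described below; a CountDownLatch and a RuntimeException are used here only to show the shape):

```java
// Illustrative coordinator-side callback: wait for enough replica responses
// within the timeout, otherwise surface a timeout to the caller.
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class SketchWriteCallback
{
    private final CountDownLatch responses;

    SketchWriteCallback(int blockFor)              // e.g. 2 for QUORUM with RF=3
    {
        this.responses = new CountDownLatch(blockFor);
    }

    /** Invoked by the messaging layer when a replica acknowledges the write. */
    void onResponse()
    {
        responses.countDown();
    }

    /** Called by the coordinator after dispatching the mutations. */
    void await(long timeoutMillis) throws InterruptedException
    {
        if (!responses.await(timeoutMillis, TimeUnit.MILLISECONDS))
        {
            long missing = responses.getCount();   // acknowledgments still outstanding
            throw new RuntimeException("WriteTimeout: " + missing + " acknowledgment(s) missing");
        }
    }
}
```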
Exception Taxonomy
UnavailableException -- Thrown before the request is sent: not enough live replicas to satisfy the consistency level. The coordinator did not attempt the operation.
WriteTimeoutException -- The write was sent but fewer replicas acknowledged it within the timeout than the consistency level requires. The write may or may not have been applied on some replicas.
ReadTimeoutException -- Replicas did not respond to a read within the timeout. No data was returned.
WriteFailureException -- Replicas responded with explicit errors (not silence). Introduced to distinguish deliberate rejection from non-response.
Coordinator Retry and Failure Decisions
StorageProxy orchestrates retry logic.
It uses the failure detector to skip DOWN nodes during replica selection and falls back to hints for writes when replicas are unavailable.
There is no automatic client-visible retry at the coordinator level; the exception is returned and the client decides whether to retry.
Hints
When a write cannot be delivered to a replica (because it is DOWN), the coordinator stores a hint locally; modern versions persist hints as files under the hints directory rather than in a system table.
HintsService (the successor to the older HintedHandoffManager) replays hints to the target node when it comes back UP.
Hints are best-effort: they are dropped after max_hint_window (default: 3 hours) or when hint storage fills.
Hint delivery is how Cassandra provides eventual consistency without requiring the replica to be present at write time.
If a replica is DOWN for longer than max_hint_window, repair is required to restore consistency.
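The coordinator-side decision can be sketched as a simple window check; the names below are stand-ins, not the actual coordinator code:

```java
// Illustrative hinting decision: hint only while the replica's downtime is
// still inside the hint window, otherwise rely on repair.
import java.time.Duration;

final class SketchHintDecision
{
    private final Duration maxHintWindow = Duration.ofHours(3);   // max_hint_window default

    boolean shouldHint(Duration downFor)
    {
        // Past the window, hints would only pile up; repair is required anyway.
        return downFor.compareTo(maxHintWindow) < 0;
    }
}

// Usage sketch:
//   if (!failureDetector.isAlive(replica) && decision.shouldHint(downFor(replica)))
//       storeHint(replica, mutation);
```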
Key Source Files Summary
Area | Key Classes
------------------|------------------------------------------------------------------
Messaging | MessagingService, Verb, Message, IVerbHandler, OutboundConnection, InboundMessageHandlers
Gossip | Gossiper, EndpointState, ApplicationState, VersionedValue, IEndpointStateChangeSubscriber
Failure Detection | FailureDetector, IFailureDetector
Topology | TokenMetadata, ClusterMetadata, IEndpointSnitch, StorageService
Streaming | StreamSession, StreamPlan, StreamResultFuture, StreamTransferTask
Repair | RepairRunnable, RepairSession, MerkleTrees, SyncTask, AntiCompactionTask
Error Handling | RequestCallback, WriteResponseHandler, ReadCallback, HintsService
Contributor Impact Notes
Changes in this subsystem have cluster-wide consequences. Before submitting a patch, verify the following:
- Any new message type needs a Verb entry and handler -- Add the Verb, register the IVerbHandler in MessagingService.registerHandlers(), provide a versioned serializer, and assign an appropriate Stage.
- Protocol changes affect mixed-version compatibility -- Any change to a serialized message type must use version-conditional logic or increment the messaging version. Test with a simulated mixed-version cluster (older peer negotiation).
- Gossip state changes propagate on gossip timescales -- New ApplicationState entries or changes to existing values will not be visible to all nodes immediately. Features that depend on a state change being cluster-wide must account for convergence delay (seconds to tens of seconds).
- Failure detection tuning affects availability SLAs -- Changes to FailureDetector or phi_convict_threshold defaults affect how quickly the cluster responds to node loss. False positive convictions cause unnecessary hint storms and read degradation.
- Streaming changes must handle partial transfer and cleanup -- Every failure path in StreamSession and StreamTransferTask must ensure partially transferred files are removed and the transfer can be retried cleanly. Leaving partial SSTables on disk causes compaction anomalies and corrupted repair state.
- Repair and compaction interact via repaired/unrepaired SSTable state -- Changes to how SSTables are marked as repaired, or to anti-compaction, affect incremental repair correctness. Verify that repaired SSTables are not merged with unrepaired SSTables in compaction output.
See Also
- Cassandra Internals Map — Subsystem overview and cross-references
- Dynamo Architecture — Consistent hashing, replication, and gossip fundamentals
- Accord — Consensus protocol that uses the messaging layer for all protocol messages
- Accord Architecture — Accord protocol internals