transformWithStateInPandas (Spark 4.0 / Databricks 16.2+) is the way forward. When processing streaming data, you often need to reconcile partial updates into a coherent view of an entity. Take flight data: a single flight might receive hundreds of updates throughout the day - gate changes, new estimated arrival times, baggage carousel assignments - each containing only the fields that changed.
A standard approach uses Delta Lake MERGE (upsert) operations: read the current state from a table, merge in the new updates, write back. This works, but each merge cycle scans the target partition before writing. With stateful processing, Spark keeps the per-entity state in memory (backed by RocksDB) and emits the merged result directly - no extra read step. Since state lives in the processing layer rather than a table, this also opens the door to consuming from Kafka directly.
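At its core, that merge step is a per-entity overlay of non-null fields. A minimal sketch in plain Python (the field names are illustrative, not from a real schema):

```python
def merge_partial(state: dict, update: dict) -> dict:
    """Overlay a partial update onto the current entity state,
    keeping existing values for fields the update does not carry."""
    return {**state, **{k: v for k, v in update.items() if v is not None}}

# A flight state and a partial update that only touches two fields
flight = {"gate": "A1", "eta": "10:30", "carousel": None}
update = {"gate": "B2", "eta": None, "carousel": "7"}
assert merge_partial(flight, update) == {"gate": "B2", "eta": "10:30", "carousel": "7"}
```

Delta MERGE performs this overlay against the target table on every batch; stateful processing performs it against in-memory, RocksDB-backed state instead.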
Spark offers two Python APIs for stateful processing: the newer transformWithStateInPandas (Spark 4.0 / Databricks 16.2+) and the legacy applyInPandasWithState (Spark 3.4+). The key difference is that the new API’s class-based design lets Spark cache the processor and its state across micro-batches. With the legacy API, you manage state by hand on every invocation. Both require the RocksDB state store provider:
```python
spark.conf.set(
    "spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider",
)
```

transformWithStateInPandas (Spark 4.0)#
The transformWithStateInPandas API was introduced in Spark 4.0 and backported to Databricks Runtime 16.2 (Spark 3.5.3).
```python
df_stream.groupBy("flightId").transformWithStateInPandas(
    statefulProcessor=MyProcessor(),
    outputStructType=output_schema,
    outputMode="Update",
    timeMode="None",        # "None", "ProcessingTime", or "EventTime"
    initialState=None,      # Optional: GroupedData for state bootstrapping
)
```

The outputMode controls how downstream sinks interpret the processor's output:
- Append: each yielded row is treated as a new, immutable record. Downstream sinks only see inserts, never updates to previously emitted rows. Use this when your processor emits events or derived facts (e.g., “flight X was delayed by 30 minutes”).
- Update: each yielded row represents the current state for a key. Downstream sinks can receive multiple rows for the same key across batches, with each one replacing the previous. Use this when you’re maintaining a live view of an entity (e.g., the latest reconciled state of a flight). This pairs naturally with Delta Lake tables where you want the sink to overwrite existing rows.
For the flight data use case, Update mode is the right choice: we emit the full reconciled flight state on every change, and the downstream Delta table overwrites the previous version.
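A toy simulation of the two sink behaviors makes the difference concrete (plain Python, not Spark code):

```python
def apply_append(sink_rows: list, emitted: list) -> list:
    # Append mode: every emitted row is a new immutable record.
    return sink_rows + emitted

def apply_update(sink_rows: dict, emitted: list) -> dict:
    # Update mode: each emitted row replaces the previous row for its key.
    merged = dict(sink_rows)
    for key, value in emitted:
        merged[key] = value
    return merged

batch1 = [("LX123", "gate A1"), ("LX456", "gate B2")]
batch2 = [("LX123", "gate C3")]  # LX123 gets reassigned

appended = apply_append(apply_append([], batch1), batch2)
assert len(appended) == 3  # three immutable event rows survive

updated = apply_update(apply_update({}, batch1), batch2)
assert updated == {"LX123": "gate C3", "LX456": "gate B2"}  # latest per key
```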
StatefulProcessor Interface#
You implement a class with two required methods: init to set up state variables, and handleInputRows to process incoming rows per key:
```python
class StatefulProcessor(ABC):
    @abstractmethod
    def init(self, handle: StatefulProcessorHandle) -> None:
        """Initialize state variables. Called once per partition."""
        ...

    @abstractmethod
    def handleInputRows(
        self, key: Any, rows: Iterator[pd.DataFrame], timerValues: TimerValues
    ) -> Iterator[pd.DataFrame]:
        """Process new rows for a key. Return 0 or more output DataFrames."""
        ...
```

Optional hooks include handleExpiredTimer for timer-based state eviction, handleInitialState for bootstrapping state from an existing DataFrame, and close for cleanup.
StatefulProcessorHandle#
The StatefulProcessorHandle provides multiple state variable types:
```python
class StatefulProcessorHandle:
    def getValueState(
        self, stateName: str, schema, ttlDurationMs=None
    ) -> ValueState:
        """Single value per key."""

    def getListState(
        self, stateName: str, schema, ttlDurationMs=None
    ) -> ListState:
        """List of values per key."""

    def getMapState(
        self, stateName: str, userKeySchema, valueSchema, ttlDurationMs=None
    ) -> MapState:
        """Key-value map per key."""

    def registerTimer(self, expiryTimestampMs: int) -> None: ...
    def deleteTimer(self, expiryTimestampMs: int) -> None: ...
    def listTimers(self) -> Iterator[int]: ...
    def deleteIfExists(self, stateName: str) -> None: ...
```

Key capabilities:
- Multiple named state variables: you can maintain several independent pieces of state per key.
- TTL support: state variables can automatically expire after a configurable duration, which helps manage memory for long-running streams.
- Timer support: register timers that trigger handleExpiredTimer callbacks, enabling time-based state eviction or delayed processing.
- Initial state: bootstrap state from an existing DataFrame via handleInitialState, eliminating the need to replay history.
Example: Flight Data Reconciliation#
To make this concrete, here’s how we use stateful processing to reconcile partial flight updates at Zurich Airport. Each incoming row may only contain a subset of fields that changed: a gate reassignment, a new estimated arrival time, a baggage carousel update. The processor merges these into a single coherent state per flight.
The key design choice is a naming convention: every “value” column has a companion _updated_at timestamp column. We use this update timestamp to order messages and infer state: a field is only overwritten if its incoming timestamp is newer than the stored one. For example:
| Column | Type | Purpose |
|---|---|---|
| estimated_in_block_time_utc | timestamp | The value |
| estimated_in_block_time_utc_updated_at | timestamp | When this field was last updated |
| gate_1 | string | The value |
| gate_1_updated_at | timestamp | When this field was last updated |
| flight_identifier | string | Meta column (no _updated_at companion) |
| scheduled_flight_date_utc | date | Meta column (part of the key) |
This convention lets the processor automatically classify columns into three categories:
- Value columns: have an _updated_at companion - update only if the incoming timestamp is newer.
- Meta columns: no companion - always overwrite with the latest data.
- Key columns: used for grouping (e.g., flight identifier + scheduled date).
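The classification itself is a few lines of plain Python (schema and key names here are illustrative):

```python
def classify_columns(schema_names: list, key_columns: set):
    """Split schema columns into value / meta / timestamp groups
    using the *_updated_at naming convention."""
    names = set(schema_names)
    updated_at = {n for n in names if n.endswith("_updated_at")}
    value = {n for n in names if f"{n}_updated_at" in names}
    meta = names - value - updated_at - key_columns
    return value, meta, updated_at

schema = [
    "flight_identifier", "scheduled_flight_date_utc",
    "gate_1", "gate_1_updated_at",
    "estimated_in_block_time_utc", "estimated_in_block_time_utc_updated_at",
]
value, meta, ts = classify_columns(schema, {"scheduled_flight_date_utc"})
assert value == {"gate_1", "estimated_in_block_time_utc"}
assert meta == {"flight_identifier"}
```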
We implemented a generic SCDType1StatefulProcessor base class to make the reconciliation logic reusable across projects:
```python
import numpy
import pandas as pd
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType


class SCDType1StatefulProcessor(StatefulProcessor):
    def output_schema(self) -> StructType:
        ...  # Override in subclass

    def version_column(self) -> str:
        ...  # Override in subclass

    def updated_column(self) -> str:
        ...  # Override in subclass

    def init(self, handle: StatefulProcessorHandle) -> None:
        self.handle = handle
        self.schema = self.output_schema()
        self.version = self.version_column()
        self.updated = self.updated_column()
        sn = self.schema.names
        # Value columns are those with an _updated_at companion in the schema
        self.columns_val = set(
            x for x in sn if f"{x}_updated_at" in sn
        )
        updated_at_columns = set(
            x for x in sn if x.endswith("_updated_at")
        )
        self.columns_meta = (
            set(sn) - self.columns_val
            - updated_at_columns - {self.version}
        )
        self.state = handle.getValueState("state", self.schema)

    def _create_empty_state(self) -> dict:
        state = {name: None for name in self.schema.names}
        state[self.version] = 0
        return state

    def _get_state(self, key):
        if not self.state.exists():
            return self._create_empty_state()
        data = self.state.get()
        if data is None:
            return self._create_empty_state()
        return {
            name: value
            for name, value in zip(self.schema.names, data)
        }

    def handleInputRows(self, key, input_dfs, timer_values):
        current_state = self._get_state(key)
        updated_state = False
        for pdf in input_dfs:
            # The numpy interop delivers missing values as NaN/NaT; normalize to None
            pdf.replace({numpy.nan: None}, inplace=True)
            for _, pd_row in pdf.iterrows():
                updated_row = False
                for name in self.columns_val:
                    ts = pd_row[name + "_updated_at"]
                    if ts is None:
                        continue
                    last_updated_at = current_state[name + "_updated_at"]
                    if last_updated_at is None or last_updated_at < ts:
                        current_state[name] = pd_row[name]
                        current_state[name + "_updated_at"] = ts
                        updated_row = True
                if updated_row:
                    updated_state = True
                for name in self.columns_meta:
                    val = pd_row[name]
                    if val is not None:
                        current_state[name] = val
                current_state[self.version] += 1
        current_state[self.updated] = updated_state
        if updated_state:
            self.state.update(tuple(current_state.values()))
        yield pd.DataFrame([current_state])

    def close(self) -> None:
        pass
```

The version counter tracks how many update messages have been folded into a flight's state. The updated flag lets downstream consumers filter for actual changes - important because the processor always yields the full state (see the empty DataFrame gotcha below).
Note that this implementation does not expire old state - we omitted it for brevity. In production, you would set a watermark on the stream and register a timer via handle.registerTimer() to expire flights whose scheduled date has passed.
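For illustration, the expiry timestamp for such a timer could be computed like this - plain Python, with a one-day retention window as an assumed policy (registerTimer expects epoch milliseconds):

```python
from datetime import datetime, timedelta, timezone

def flight_expiry_ms(scheduled_date: datetime,
                     retention: timedelta = timedelta(days=1)) -> int:
    """Epoch milliseconds at which a flight's state should expire;
    suitable for passing to handle.registerTimer(...)."""
    return int((scheduled_date + retention).timestamp() * 1000)

sched = datetime(2024, 6, 1, tzinfo=timezone.utc)
assert flight_expiry_ms(sched) == int(
    datetime(2024, 6, 2, tzinfo=timezone.utc).timestamp() * 1000
)
```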
Wire it into the pipeline by subclassing:
```python
class FlightStatefulProcessor(SCDType1StatefulProcessor):
    def output_schema(self):
        return processing_schema

    def version_column(self):
        return "_meta_update_version"

    def updated_column(self):
        return "_meta_updated_flag"


update_stream = (
    df_stream.groupBy(key_columns)
    .transformWithStateInPandas(
        statefulProcessor=FlightStatefulProcessor(),
        outputStructType=processing_schema,
        outputMode="Update",
        timeMode="None",
    )
)
```

Gotchas#
- Returning an empty DataFrame can cause errors on Databricks 16.2. Yielding an empty pd.DataFrame() may throw pyarrow.lib.ArrowInvalid: Tried to write record batch with different schema. This is likely a bug - Spark 4.0 itself uses empty DataFrames as a feature. As a workaround on Databricks, always yield the full state and use a flag column to filter downstream.
- The class must be visible in UDF scope. Due to how Spark serializes the processor, you may need to define it in the same notebook cell or copy it inline rather than importing from a module.
- numpy.NaT handling: None timestamps arrive as numpy.NaT due to the numpy interop. Use pdf.replace({numpy.nan: None}) before processing.
The Legacy API: applyInPandasWithState (Spark 3.4+)#
If you’re on Spark 3.x without access to Databricks 16.2+, applyInPandasWithState is your only option. It uses a callback function instead of a class. For our flight data use case, we reimplemented the StatefulProcessor API on top of it, but since the legacy API doesn’t cache anything, the processor had to be initialized on every callback invocation.
```python
def callback(
    key: Any,
    pdfs: Iterable[pd.DataFrame],
    state: GroupState,
) -> Iterable[pd.DataFrame]:
    ...

df_stream.groupBy("id").applyInPandasWithState(
    callback,
    outputStructType,
    stateStructType,
    "append",     # or "update"
    "NoTimeout",  # or "ProcessingTimeTimeout", "EventTimeTimeout"
)
```

The GroupState object stores state as a single tuple: you read it with state.get, write it with state.update(tuple), and remove it with state.remove().
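Because state is a flat tuple, a common pattern is converting between it and a named dict using the state schema's column order. A small sketch with illustrative column names:

```python
# Column order must match the stateStructType exactly (illustrative names)
FIELDS = ("flight_identifier", "gate_1", "gate_1_updated_at")

def state_to_dict(state_tuple: tuple) -> dict:
    """Rehydrate the flat GroupState tuple into a named dict."""
    return dict(zip(FIELDS, state_tuple))

def dict_to_state(state_dict: dict) -> tuple:
    """Flatten back to a tuple in schema order, for state.update(...)."""
    return tuple(state_dict[name] for name in FIELDS)

row = state_to_dict(("LX123", "A1", 100))
row["gate_1"] = "B2"
assert dict_to_state(row) == ("LX123", "B2", 100)
```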
Limitations#
- State is a single tuple - no support for multiple state variables, lists, or maps.
- No initial state - there is no built-in mechanism to bootstrap state from an existing table.
- UDF scoping - the callback and its dependencies must be defined in the same scope.
- None timestamps are cast to numpy.NaT - you need to explicitly handle this with pdf.replace({numpy.nan: None}, inplace=True).
- Timeout handling is poorly documented - but works with EventTimeTimeout when combined with watermarks. Inside the callback, check state.hasTimedOut and call state.remove() to clean up stale entries.
API Comparison#
| Feature | applyInPandasWithState | transformWithStateInPandas |
|---|---|---|
| Availability | Spark 3.4+ | Spark 4.0+ / Databricks 16.2+ |
| State model | Single tuple per key | Multiple named variables (value, list, map) |
| TTL | Manual via timeouts | Built-in per state variable |
| Timers | Basic timeout (processing/event time) | Full timer registration/expiration |
| Initial state | Not supported | initialState parameter + handleInitialState |
| Interface | Callback function | Class-based StatefulProcessor |
If you’re currently using Delta MERGE to reconcile streaming updates, transformWithStateInPandas is worth considering as a replacement. It eliminates the read-before-write cycle, keeps state in the processing layer, and gives you a clean path to Kafka-based ingestion without depending on a table for state.
