
Stateful Stream Processing in Spark 4.0 with transformWithStateInPandas

Pascal Spörri
How Spark’s stateful processing APIs let you maintain per-entity state across micro-batches - and why the new transformWithStateInPandas (Spark 4.0 / Databricks 16.2+) is the way forward.

When processing streaming data, you often need to reconcile partial updates into a coherent view of an entity. Take flight data: a single flight might receive hundreds of updates throughout the day - gate changes, new estimated arrival times, baggage carousel assignments - each containing only the fields that changed.
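The core reconciliation rule is simple: a partial update overwrites only the fields it actually carries. A minimal, Spark-free sketch of the idea (the helper name is hypothetical):

```python
def apply_partial_update(current: dict, update: dict) -> dict:
    """Overwrite only the fields actually present (non-None) in the update."""
    merged = dict(current)
    for field, value in update.items():
        if value is not None:
            merged[field] = value
    return merged

flight = {"flight_id": "LX318", "gate": "A52", "carousel": None}
flight = apply_partial_update(flight, {"gate": "A61"})     # gate change
flight = apply_partial_update(flight, {"carousel": "14"})  # baggage assignment
# flight now holds the reconciled view: gate "A61", carousel "14"
```

The interesting question is where `current` lives between updates: in a table you have to re-read, or in state the engine keeps for you.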

A standard approach uses Delta Lake MERGE (upsert) operations: read the current state from a table, merge in the new updates, write back. This works, but each merge cycle scans the target partition before writing. With stateful processing, Spark keeps the per-entity state in memory (backed by RocksDB) and emits the merged result directly - no extra read step. Since state lives in the processing layer rather than a table, this also opens the door to consuming from Kafka directly.

Spark offers two Python APIs for stateful processing: the newer transformWithStateInPandas (Spark 4.0 / Databricks 16.2+) and the legacy applyInPandasWithState (Spark 3.4+). The key difference is that the new API’s class-based design lets Spark cache the processor and its state across micro-batches, whereas with the legacy API you manage state by hand on every invocation. The new API requires the RocksDB state store provider (which is also the recommended choice for the legacy one):

spark.conf.set(
    "spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider"
)

transformWithStateInPandas (Spark 4.0)

The transformWithStateInPandas API was introduced in Spark 4.0 and backported to Databricks Runtime 16.2 (Spark 3.5.3).

df_stream.groupBy("flightId").transformWithStateInPandas(
    statefulProcessor=MyProcessor(),
    outputStructType=output_schema,
    outputMode="Update",
    timeMode="None",           # "None", "ProcessingTime", or "EventTime"
    initialState=None,         # Optional: GroupedData for state bootstrapping
)

The outputMode controls how downstream sinks interpret the processor’s output:

  • Append: each yielded row is treated as a new, immutable record. Downstream sinks only see inserts, never updates to previously emitted rows. Use this when your processor emits events or derived facts (e.g., “flight X was delayed by 30 minutes”).
  • Update: each yielded row represents the current state for a key. Downstream sinks can receive multiple rows for the same key across batches, with each one replacing the previous. Use this when you’re maintaining a live view of an entity (e.g., the latest reconciled state of a flight). This pairs naturally with Delta Lake tables where you want the sink to overwrite existing rows.

For the flight data use case, Update mode is the right choice: we emit the full reconciled flight state on every change, and the downstream Delta table overwrites the previous version.

StatefulProcessor Interface

You implement a class with two required methods: init to set up state variables, and handleInputRows to process incoming rows per key:

class StatefulProcessor(ABC):
    @abstractmethod
    def init(self, handle: StatefulProcessorHandle) -> None:
        """Initialize state variables. Called once per partition."""
        ...

    @abstractmethod
    def handleInputRows(
        self, key: Any, rows: Iterator[pd.DataFrame], timerValues: TimerValues
    ) -> Iterator[pd.DataFrame]:
        """Process new rows for a key. Return 0 or more output DataFrames."""
        ...

Optional hooks include handleExpiredTimer for timer-based state eviction, handleInitialState for bootstrapping state from an existing DataFrame, and close for cleanup.
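To see this lifecycle without a running cluster, here is a toy processor driven by hand. FakeValueState and FakeHandle are test doubles standing in for Spark’s classes (they track a single key only) and are not part of any API:

```python
import pandas as pd

class FakeValueState:
    """Test double for Spark's ValueState: holds one value for a single key."""
    def __init__(self):
        self._value = None

    @property
    def exists(self):
        return self._value is not None

    def get(self):
        return self._value

    def update(self, value):
        self._value = value

class FakeHandle:
    """Test double for StatefulProcessorHandle."""
    def getValueState(self, stateName, schema, ttlDurationMs=None):
        return FakeValueState()

class CountProcessor:
    """Toy processor: counts rows seen per key across micro-batches."""
    def init(self, handle):
        # State variables are declared once, in init
        self.count = handle.getValueState("count", "count LONG")

    def handleInputRows(self, key, rows, timerValues=None):
        total = self.count.get() if self.count.exists else 0
        for pdf in rows:
            total += len(pdf)
        self.count.update(total)  # state survives into the next batch
        yield pd.DataFrame({"key": [key[0]], "count": [total]})

proc = CountProcessor()
proc.init(FakeHandle())  # Spark calls init once; the instance is reused
out1 = list(proc.handleInputRows(("LX318",), iter([pd.DataFrame({"x": [1, 2]})])))
out2 = list(proc.handleInputRows(("LX318",), iter([pd.DataFrame({"x": [3]})])))
# out2 reflects state carried over from the first batch (count == 3)
```

This is exactly the caching benefit over the legacy API: init runs once, and subsequent batches reuse both the instance and its state variables.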

StatefulProcessorHandle

The StatefulProcessorHandle provides multiple state variable types:

class StatefulProcessorHandle:
    def getValueState(
        self, stateName: str, schema, ttlDurationMs=None
    ) -> ValueState:
        """Single value per key."""

    def getListState(
        self, stateName: str, schema, ttlDurationMs=None
    ) -> ListState:
        """List of values per key."""

    def getMapState(
        self, stateName: str, userKeySchema, valueSchema, ttlDurationMs=None
    ) -> MapState:
        """Key-value map per key."""

    def registerTimer(self, expiryTimestampMs: int) -> None: ...
    def deleteTimer(self, expiryTimestampMs: int) -> None: ...
    def listTimers(self) -> Iterator[int]: ...
    def deleteIfExists(self, stateName: str) -> None: ...

Key capabilities:

  • Multiple named state variables: you can maintain several independent pieces of state per key.
  • TTL support: state variables can automatically expire after a configurable duration, which helps manage memory for long-running streams.
  • Timer support: register timers that trigger handleExpiredTimer callbacks, enabling time-based state eviction or delayed processing.
  • Initial state: bootstrap state from an existing DataFrame via handleInitialState, eliminating the need to replay history.

Example: Flight Data Reconciliation

To make this concrete, here’s how we use stateful processing to reconcile partial flight updates at Zurich Airport. Each incoming row may only contain a subset of fields that changed: a gate reassignment, a new estimated arrival time, a baggage carousel update. The processor merges these into a single coherent state per flight.

The key design choice is a naming convention: every “value” column has a companion _updated_at timestamp column. We use this update timestamp to order messages and infer state: a field is only overwritten if its incoming timestamp is newer than the stored one. For example:

| Column | Type | Purpose |
| --- | --- | --- |
| estimated_in_block_time_utc | timestamp | The value |
| estimated_in_block_time_utc_updated_at | timestamp | When this field was last updated |
| gate_1 | string | The value |
| gate_1_updated_at | timestamp | When this field was last updated |
| flight_identifier | string | Meta column (no _updated_at companion) |
| scheduled_flight_date_utc | date | Meta column (part of the key) |

This convention lets the processor automatically classify columns into three categories:

  • Value columns: have an _updated_at companion - update only if the incoming timestamp is newer.
  • Meta columns: no companion - always overwrite with the latest data.
  • Key columns: used for grouping (e.g., flight identifier + scheduled date).
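Under this convention, the split can be computed mechanically from the schema’s column names. A small sketch (the function name is hypothetical):

```python
def classify_columns(schema_names, key_columns):
    """Split columns into value vs. meta based on the _updated_at convention."""
    names = set(schema_names)
    # Value columns: those with an _updated_at companion column
    value_cols = {n for n in names if f"{n}_updated_at" in names}
    updated_at_cols = {n for n in names if n.endswith("_updated_at")}
    # Meta columns: everything that is neither a value, a timestamp, nor a key
    meta_cols = names - value_cols - updated_at_cols - set(key_columns)
    return value_cols, meta_cols

cols = [
    "estimated_in_block_time_utc", "estimated_in_block_time_utc_updated_at",
    "gate_1", "gate_1_updated_at",
    "flight_identifier", "scheduled_flight_date_utc",
]
value_cols, meta_cols = classify_columns(cols, ["scheduled_flight_date_utc"])
# value_cols: {"estimated_in_block_time_utc", "gate_1"}
# meta_cols:  {"flight_identifier"}
```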

We implemented a generic SCDType1StatefulProcessor base class to make the reconciliation logic reusable across projects:

import numpy
import pandas as pd
from pyspark.sql.types import StructType

class SCDType1StatefulProcessor(StatefulProcessor):
    def output_schema(self) -> StructType:
        ...  # Override in subclass

    def version_column(self) -> str:
        ...  # Override in subclass

    def updated_column(self) -> str:
        ...  # Override in subclass

    def init(self, handle: StatefulProcessorHandle) -> None:
        self.handle = handle
        self.schema = self.output_schema()
        self.version = self.version_column()
        self.updated = self.updated_column()

        sn = self.schema.names
        self.columns_val = set(
            x for x in sn if f"{x}_updated_at" in sn
        )
        updated_at_columns = set(
            x for x in sn if x.endswith("_updated_at")
        )
        self.columns_meta = (
            set(sn) - self.columns_val
            - updated_at_columns - {self.version}
        )
        self.state = handle.getValueState("state", self.output_schema())

    def _create_empty_state(self) -> dict:
        state = {name: None for name in self.schema.names}
        state[self.version] = 0
        return state

    def _get_state(self, key):
        if not self.state.exists:
            return self._create_empty_state()
        data = self.state.get()
        if data is None:
            return self._create_empty_state()
        return {
            name: value
            for name, value in zip(self.schema.names, data)
        }

    def handleInputRows(self, key, input_dfs, timer_values):
        current_state = self._get_state(key)
        updated_state = False

        for pdf in input_dfs:
            pdf.replace({numpy.nan: None}, inplace=True)
            for _, pd_row in pdf.iterrows():
                updated_row = False

                for name in self.columns_val:
                    ts = pd_row[name + "_updated_at"]
                    if ts is None:
                        continue

                    last_updated_at = current_state[name + "_updated_at"]
                    if last_updated_at is None or last_updated_at < ts:
                        current_state[name] = pd_row[name]
                        current_state[name + "_updated_at"] = ts
                        updated_row = True

                if updated_row:
                    updated_state = True
                    for name in self.columns_meta:
                        val = pd_row[name]
                        if val is not None:
                            current_state[name] = val
                    current_state[self.updated] = True
                    current_state[self.version] += 1

        # Persist and emit once per batch, after all input DataFrames are merged
        if updated_state:
            self.state.update(tuple(current_state.values()))
        yield pd.DataFrame([current_state])

    def close(self) -> None:
        pass

The version counter tracks how many times a flight’s state has been updated. The updated flag lets downstream consumers filter for actual changes - important because the processor always yields the full state (see the empty DataFrame gotcha below).
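The newer-timestamp rule at the heart of handleInputRows can be exercised in isolation. A self-contained sketch, with a hypothetical merge_field helper mirroring the loop above:

```python
from datetime import datetime

def merge_field(state: dict, name: str, value, ts) -> bool:
    """Overwrite state[name] only if the incoming timestamp is strictly newer."""
    last = state.get(f"{name}_updated_at")
    if ts is not None and (last is None or last < ts):
        state[name] = value
        state[f"{name}_updated_at"] = ts
        return True
    return False

state = {"gate_1": None, "gate_1_updated_at": None}
assert merge_field(state, "gate_1", "A52", datetime(2025, 1, 1, 10, 0))     # applied
assert not merge_field(state, "gate_1", "A61", datetime(2025, 1, 1, 9, 0))  # stale
# state["gate_1"] is still "A52": the out-of-order update was ignored
```

This is what makes the processor robust to out-of-order delivery: a late-arriving stale update can never clobber a newer value.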

Note that this implementation does not expire old state - we omitted it for brevity. In production, you would set a watermark on the stream and register a timer via handle.registerTimer() to expire flights whose scheduled date has passed.

Wire it into the pipeline by subclassing:

class FlightStatefulProcessor(SCDType1StatefulProcessor):
    def output_schema(self):
        return processing_schema

    def version_column(self):
        return "_meta_update_version"

    def updated_column(self):
        return "_meta_updated_flag"

update_stream = (
    df_stream.groupBy(key_columns)
    .transformWithStateInPandas(
        statefulProcessor=FlightStatefulProcessor(),
        outputStructType=processing_schema,
        outputMode="Update",
        timeMode="None",
    )
)

Gotchas

  • Returning an empty DataFrame can cause errors on Databricks 16.2. Yielding an empty pd.DataFrame() may throw pyarrow.lib.ArrowInvalid: Tried to write record batch with different schema. This is likely a bug - Spark 4.0 itself uses empty DataFrames as a feature. As a workaround on Databricks, always yield the full state and use a flag column to filter downstream.
  • The class must be visible in UDF scope. Due to how Spark serializes the processor, you may need to define it in the same notebook cell or copy it inline rather than importing from a module.
  • NaT handling: None timestamps arrive as NaT (pandas’ missing-timestamp marker) due to the pandas/numpy interop. Use pdf.replace({numpy.nan: None}, inplace=True) before processing.
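The NaT behavior is easy to reproduce in plain pandas; the replace call turns both NaN and NaT cells into Python None (at the cost of the affected columns becoming object dtype):

```python
import numpy
import pandas as pd

pdf = pd.DataFrame({
    "gate_1": ["A52", None],
    "gate_1_updated_at": [pd.Timestamp("2025-01-01 10:00"), pd.NaT],
})
# The missing timestamp arrives as NaT, which fails `is None` checks
assert pdf.loc[1, "gate_1_updated_at"] is pd.NaT

cleaned = pdf.replace({numpy.nan: None})
# Both NaN and NaT cells are now plain Python None
assert cleaned.loc[1, "gate_1_updated_at"] is None
```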

The Legacy API: applyInPandasWithState (Spark 3.4+)

If you’re on Spark 3.x without access to Databricks 16.2+, applyInPandasWithState is your only option. It uses a callback function instead of a class. For our flight data use case, we reimplemented the StatefulProcessor API on top of it, but since the legacy API doesn’t cache anything, the processor had to be initialized on every callback invocation.

def callback(
    key: Any,
    pdfs: Iterable[pd.DataFrame],
    state: GroupState
) -> Iterable[pd.DataFrame]:
    ...

df_stream.groupBy("id").applyInPandasWithState(
    callback,
    outputStructType,
    stateStructType,
    "append",           # or "update"
    "NoTimeout",        # or "ProcessingTimeTimeout", "EventTimeTimeout"
)

The GroupState object stores state as a single tuple: you read it with state.get, write it with state.update(tuple), and remove it with state.remove().
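The tuple-based state handling can be sketched without a cluster. FakeGroupState below is a test double, not Spark’s class; the callback mirrors the get/update cycle:

```python
import pandas as pd

class FakeGroupState:
    """Test double for Spark's GroupState: a single tuple per key."""
    def __init__(self):
        self._tuple = None

    @property
    def exists(self):
        return self._tuple is not None

    @property
    def get(self):
        return self._tuple

    def update(self, new_tuple):
        self._tuple = new_tuple

    def remove(self):
        self._tuple = None

def count_callback(key, pdfs, state):
    """Legacy-style callback: unpack the tuple, merge, pack it back."""
    (total,) = state.get if state.exists else (0,)
    for pdf in pdfs:
        total += len(pdf)
    state.update((total,))
    yield pd.DataFrame({"id": [key[0]], "count": [total]})

state = FakeGroupState()
list(count_callback(("LX318",), iter([pd.DataFrame({"x": [1, 2]})]), state))
out = list(count_callback(("LX318",), iter([pd.DataFrame({"x": [3]})]), state))
# out carries the running count across invocations; state.get is now (3,)
```

All per-key data has to be flattened into that one tuple, which is the main ergonomic cost compared with the new API’s named state variables.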

Limitations

  • State is a single tuple - no support for multiple state variables, lists, or maps.
  • No initial state - there is no built-in mechanism to bootstrap state from an existing table.
  • UDF scoping - the callback and its dependencies must be defined in the same scope.
  • None timestamps are cast to numpy.NaT - you need to explicitly handle this with pdf.replace({numpy.nan: None}, inplace=True).
  • Timeout handling is poorly documented - but works with EventTimeTimeout when combined with watermarks. Inside the callback, check state.hasTimedOut and call state.remove() to clean up stale entries.

API Comparison

| Feature | applyInPandasWithState | transformWithStateInPandas |
| --- | --- | --- |
| Availability | Spark 3.4+ | Spark 4.0+ / Databricks 16.2+ |
| State model | Single tuple per key | Multiple named variables (value, list, map) |
| TTL | Manual via timeouts | Built-in per state variable |
| Timers | Basic timeout (processing/event time) | Full timer registration/expiration |
| Initial state | Not supported | initialState parameter + handleInitialState |
| Interface | Callback function | Class-based StatefulProcessor |

If you’re currently using Delta MERGE to reconcile streaming updates, transformWithStateInPandas is worth considering as a replacement. It eliminates the read-before-write cycle, keeps state in the processing layer, and gives you a clean path to Kafka-based ingestion without depending on a table for state.