
Integrating KurrentDB and Databricks via Arrow Flight SQL: A Reference Implementation Guide for Event-Sourced Analytics Part 2

Tony Young

This is Part 2 of a two-part series.

Part 1 covered setup, connectivity, and building an analytics read model — a single Flight SQL query materialised daily into a Delta table and served to BI.

This document extends the pattern into a full medallion pipeline (Bronze → Silver → Gold) with hourly incremental ingestion, per-event-type typed tables, and orchestrated Databricks Workflows for higher throughput and sub-daily refresh.

Creating an Analytics Pipeline

For higher throughput, sub-daily refresh, or use cases that require analytical joins with other Databricks-resident data, build a proper medallion pipeline. This section shows the simplest version of that pattern, with hourly scheduling and minimal data duplication. The same ADBC Flight SQL driver used in the read model also powers the Bronze ingestion here.

The end state has the same shape as the read model — one row per loan with current lifecycle state — but lives in its own loan_lakehouse.gold.loan_lifecycle table so the two patterns can coexist in the same catalog. The difference is how the data gets there:

  • Bronze — hourly Flight SQL pull of new events into a thin landing table, keyed by log_position
  • Silver — one table per event type, with JSON parsed into typed columns (no JSON column carried forward, to avoid duplication)
  • Gold — a daily-or-hourly join across the Silver tables, producing the same lifecycle table as the read model

Create the tables

USE CATALOG loan_lakehouse;

-- Bronze: thin raw landing
CREATE TABLE IF NOT EXISTS bronze.events (
    log_position     BIGINT       NOT NULL,
    stream           STRING       NOT NULL,
    category         STRING       NOT NULL,
    schema_name      STRING       NOT NULL,
    created_at       TIMESTAMP    NOT NULL,
    data             STRING,
    ingested_at      TIMESTAMP    NOT NULL,
    event_date       DATE         GENERATED ALWAYS AS (CAST(created_at AS DATE)),
    CONSTRAINT pk_bronze_events PRIMARY KEY (log_position) RELY
)
USING DELTA
PARTITIONED BY (event_date)
TBLPROPERTIES (delta.enableChangeDataFeed = true);

-- Ops: high-water mark
CREATE TABLE IF NOT EXISTS ops.ingest_hwm (
    pipeline_name    STRING    NOT NULL,
    last_position    BIGINT    NOT NULL,
    updated_at       TIMESTAMP NOT NULL
) USING DELTA;

INSERT INTO ops.ingest_hwm
SELECT 'kurrent_to_bronze', 0, current_timestamp()
WHERE NOT EXISTS (SELECT 1 FROM ops.ingest_hwm WHERE pipeline_name = 'kurrent_to_bronze');

-- Silver: per event type, typed columns only (no JSON duplicated)
CREATE TABLE IF NOT EXISTS silver.loan_requested (
    stream STRING NOT NULL, loan_request_id STRING, amount INT,
    national_id STRING, name STRING, gender STRING, age INT,
    marital_status STRING, dependents INT, level_of_education STRING,
    employer_name STRING, job_title STRING, job_seniority FLOAT, income INT,
    address_street STRING, address_city STRING, address_region STRING,
    address_country STRING, address_postal_code STRING,
    loan_requested_timestamp TIMESTAMP, loan_product_id INT,
    log_position BIGINT NOT NULL
) USING DELTA;

CREATE TABLE IF NOT EXISTS silver.credit_checked (
    stream STRING NOT NULL, score INT,
    credit_checked_timestamp TIMESTAMP,
    log_position BIGINT NOT NULL
) USING DELTA;

CREATE TABLE IF NOT EXISTS silver.loan_auto_decisioned (
    stream STRING NOT NULL, schema_name STRING NOT NULL,
    loan_automated_decision_timestamp TIMESTAMP,
    log_position BIGINT NOT NULL
) USING DELTA;

CREATE TABLE IF NOT EXISTS silver.loan_manual_decisioned (
    stream STRING NOT NULL, schema_name STRING NOT NULL,
    approver_name STRING, loan_manual_decision_timestamp TIMESTAMP,
    log_position BIGINT NOT NULL
) USING DELTA;

CREATE TABLE IF NOT EXISTS silver.automated_summary (
    stream STRING NOT NULL,
    credit_score_summary STRING, income_and_employment_summary STRING,
    loan_to_income_summary STRING, marital_status_and_dependents_summary STRING,
    recommended_further_investigation STRING,
    summarized_by STRING, summarized_at TIMESTAMP,
    log_position BIGINT NOT NULL
) USING DELTA;

CREATE TABLE IF NOT EXISTS gold.loan_lifecycle (
    loan_request_id                       STRING,
    amount                                INT,
    national_id                           STRING,
    name                                  STRING,
    gender                                STRING,
    age                                   INT,
    marital_status                        STRING,
    dependents                            INT,
    level_of_education                    STRING,
    employer_name                         STRING,
    job_title                             STRING,
    job_seniority                         FLOAT,
    income                                INT,
    address_street                        STRING,
    address_city                          STRING,
    address_region                        STRING,
    address_country                       STRING,
    address_postal_code                   STRING,
    loan_requested_timestamp              TIMESTAMP,
    loan_product_id                       INT,
    score                                 INT,
    credit_checked_timestamp              TIMESTAMP,
    loan_automated_decision_timestamp     TIMESTAMP,
    approver_name                         STRING,
    loan_manual_decision_timestamp        TIMESTAMP,
    credit_score_summary                  STRING,
    income_and_employment_summary         STRING,
    loan_to_income_summary                STRING,
    marital_status_and_dependents_summary STRING,
    recommended_further_investigation     STRING,
    summarized_by                         STRING,
    summarized_at                         TIMESTAMP,
    laststatus                            STRING,
    age_binned                            STRING
) USING DELTA;

Bronze ingestion notebook

bronze_ingest — pulls new loan-category events from KurrentDB into Bronze, MERGEs on log_position for idempotency.

Cell 1 — install the ADBC Flight SQL driver:

%pip install adbc-driver-flightsql
dbutils.library.restartPython()

Cell 2 — the ingestion logic:

"""
Bronze: pull new loanRequest events from KurrentDB via ADBC Flight SQL,
MERGE into bronze.events, advance HWM.
"""
import base64
import adbc_driver_flightsql.dbapi as dbapi
from adbc_driver_flightsql import DatabaseOptions
from pyspark.sql import functions as F

CLUSTER_HOST = "<cluster-id>.mesdb.eventstore.cloud"
CLUSTER_PORT = 2113
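# Demo credentials only. In a real workspace, read these from a Databricks
# secret scope with dbutils.secrets.get() rather than hard-coding them.
USERNAME = "admin"
PASSWORD = "changeit"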
PIPELINE = "kurrent_to_bronze"
BATCH_LIMIT = 50_000

# 1. Read current HWM. If this pipeline has never run, no row exists yet —
#    start from the beginning of the log.
hwm_rows = (spark.table("loan_lakehouse.ops.ingest_hwm")
                 .filter(F.col("pipeline_name") == PIPELINE)
                 .collect())
hwm = hwm_rows[0]["last_position"] if hwm_rows else 0
print(f"[{PIPELINE}] starting from log_position={hwm}")

# 2. Pull new events. Note created_at in kdb.records is Unix UTC ms (INT64),
#    so we convert with epoch_ms() per the Kurrent docs.
query = f"""
SELECT stream, category, schema_name,
       epoch_ms(created_at) AS created_at,
       data,
       log_position
FROM kdb.records
WHERE category = 'loanRequest'
  AND log_position > {hwm}
ORDER BY log_position
LIMIT {BATCH_LIMIT}
"""

auth_str = f"{USERNAME}:{PASSWORD}"
encoded_auth = base64.b64encode(auth_str.encode()).decode()

# Context managers close the cursor and connection even if the query fails.
with dbapi.connect(
    f"grpc+tls://{CLUSTER_HOST}:{CLUSTER_PORT}",
    db_kwargs={
        DatabaseOptions.AUTHORIZATION_HEADER.value: f"Basic {encoded_auth}",
    },
) as conn, conn.cursor() as cur:
    cur.execute(query)
    arrow_table = cur.fetch_arrow_table()

n = arrow_table.num_rows
print(f"[{PIPELINE}] fetched {n} events")

if n == 0:
    dbutils.notebook.exit("OK_NO_DATA")

# 3. Arrow → Spark, add ingestion timestamp, MERGE
df = (spark.createDataFrame(arrow_table.to_pandas())
           .withColumn("ingested_at", F.current_timestamp()))
df.createOrReplaceTempView("incoming")

spark.sql("""
    MERGE INTO loan_lakehouse.bronze.events tgt
    USING incoming src
    ON tgt.log_position = src.log_position
    WHEN NOT MATCHED THEN INSERT (log_position, stream, category, schema_name, created_at, data, ingested_at)
                          VALUES (src.log_position, src.stream, src.category, src.schema_name, src.created_at, src.data, src.ingested_at)
""")

# 4. Upsert HWM. MERGE handles both first-run (no row) and ongoing (row exists).
new_hwm = df.agg(F.max("log_position")).collect()[0][0]
spark.sql(f"""
    MERGE INTO loan_lakehouse.ops.ingest_hwm tgt
    USING (SELECT '{PIPELINE}' AS pipeline_name, CAST({new_hwm} AS BIGINT) AS last_position) src
    ON tgt.pipeline_name = src.pipeline_name
    WHEN MATCHED THEN UPDATE SET last_position = src.last_position,
                                 updated_at    = current_timestamp()
    WHEN NOT MATCHED THEN INSERT (pipeline_name, last_position, updated_at)
                          VALUES (src.pipeline_name, src.last_position, current_timestamp())
""")
print(f"[{PIPELINE}] HWM advanced to {new_hwm}")
dbutils.notebook.exit(f"OK_{n}")
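
Because BATCH_LIMIT caps each pull at 50,000 events, a large backlog takes several runs to drain. The sketch below is a plain-Python illustration (no Spark or KurrentDB involved) of the same high-water-mark pattern, looping until a short batch signals that the log is caught up. The names fetch_batch, merge_batch, and drain are hypothetical; they stand in for the Flight SQL query and the two MERGE statements above, and the sample log is invented.

```python
from typing import Dict, List, Tuple

Event = Tuple[int, str]  # (log_position, payload)


def fetch_batch(log: List[Event], hwm: int, limit: int) -> List[Event]:
    """Stand-in for the Flight SQL query: events past the HWM, oldest first."""
    newer = sorted(e for e in log if e[0] > hwm)
    return newer[:limit]


def merge_batch(bronze: Dict[int, str], batch: List[Event]) -> None:
    """Stand-in for MERGE ... WHEN NOT MATCHED THEN INSERT, keyed by log_position."""
    for position, payload in batch:
        bronze.setdefault(position, payload)  # existing positions are left untouched


def drain(log: List[Event], bronze: Dict[int, str], hwm: int, limit: int) -> int:
    """Pull batches until one comes back short, then return the new HWM."""
    while True:
        batch = fetch_batch(log, hwm, limit)
        if not batch:
            return hwm
        merge_batch(bronze, batch)
        hwm = max(position for position, _ in batch)
        if len(batch) < limit:
            return hwm


source_log = [(10, "a"), (20, "b"), (30, "c"), (40, "d"), (50, "e")]
bronze_table: Dict[int, str] = {}

new_hwm = drain(source_log, bronze_table, hwm=0, limit=2)
print(new_hwm, len(bronze_table))

# Replaying from an older HWM re-fetches rows but inserts nothing twice.
replayed_hwm = drain(source_log, bronze_table, hwm=20, limit=2)
print(replayed_hwm, len(bronze_table))
```

Whether to loop inside one run or rely on the hourly schedule to catch up is a design choice; looping shortens recovery after an outage at the cost of a longer-running task.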

Silver projection notebooks

One notebook per event type. Each reads new Bronze rows via Delta Change Data Feed and projects the JSON data field into typed columns. Crucially, Silver does not carry the data column forward — only the extracted typed fields — so the raw event JSON exists only once in Bronze.

NOTE: All five notebooks follow the same shape: filter Bronze by schema_name, parse data against a typed schema, project the columns Gold needs, and write to a per-event-type Silver table with availableNow (batch) trigger.
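
The shared shape can be illustrated without Spark. The following plain-Python sketch mimics what the notebooks rely on from_json in PERMISSIVE mode to do: a payload that cannot be parsed produces null fields instead of failing the run, and only the typed fields are carried forward. The helper project_credit_checked, the stream names, and the sample rows are hypothetical.

```python
import json
from datetime import datetime
from typing import Any, Dict, Optional


def _as_int(value: Any) -> Optional[int]:
    """Return value as int, or None when it is missing or not an integer."""
    if isinstance(value, bool) or not isinstance(value, int):
        return None
    return value


def _as_timestamp(value: Any) -> Optional[datetime]:
    """Parse an ISO-8601 string, or return None when it cannot be parsed."""
    if not isinstance(value, str):
        return None
    try:
        return datetime.fromisoformat(value)
    except ValueError:
        return None


def project_credit_checked(bronze_row: Dict[str, Any]) -> Dict[str, Any]:
    """Project one Bronze row into the silver.credit_checked column set."""
    try:
        payload = json.loads(bronze_row["data"] or "{}")
    except (json.JSONDecodeError, TypeError):
        payload = {}  # malformed JSON: keep the row, null out the fields
    if not isinstance(payload, dict):
        payload = {}
    return {
        "stream": bronze_row["stream"],
        "score": _as_int(payload.get("Score")),
        "credit_checked_timestamp": _as_timestamp(payload.get("CreditCheckedTimestamp")),
        "log_position": bronze_row["log_position"],
    }


good = {"stream": "loanRequest-1", "log_position": 101,
        "data": '{"Score": 712, "CreditCheckedTimestamp": "2024-05-01T09:30:00"}'}
bad = {"stream": "loanRequest-2", "log_position": 102, "data": "{not json"}

silver_good = project_credit_checked(good)
silver_bad = project_credit_checked(bad)
print(silver_good)
print(silver_bad)
```

Note that the raw data string never appears in the output row, which is the point of the Silver layer.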

silver_loan_requested

"""
Silver: project LoanRequested events from Bronze into typed columns.
"""
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

SCHEMA = StructType([
    StructField("LoanRequestID",     StringType()),
    StructField("Amount",            IntegerType()),
    StructField("NationalID",        StringType()),
    StructField("Name",              StringType()),
    StructField("Gender",            StringType()),
    StructField("Age",               IntegerType()),
    StructField("MaritalStatus",     StringType()),
    StructField("Dependents",        IntegerType()),
    StructField("LevelOfEducation",  StringType()),
    StructField("EmployerName",      StringType()),
    StructField("JobTitle",          StringType()),
    StructField("JobSeniority",      FloatType()),
    StructField("Income",            IntegerType()),
    StructField("Address", StructType([
        StructField("Street",     StringType()),
        StructField("City",       StringType()),
        StructField("Region",     StringType()),
        StructField("Country",    StringType()),
        StructField("PostalCode", StringType()),
    ])),
    StructField("LoanRequestedTimestamp", StringType()),
    StructField("LoanProductID",     IntegerType()),
])

stream = (spark.readStream
    .format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", "0")
    .table("loan_lakehouse.bronze.events")
    .filter(F.col("schema_name") == "LoanRequested"))

projected = (stream
    .withColumn("payload", F.from_json("data", SCHEMA, {"mode": "PERMISSIVE"}))
    .select(
        "stream",
        F.col("payload.LoanRequestID").alias("loan_request_id"),
        F.col("payload.Amount").alias("amount"),
        F.col("payload.NationalID").alias("national_id"),
        F.col("payload.Name").alias("name"),
        F.col("payload.Gender").alias("gender"),
        F.col("payload.Age").alias("age"),
        F.col("payload.MaritalStatus").alias("marital_status"),
        F.col("payload.Dependents").alias("dependents"),
        F.col("payload.LevelOfEducation").alias("level_of_education"),
        F.col("payload.EmployerName").alias("employer_name"),
        F.col("payload.JobTitle").alias("job_title"),
        F.col("payload.JobSeniority").alias("job_seniority"),
        F.col("payload.Income").alias("income"),
        F.col("payload.Address.Street").alias("address_street"),
        F.col("payload.Address.City").alias("address_city"),
        F.col("payload.Address.Region").alias("address_region"),
        F.col("payload.Address.Country").alias("address_country"),
        F.col("payload.Address.PostalCode").alias("address_postal_code"),
        F.to_timestamp("payload.LoanRequestedTimestamp").alias("loan_requested_timestamp"),
        F.col("payload.LoanProductID").alias("loan_product_id"),
        "log_position",
    ))

(projected.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/Volumes/loan_lakehouse/ops/_checkpoints/silver_loan_requested")
    .trigger(availableNow=True)
    .toTable("loan_lakehouse.silver.loan_requested"))

silver_credit_checked

"""
Silver: project CreditChecked events from Bronze.
"""
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

SCHEMA = StructType([
    StructField("Score",                  IntegerType()),
    StructField("CreditCheckedTimestamp", StringType()),
])

stream = (spark.readStream
    .format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", "0")
    .table("loan_lakehouse.bronze.events")
    .filter(F.col("schema_name") == "CreditChecked"))

projected = (stream
    .withColumn("payload", F.from_json("data", SCHEMA, {"mode": "PERMISSIVE"}))
    .select(
        "stream",
        F.col("payload.Score").alias("score"),
        F.to_timestamp("payload.CreditCheckedTimestamp").alias("credit_checked_timestamp"),
        "log_position",
    ))

(projected.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/Volumes/loan_lakehouse/ops/_checkpoints/silver_credit_checked")
    .trigger(availableNow=True)
    .toTable("loan_lakehouse.silver.credit_checked"))

silver_loan_auto_decisioned

The automated-decision phase produces one of three event types (LoanAutomaticallyApproved, LoanAutomaticallyDenied, or LoanApprovalNeeded). This notebook filters on all three and carries schema_name forward into Silver so Gold can disambiguate which outcome occurred.

"""
Silver: project automated-decision events from Bronze.
Carries schema_name forward to disambiguate the three outcome types.
"""
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType

SCHEMA = StructType([
    StructField("LoanAutomatedDecisionTimestamp", StringType()),
])

AUTO_DECISION_TYPES = [
    "LoanAutomaticallyApproved",
    "LoanAutomaticallyDenied",
    "LoanApprovalNeeded",
]

stream = (spark.readStream
    .format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", "0")
    .table("loan_lakehouse.bronze.events")
    .filter(F.col("schema_name").isin(AUTO_DECISION_TYPES)))

projected = (stream
    .withColumn("payload", F.from_json("data", SCHEMA, {"mode": "PERMISSIVE"}))
    .select(
        "stream",
        "schema_name",
        F.to_timestamp("payload.LoanAutomatedDecisionTimestamp")
         .alias("loan_automated_decision_timestamp"),
        "log_position",
    ))

(projected.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/Volumes/loan_lakehouse/ops/_checkpoints/silver_loan_auto_decisioned")
    .trigger(availableNow=True)
    .toTable("loan_lakehouse.silver.loan_auto_decisioned"))

silver_loan_manual_decisioned

Same shape as the automated decision, but for the manual-review outcomes.

"""
Silver: project manual-decision events from Bronze.
"""
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType

SCHEMA = StructType([
    StructField("ApproverName",                StringType()),
    StructField("LoanManualDecisionTimestamp", StringType()),
])

MANUAL_DECISION_TYPES = ["LoanManuallyApproved", "LoanManuallyDenied"]

stream = (spark.readStream
    .format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", "0")
    .table("loan_lakehouse.bronze.events")
    .filter(F.col("schema_name").isin(MANUAL_DECISION_TYPES)))

projected = (stream
    .withColumn("payload", F.from_json("data", SCHEMA, {"mode": "PERMISSIVE"}))
    .select(
        "stream",
        "schema_name",
        F.col("payload.ApproverName").alias("approver_name"),
        F.to_timestamp("payload.LoanManualDecisionTimestamp")
         .alias("loan_manual_decision_timestamp"),
        "log_position",
    ))

(projected.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/Volumes/loan_lakehouse/ops/_checkpoints/silver_loan_manual_decisioned")
    .trigger(availableNow=True)
    .toTable("loan_lakehouse.silver.loan_manual_decisioned"))

silver_automated_summary

"""
Silver: project AutomatedSummary events from Bronze.
"""
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType

SCHEMA = StructType([
    StructField("CreditScoreSummary",                  StringType()),
    StructField("IncomeAndEmploymentSummary",          StringType()),
    StructField("LoanToIncomeSummary",                 StringType()),
    StructField("MaritalStatusAndDependentsSummary",   StringType()),
    StructField("RecommendedFurtherInvestigation",     StringType()),
    StructField("SummarizedBy",                        StringType()),
    StructField("SummarizedAt",                        StringType()),
])

stream = (spark.readStream
    .format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", "0")
    .table("loan_lakehouse.bronze.events")
    .filter(F.col("schema_name") == "AutomatedSummary"))

projected = (stream
    .withColumn("payload", F.from_json("data", SCHEMA, {"mode": "PERMISSIVE"}))
    .select(
        "stream",
        F.col("payload.CreditScoreSummary").alias("credit_score_summary"),
        F.col("payload.IncomeAndEmploymentSummary").alias("income_and_employment_summary"),
        F.col("payload.LoanToIncomeSummary").alias("loan_to_income_summary"),
        F.col("payload.MaritalStatusAndDependentsSummary").alias("marital_status_and_dependents_summary"),
        F.col("payload.RecommendedFurtherInvestigation").alias("recommended_further_investigation"),
        F.col("payload.SummarizedBy").alias("summarized_by"),
        F.to_timestamp("payload.SummarizedAt").alias("summarized_at"),
        "log_position",
    ))

(projected.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/Volumes/loan_lakehouse/ops/_checkpoints/silver_automated_summary")
    .trigger(availableNow=True)
    .toTable("loan_lakehouse.silver.automated_summary"))

Gold notebook — the loan lifecycle table

gold_loan_lifecycle joins the Silver tables and produces the same shape as the read model. Because Silver is incrementally maintained, this can either be a full rebuild each run (simplest, shown below) or an incremental MERGE (more efficient at scale).

"""
Gold: build the loan-lifecycle table from Silver projections.
Full rebuild each run — Silver tables are small relative to Bronze.
"""

spark.sql("""
INSERT OVERWRITE loan_lakehouse.gold.loan_lifecycle
SELECT *,
CASE
  WHEN age IS NULL THEN NULL  -- no LoanRequested row; do not fall through to '95+ Years'
  WHEN age < 15 THEN '0 - 14 Years'
  WHEN age < 25 THEN '15 - 24 Years'
  WHEN age < 35 THEN '25 - 34 Years'
  WHEN age < 45 THEN '35 - 44 Years'
  WHEN age < 55 THEN '45 - 54 Years'
  WHEN age < 65 THEN '55 - 64 Years'
  WHEN age < 75 THEN '65 - 74 Years'
  WHEN age < 85 THEN '75 - 84 Years'
  WHEN age < 95 THEN '85 - 94 Years'
  ELSE '95+ Years'
END AS age_binned
FROM (
  SELECT
    lr.loan_request_id, lr.amount, lr.national_id, lr.name, lr.gender,
    lr.age, lr.marital_status, lr.dependents, lr.level_of_education,
    lr.employer_name, lr.job_title, lr.job_seniority, lr.income,
    lr.address_street, lr.address_city, lr.address_region,
    lr.address_country, lr.address_postal_code,
    lr.loan_requested_timestamp, lr.loan_product_id,
    cc.score, cc.credit_checked_timestamp,
    auto.loan_automated_decision_timestamp,
    manual.approver_name, manual.loan_manual_decision_timestamp,
    summary.credit_score_summary, summary.income_and_employment_summary,
    summary.loan_to_income_summary, summary.marital_status_and_dependents_summary,
    summary.recommended_further_investigation,
    summary.summarized_by, summary.summarized_at,
    COALESCE(manual.schema_name,
             CASE WHEN summary.stream IS NOT NULL THEN 'AutomatedSummary' END,
             auto.schema_name,
             CASE WHEN cc.stream IS NOT NULL THEN 'CreditChecked' END,
             CASE WHEN lr.stream IS NOT NULL THEN 'LoanRequested' END,
             'UNKNOWN') AS laststatus
  FROM loan_lakehouse.silver.loan_requested lr
  FULL OUTER JOIN loan_lakehouse.silver.credit_checked cc
    ON lr.stream = cc.stream
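  -- Join each table on the first non-null stream so far; otherwise a loan that
  -- skips a phase (for example, no manual decision) splits into separate rows.
  FULL OUTER JOIN loan_lakehouse.silver.loan_auto_decisioned auto
    ON COALESCE(lr.stream, cc.stream) = auto.stream
  FULL OUTER JOIN loan_lakehouse.silver.loan_manual_decisioned manual
    ON COALESCE(lr.stream, cc.stream, auto.stream) = manual.stream
  FULL OUTER JOIN loan_lakehouse.silver.automated_summary summary
    ON COALESCE(lr.stream, cc.stream, auto.stream, manual.stream) = summary.stream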
) AS loan_lifecycle
""")

count = spark.table("loan_lakehouse.gold.loan_lifecycle").count()
print(f"Gold loan_lifecycle table refreshed: {count} loans")
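
To make the two derived columns easier to reason about, here is a plain-Python sketch of the same logic. last_status mirrors the COALESCE precedence (manual decision first, then summary, automated decision, credit check, request), and age_bin mirrors the ten-year bands of the CASE expression. Both function names are hypothetical, and returning None for a missing age is a choice made for this sketch.

```python
from typing import Optional


def age_bin(age: Optional[int]) -> Optional[str]:
    """Ten-year bands matching the CASE expression; None when age is unknown."""
    if age is None:
        return None
    if age < 15:
        return "0 - 14 Years"
    if age >= 95:
        return "95+ Years"
    lower = 15 + ((age - 15) // 10) * 10  # 15, 25, 35, ... 85
    return f"{lower} - {lower + 9} Years"


def last_status(manual: Optional[str], has_summary: bool, auto: Optional[str],
                has_credit_check: bool, has_request: bool) -> str:
    """First non-null candidate wins, latest lifecycle phase first."""
    candidates = [
        manual,
        "AutomatedSummary" if has_summary else None,
        auto,
        "CreditChecked" if has_credit_check else None,
        "LoanRequested" if has_request else None,
    ]
    return next((c for c in candidates if c is not None), "UNKNOWN")


print(age_bin(37))
print(last_status(None, True, "LoanApprovalNeeded", True, True))
print(last_status("LoanManuallyApproved", True, "LoanApprovalNeeded", True, True))
```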

Workflow — orchestrating the pipeline

In Workflows → Create Job, define a single job named loan_analytics_pipeline with the task graph:

bronze_ingest
    ├── silver_loan_requested ──────────┐
    ├── silver_credit_checked ──────────┤
    ├── silver_loan_auto_decisioned ────┼── gold_loan_lifecycle
    ├── silver_loan_manual_decisioned ──┤
    └── silver_automated_summary ───────┘

All five Silver tasks depend only on bronze_ingest and run in parallel; gold_loan_lifecycle depends on all five Silver tasks. Schedule the job hourly with the Quartz cron expression 0 0 * * * ? (second 0, minute 0 of every hour).

For each task: notebook task → notebook path → cluster: kurrent-ingest.
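
The same task graph can also be written down as a job definition instead of being assembled in the UI. The sketch below builds a payload in the shape used by the Databricks Jobs API (tasks, depends_on, notebook_task, schedule). The notebook root /Workspace/pipelines and the UTC timezone are placeholders assumed for illustration, and cluster settings are omitted.

```python
import json
from typing import Dict, List

SILVER_TASKS = [
    "silver_loan_requested",
    "silver_credit_checked",
    "silver_loan_auto_decisioned",
    "silver_loan_manual_decisioned",
    "silver_automated_summary",
]
NOTEBOOK_ROOT = "/Workspace/pipelines"  # hypothetical notebook location


def notebook_task(task_key: str, depends_on: List[str]) -> Dict:
    """One notebook task whose upstream tasks are listed in depends_on."""
    return {
        "task_key": task_key,
        "depends_on": [{"task_key": key} for key in depends_on],
        "notebook_task": {"notebook_path": f"{NOTEBOOK_ROOT}/{task_key}"},
    }


job_spec = {
    "name": "loan_analytics_pipeline",
    "tasks": (
        [notebook_task("bronze_ingest", [])]
        # Every Silver task waits only for Bronze, so they can run in parallel.
        + [notebook_task(key, ["bronze_ingest"]) for key in SILVER_TASKS]
        # Gold waits for all five Silver tasks.
        + [notebook_task("gold_loan_lifecycle", SILVER_TASKS)]
    ),
    "schedule": {
        "quartz_cron_expression": "0 0 * * * ?",  # top of every hour
        "timezone_id": "UTC",                      # assumed timezone
    },
}

print(json.dumps(job_spec, indent=2))
```

Keeping the definition in code makes the dependency between Gold and all five Silver tasks explicit and reviewable.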

Why this minimises duplication

  • Bronze carries the raw data JSON once, partitioned by event date. This is the durable Databricks-side audit copy.
  • Silver carries only the typed fields actually used in Gold — no JSON column duplicated, no fields beyond what downstream consumes. Typically 80–90% smaller than Bronze.
  • Gold carries one row per loan, not one row per event. The lifecycle table is small relative to the underlying event volume.

Together, the lakehouse footprint on the Databricks side is the raw events (once), the typed Silver projections of those events, and the lifecycle table (once). KurrentDB remains the only place the events live in their original event-sourced form.

Recovery and replay

Because the entire pipeline is a deterministic function of kdb.records, recovery is straightforward. To rebuild from scratch:

-- Bronze and downstream rebuild
TRUNCATE TABLE loan_lakehouse.bronze.events;
TRUNCATE TABLE loan_lakehouse.silver.loan_requested;
TRUNCATE TABLE loan_lakehouse.silver.credit_checked;
TRUNCATE TABLE loan_lakehouse.silver.loan_auto_decisioned;
TRUNCATE TABLE loan_lakehouse.silver.loan_manual_decisioned;
TRUNCATE TABLE loan_lakehouse.silver.automated_summary;
-- Truncate rather than drop: the Gold notebook's INSERT OVERWRITE expects the table to exist
TRUNCATE TABLE loan_lakehouse.gold.loan_lifecycle;

UPDATE loan_lakehouse.ops.ingest_hwm
SET last_position = 0, updated_at = current_timestamp()
WHERE pipeline_name = 'kurrent_to_bronze';

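Then run the workflow. The MERGE on log_position makes Bronze idempotent, and Gold is fully recomputed each run anyway. For Silver, delete each notebook's checkpoint directory so the stream restarts from its configured startingVersion. One caveat: TRUNCATE does not erase Bronze's Change Data Feed history, so a stream restarted at version 0 would also re-read the pre-truncate inserts (and any delete records from the truncate). To avoid duplicate Silver rows, either drop and recreate bronze.events instead of truncating it, or set startingVersion to the first version after the truncate. With that in place, this is the event-sourcing replay guarantee made concrete on the Databricks side.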
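
The replay guarantee rests on the derived tables being a pure function of the ordered event log. A minimal plain-Python sketch of that property follows. build_lifecycle is a hypothetical, deliberately simplified stand-in for the Bronze, Silver, and Gold steps combined (it keeps only the latest event type per stream), and the sample events are invented.

```python
from typing import Dict, List, Tuple

Event = Tuple[int, str, str]  # (log_position, stream, schema_name)


def build_lifecycle(events: List[Event]) -> Dict[str, str]:
    """Fold events in log_position order into one latest-status entry per stream."""
    state: Dict[str, str] = {}
    for _, stream, schema_name in sorted(events):
        state[stream] = schema_name
    return state


log = [
    (1, "loanRequest-1", "LoanRequested"),
    (2, "loanRequest-2", "LoanRequested"),
    (3, "loanRequest-1", "CreditChecked"),
    (4, "loanRequest-1", "LoanAutomaticallyApproved"),
]

first_build = build_lifecycle(log)

# Discard the derived state and rebuild from the log; arrival order is
# irrelevant because the fold always sorts by log_position first.
rebuilt = build_lifecycle(list(reversed(log)))

print(first_build == rebuilt)
```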

Conclusion

This guide has demonstrated a complete medallion pipeline for incrementally ingesting event-sourced data from KurrentDB into Databricks:

  • Bronze lands raw events via Arrow Flight SQL, keyed by log_position for idempotent MERGE.
  • Silver projects each event type into typed columns, discarding the raw JSON to minimise duplication.
  • Gold joins the Silver tables into a single loan-lifecycle view ready for BI and ML consumption.
  • Recovery is deterministic — reset the high-water mark and replay from KurrentDB at any time.

Because KurrentDB is the system of record and the lakehouse is derived state, this architecture gives you the analytical power of Databricks without sacrificing the causal integrity, temporal ordering, and replay guarantees of an event-sourced system.