
This is Part 2 of a two-part series.
Part 1 covered setup, connectivity, and building an analytics read model — a single Flight SQL query materialised daily into a Delta table and served to BI.
This document extends the pattern into a full medallion pipeline (Bronze → Silver → Gold) with hourly incremental ingestion, per-event-type typed tables, and orchestrated Databricks Workflows for higher throughput and sub-daily refresh.
Creating an Analytics Pipeline
For higher throughput, sub-daily refresh, or use cases that require analytical joins with other Databricks-resident data, build a proper medallion pipeline. This section shows the simplest version of that pattern, with hourly scheduling and minimal data duplication. The same ADBC Flight SQL driver used in the read model also powers the Bronze ingestion here.
The end state has the same shape as the read model — one row per loan with current lifecycle state — but lives in its own loan_lakehouse.gold.loan_lifecycle table so the two patterns can coexist in the same catalog. The difference is how the data gets there:
- Bronze — hourly Flight SQL pull of new events into a thin landing table, keyed by log_position
- Silver — one table per event type, with JSON parsed into typed columns (no JSON column carried forward, to avoid duplication)
- Gold — a daily-or-hourly join across the Silver tables, producing the same lifecycle table as the read model
Create the tables
USE CATALOG loan_lakehouse;
-- Bronze: thin raw landing
CREATE TABLE IF NOT EXISTS bronze.events (
log_position BIGINT NOT NULL,
stream STRING NOT NULL,
category STRING NOT NULL,
schema_name STRING NOT NULL,
created_at TIMESTAMP NOT NULL,
data STRING,
ingested_at TIMESTAMP NOT NULL,
event_date DATE GENERATED ALWAYS AS (CAST(created_at AS DATE)),
CONSTRAINT pk_bronze_events PRIMARY KEY (log_position) RELY
)
USING DELTA
PARTITIONED BY (event_date)
TBLPROPERTIES (delta.enableChangeDataFeed = true);
-- Ops: high-water mark
CREATE TABLE IF NOT EXISTS ops.ingest_hwm (
pipeline_name STRING NOT NULL,
last_position BIGINT NOT NULL,
updated_at TIMESTAMP NOT NULL
) USING DELTA;
INSERT INTO ops.ingest_hwm
SELECT 'kurrent_to_bronze', 0, current_timestamp()
WHERE NOT EXISTS (SELECT 1 FROM ops.ingest_hwm WHERE pipeline_name = 'kurrent_to_bronze');
-- Silver: per event type, typed columns only (no JSON duplicated)
CREATE TABLE IF NOT EXISTS silver.loan_requested (
stream STRING NOT NULL, loan_request_id STRING, amount INT,
national_id STRING, name STRING, gender STRING, age INT,
marital_status STRING, dependents INT, level_of_education STRING,
employer_name STRING, job_title STRING, job_seniority FLOAT, income INT,
address_street STRING, address_city STRING, address_region STRING,
address_country STRING, address_postal_code STRING,
loan_requested_timestamp TIMESTAMP, loan_product_id INT,
log_position BIGINT NOT NULL
) USING DELTA;
CREATE TABLE IF NOT EXISTS silver.credit_checked (
stream STRING NOT NULL, score INT,
credit_checked_timestamp TIMESTAMP,
log_position BIGINT NOT NULL
) USING DELTA;
CREATE TABLE IF NOT EXISTS silver.loan_auto_decisioned (
stream STRING NOT NULL, schema_name STRING NOT NULL,
loan_automated_decision_timestamp TIMESTAMP,
log_position BIGINT NOT NULL
) USING DELTA;
CREATE TABLE IF NOT EXISTS silver.loan_manual_decisioned (
stream STRING NOT NULL, schema_name STRING NOT NULL,
approver_name STRING, loan_manual_decision_timestamp TIMESTAMP,
log_position BIGINT NOT NULL
) USING DELTA;
CREATE TABLE IF NOT EXISTS silver.automated_summary (
stream STRING NOT NULL,
credit_score_summary STRING, income_and_employment_summary STRING,
loan_to_income_summary STRING, marital_status_and_dependents_summary STRING,
recommended_further_investigation STRING,
summarized_by STRING, summarized_at TIMESTAMP,
log_position BIGINT NOT NULL
) USING DELTA;
CREATE TABLE IF NOT EXISTS gold.loan_lifecycle (
loan_request_id STRING,
amount INT,
national_id STRING,
name STRING,
gender STRING,
age INT,
marital_status STRING,
dependents INT,
level_of_education STRING,
employer_name STRING,
job_title STRING,
job_seniority FLOAT,
income INT,
address_street STRING,
address_city STRING,
address_region STRING,
address_country STRING,
address_postal_code STRING,
loan_requested_timestamp TIMESTAMP,
loan_product_id INT,
score INT,
credit_checked_timestamp TIMESTAMP,
loan_automated_decision_timestamp TIMESTAMP,
approver_name STRING,
loan_manual_decision_timestamp TIMESTAMP,
credit_score_summary STRING,
income_and_employment_summary STRING,
loan_to_income_summary STRING,
marital_status_and_dependents_summary STRING,
recommended_further_investigation STRING,
summarized_by STRING,
summarized_at TIMESTAMP,
laststatus STRING,
age_binned STRING
) USING DELTA;
Bronze ingestion notebook
The bronze_ingest notebook pulls new events in the loanRequest category from KurrentDB into Bronze and MERGEs on log_position, so a retried run cannot insert the same event twice.
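The interplay between the high-water mark and the keyed MERGE can be sketched in plain Python, with no Spark or KurrentDB involved. Everything here is hypothetical scaffolding: `SOURCE_LOG` stands in for kdb.records, the `bronze` dict for bronze.events, and `fetch_batch`, `merge_into_bronze`, and `ingest_once` are illustrative names rather than part of any library. The sketch assumes log_position is unique and increasing, as the Bronze primary key implies.

```python
from typing import Dict, List

# Hypothetical stand-in for kdb.records: made-up events keyed by log_position.
SOURCE_LOG: List[dict] = [
    {"log_position": p, "data": f"event-{p}"} for p in (10, 20, 30, 40, 50)
]


def fetch_batch(hwm: int, limit: int) -> List[dict]:
    """Mimic `WHERE log_position > hwm ORDER BY log_position LIMIT limit`."""
    newer = [e for e in SOURCE_LOG if e["log_position"] > hwm]
    return sorted(newer, key=lambda e: e["log_position"])[:limit]


def merge_into_bronze(bronze: Dict[int, dict], batch: List[dict]) -> int:
    """Insert only rows whose log_position is absent (WHEN NOT MATCHED)."""
    inserted = 0
    for event in batch:
        if event["log_position"] not in bronze:
            bronze[event["log_position"]] = event
            inserted += 1
    return inserted


def ingest_once(bronze: Dict[int, dict], hwm: int, limit: int,
                crash_before_hwm_update: bool = False) -> int:
    """Run one ingestion pass and return the high-water mark to persist."""
    batch = fetch_batch(hwm, limit)
    if not batch:
        return hwm
    merge_into_bronze(bronze, batch)
    if crash_before_hwm_update:
        return hwm  # the MERGE committed, but the HWM write never happened
    return max(e["log_position"] for e in batch)


bronze: Dict[int, dict] = {}
hwm = 0
# The first run "crashes" after the MERGE, so the stored HWM stays at 0.
hwm = ingest_once(bronze, hwm, limit=3, crash_before_hwm_update=True)
# The retry re-fetches the same three events; the keyed merge inserts nothing.
hwm = ingest_once(bronze, hwm, limit=3)
# Later runs page through the rest of the log until nothing new arrives.
while True:
    next_hwm = ingest_once(bronze, hwm, limit=3)
    if next_hwm == hwm:
        break
    hwm = next_hwm

print(sorted(bronze), hwm)
```

The point of the design is that the two writes (Bronze rows, then the high-water mark) do not need to be atomic: a failure between them only causes a batch to be re-fetched, and the merge key absorbs the overlap.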
Cell 1: install the ADBC Flight SQL driver
%pip install adbc-driver-flightsql
dbutils.library.restartPython()
Cell 2: the ingestion logic
"""
Bronze: pull new loanRequest events from KurrentDB via ADBC Flight SQL,
MERGE into bronze.events, advance HWM.
"""
import base64
import adbc_driver_flightsql.dbapi as dbapi
from adbc_driver_flightsql import DatabaseOptions
from pyspark.sql import functions as F
CLUSTER_HOST = "<cluster-id>.mesdb.eventstore.cloud"
CLUSTER_PORT = 2113
USERNAME = "admin"
PASSWORD = "changeit"
PIPELINE = "kurrent_to_bronze"
BATCH_LIMIT = 50_000
# 1. Read the current HWM. The DDL above seeds this row at 0; if the row is
#    missing anyway, start from the beginning of the log.
hwm_rows = (spark.table("loan_lakehouse.ops.ingest_hwm")
            .filter(F.col("pipeline_name") == PIPELINE)
            .collect())
hwm = hwm_rows[0]["last_position"] if hwm_rows else 0
print(f"[{PIPELINE}] starting from log_position={hwm}")
# 2. Pull new events. Note created_at in kdb.records is Unix UTC ms (INT64),
# so we convert with epoch_ms() per the Kurrent docs.
query = f"""
SELECT stream, category, schema_name,
epoch_ms(created_at) AS created_at,
data,
log_position
FROM kdb.records
WHERE category = 'loanRequest'
AND log_position > {hwm}
ORDER BY log_position
LIMIT {BATCH_LIMIT}
"""
auth_str = f"{USERNAME}:{PASSWORD}"
encoded_auth = base64.b64encode(auth_str.encode()).decode()
conn = dbapi.connect(
f"grpc+tls://{CLUSTER_HOST}:{CLUSTER_PORT}",
db_kwargs={
DatabaseOptions.AUTHORIZATION_HEADER.value: f"Basic {encoded_auth}",
},
)
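cur = conn.cursor()
cur.execute(query)
arrow_table = cur.fetch_arrow_table()
# The batch is fully in memory now, so release the Flight SQL connection
# before any early exit.
cur.close()
conn.close()
n = arrow_table.num_rows
print(f"[{PIPELINE}] fetched {n} events")
if n == 0:
    dbutils.notebook.exit("OK_NO_DATA")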
# 3. Arrow → Spark, add ingestion timestamp, MERGE
df = (spark.createDataFrame(arrow_table.to_pandas())
.withColumn("ingested_at", F.current_timestamp()))
df.createOrReplaceTempView("incoming")
spark.sql("""
MERGE INTO loan_lakehouse.bronze.events tgt
USING incoming src
ON tgt.log_position = src.log_position
WHEN NOT MATCHED THEN INSERT (log_position, stream, category, schema_name, created_at, data, ingested_at)
VALUES (src.log_position, src.stream, src.category, src.schema_name, src.created_at, src.data, src.ingested_at)
""")
# 4. Upsert HWM. MERGE handles both first-run (no row) and ongoing (row exists).
new_hwm = df.agg(F.max("log_position")).collect()[0][0]
spark.sql(f"""
MERGE INTO loan_lakehouse.ops.ingest_hwm tgt
USING (SELECT '{PIPELINE}' AS pipeline_name, CAST({new_hwm} AS BIGINT) AS last_position) src
ON tgt.pipeline_name = src.pipeline_name
WHEN MATCHED THEN UPDATE SET last_position = src.last_position,
updated_at = current_timestamp()
WHEN NOT MATCHED THEN INSERT (pipeline_name, last_position, updated_at)
VALUES (src.pipeline_name, src.last_position, current_timestamp())
""")
print(f"[{PIPELINE}] HWM advanced to {new_hwm}")
dbutils.notebook.exit(f"OK_{n}")
Silver projection notebooks
One notebook per event type. Each reads new Bronze rows via Delta Change Data Feed and projects the JSON data field into typed columns. Crucially, Silver does not carry the data column forward — only the extracted typed fields — so the raw event JSON exists only once in Bronze.
NOTE: All five notebooks follow the same shape: filter Bronze by schema_name, parse data against a typed schema, project the columns Gold needs, and write to a per-event-type Silver table with availableNow (batch) trigger.
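The shape described in the note (filter by schema_name, parse data against a typed schema, project only the needed columns) can be sketched without Spark. This is a plain-Python approximation, not the notebook code: `project_credit_checked`, `_to_int`, and `_to_timestamp` are hypothetical helpers, the sample rows and stream names are made up, and the handling of malformed JSON only roughly mirrors PERMISSIVE mode by keeping the row and leaving its fields empty.

```python
import json
from datetime import datetime
from typing import Iterable, Iterator, Optional


def _to_int(value) -> Optional[int]:
    # Accept real integers only; bools and other types become None.
    return value if isinstance(value, int) and not isinstance(value, bool) else None


def _to_timestamp(value) -> Optional[datetime]:
    try:
        return datetime.fromisoformat(value)
    except (TypeError, ValueError):
        return None


def project_credit_checked(bronze_rows: Iterable[dict]) -> Iterator[dict]:
    """Filter by schema_name, parse `data`, and emit typed columns only."""
    for row in bronze_rows:
        if row["schema_name"] != "CreditChecked":
            continue
        try:
            payload = json.loads(row["data"])
        except (TypeError, json.JSONDecodeError):
            payload = {}  # permissive: keep the row, leave the fields empty
        if not isinstance(payload, dict):
            payload = {}
        yield {
            "stream": row["stream"],
            "score": _to_int(payload.get("Score")),
            "credit_checked_timestamp": _to_timestamp(
                payload.get("CreditCheckedTimestamp")),
            "log_position": row["log_position"],
            # No "data" key: the raw JSON stays in Bronze.
        }


# Made-up Bronze rows for illustration only.
sample_bronze = [
    {"stream": "loanRequest-1", "schema_name": "CreditChecked", "log_position": 11,
     "data": '{"Score": 712, "CreditCheckedTimestamp": "2024-05-01T12:00:00+00:00"}'},
    {"stream": "loanRequest-1", "schema_name": "LoanRequested", "log_position": 10,
     "data": '{"Amount": 5000}'},
    {"stream": "loanRequest-2", "schema_name": "CreditChecked", "log_position": 12,
     "data": "{not valid json"},
]

silver_rows = list(project_credit_checked(sample_bronze))
for silver_row in silver_rows:
    print(silver_row)
```

Keeping the malformed row with empty fields, rather than dropping it, means log_position stays complete in Silver and bad payloads can be found later by looking for rows whose typed columns are all null.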
silver_loan_requested
"""
Silver: project LoanRequested events from Bronze into typed columns.
"""
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
SCHEMA = StructType([
StructField("LoanRequestID", StringType()),
StructField("Amount", IntegerType()),
StructField("NationalID", StringType()),
StructField("Name", StringType()),
StructField("Gender", StringType()),
StructField("Age", IntegerType()),
StructField("MaritalStatus", StringType()),
StructField("Dependents", IntegerType()),
StructField("LevelOfEducation", StringType()),
StructField("EmployerName", StringType()),
StructField("JobTitle", StringType()),
StructField("JobSeniority", FloatType()),
StructField("Income", IntegerType()),
StructField("Address", StructType([
StructField("Street", StringType()),
StructField("City", StringType()),
StructField("Region", StringType()),
StructField("Country", StringType()),
StructField("PostalCode", StringType()),
])),
StructField("LoanRequestedTimestamp", StringType()),
StructField("LoanProductID", IntegerType()),
])
stream = (spark.readStream
.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", "0")
.table("loan_lakehouse.bronze.events")
.filter(F.col("schema_name") == "LoanRequested"))
projected = (stream
.withColumn("payload", F.from_json("data", SCHEMA, {"mode": "PERMISSIVE"}))
.select(
"stream",
F.col("payload.LoanRequestID").alias("loan_request_id"),
F.col("payload.Amount").alias("amount"),
F.col("payload.NationalID").alias("national_id"),
F.col("payload.Name").alias("name"),
F.col("payload.Gender").alias("gender"),
F.col("payload.Age").alias("age"),
F.col("payload.MaritalStatus").alias("marital_status"),
F.col("payload.Dependents").alias("dependents"),
F.col("payload.LevelOfEducation").alias("level_of_education"),
F.col("payload.EmployerName").alias("employer_name"),
F.col("payload.JobTitle").alias("job_title"),
F.col("payload.JobSeniority").alias("job_seniority"),
F.col("payload.Income").alias("income"),
F.col("payload.Address.Street").alias("address_street"),
F.col("payload.Address.City").alias("address_city"),
F.col("payload.Address.Region").alias("address_region"),
F.col("payload.Address.Country").alias("address_country"),
F.col("payload.Address.PostalCode").alias("address_postal_code"),
F.to_timestamp("payload.LoanRequestedTimestamp").alias("loan_requested_timestamp"),
F.col("payload.LoanProductID").alias("loan_product_id"),
"log_position",
))
(projected.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/Volumes/loan_lakehouse/ops/_checkpoints/silver_loan_requested")
.trigger(availableNow=True)
.toTable("loan_lakehouse.silver.loan_requested"))
silver_credit_checked
"""
Silver: project CreditChecked events from Bronze.
"""
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
SCHEMA = StructType([
StructField("Score", IntegerType()),
StructField("CreditCheckedTimestamp", StringType()),
])
stream = (spark.readStream
.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", "0")
.table("loan_lakehouse.bronze.events")
.filter(F.col("schema_name") == "CreditChecked"))
projected = (stream
.withColumn("payload", F.from_json("data", SCHEMA, {"mode": "PERMISSIVE"}))
.select(
"stream",
F.col("payload.Score").alias("score"),
F.to_timestamp("payload.CreditCheckedTimestamp").alias("credit_checked_timestamp"),
"log_position",
))
(projected.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/Volumes/loan_lakehouse/ops/_checkpoints/silver_credit_checked")
.trigger(availableNow=True)
.toTable("loan_lakehouse.silver.credit_checked"))
silver_loan_auto_decisioned
The automated-decision phase produces one of three event types (LoanAutomaticallyApproved, LoanAutomaticallyDenied, or LoanApprovalNeeded). This notebook filters on all three and carries schema_name forward into Silver so Gold can disambiguate which outcome occurred.
"""
Silver: project automated-decision events from Bronze.
Carries schema_name forward to disambiguate the three outcome types.
"""
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType
SCHEMA = StructType([
StructField("LoanAutomatedDecisionTimestamp", StringType()),
])
AUTO_DECISION_TYPES = [
"LoanAutomaticallyApproved",
"LoanAutomaticallyDenied",
"LoanApprovalNeeded",
]
stream = (spark.readStream
.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", "0")
.table("loan_lakehouse.bronze.events")
.filter(F.col("schema_name").isin(AUTO_DECISION_TYPES)))
projected = (stream
.withColumn("payload", F.from_json("data", SCHEMA, {"mode": "PERMISSIVE"}))
.select(
"stream",
"schema_name",
F.to_timestamp("payload.LoanAutomatedDecisionTimestamp")
.alias("loan_automated_decision_timestamp"),
"log_position",
))
(projected.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/Volumes/loan_lakehouse/ops/_checkpoints/silver_loan_auto_decisioned")
.trigger(availableNow=True)
.toTable("loan_lakehouse.silver.loan_auto_decisioned"))
silver_loan_manual_decisioned
Same shape as the automated decision, but for the manual-review outcomes.
"""
Silver: project manual-decision events from Bronze.
"""
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType
SCHEMA = StructType([
StructField("ApproverName", StringType()),
StructField("LoanManualDecisionTimestamp", StringType()),
])
MANUAL_DECISION_TYPES = ["LoanManuallyApproved", "LoanManuallyDenied"]
stream = (spark.readStream
.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", "0")
.table("loan_lakehouse.bronze.events")
.filter(F.col("schema_name").isin(MANUAL_DECISION_TYPES)))
projected = (stream
.withColumn("payload", F.from_json("data", SCHEMA, {"mode": "PERMISSIVE"}))
.select(
"stream",
"schema_name",
F.col("payload.ApproverName").alias("approver_name"),
F.to_timestamp("payload.LoanManualDecisionTimestamp")
.alias("loan_manual_decision_timestamp"),
"log_position",
))
(projected.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/Volumes/loan_lakehouse/ops/_checkpoints/silver_loan_manual_decisioned")
.trigger(availableNow=True)
.toTable("loan_lakehouse.silver.loan_manual_decisioned"))
silver_automated_summary
"""
Silver: project AutomatedSummary events from Bronze.
"""
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType
SCHEMA = StructType([
StructField("CreditScoreSummary", StringType()),
StructField("IncomeAndEmploymentSummary", StringType()),
StructField("LoanToIncomeSummary", StringType()),
StructField("MaritalStatusAndDependentsSummary", StringType()),
StructField("RecommendedFurtherInvestigation", StringType()),
StructField("SummarizedBy", StringType()),
StructField("SummarizedAt", StringType()),
])
stream = (spark.readStream
.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", "0")
.table("loan_lakehouse.bronze.events")
.filter(F.col("schema_name") == "AutomatedSummary"))
projected = (stream
.withColumn("payload", F.from_json("data", SCHEMA, {"mode": "PERMISSIVE"}))
.select(
"stream",
F.col("payload.CreditScoreSummary").alias("credit_score_summary"),
F.col("payload.IncomeAndEmploymentSummary").alias("income_and_employment_summary"),
F.col("payload.LoanToIncomeSummary").alias("loan_to_income_summary"),
F.col("payload.MaritalStatusAndDependentsSummary").alias("marital_status_and_dependents_summary"),
F.col("payload.RecommendedFurtherInvestigation").alias("recommended_further_investigation"),
F.col("payload.SummarizedBy").alias("summarized_by"),
F.to_timestamp("payload.SummarizedAt").alias("summarized_at"),
"log_position",
))
(projected.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/Volumes/loan_lakehouse/ops/_checkpoints/silver_automated_summary")
.trigger(availableNow=True)
.toTable("loan_lakehouse.silver.automated_summary"))
Gold notebook: the loan lifecycle table
gold_loan_lifecycle joins the Silver tables and produces the same shape as the read model. Because Silver is incrementally maintained, this can either be a full rebuild each run (simplest, shown below) or an incremental MERGE (more efficient at scale).
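Before the SQL, it may help to see the two derived Gold columns, laststatus and age_binned, as ordinary functions. The following is a minimal sketch in plain Python: `age_bin` and `last_status` are hypothetical names, the bin boundaries and the precedence order are copied from the query that follows, and the sketch assumes age is a known integer.

```python
from typing import Optional

# Exclusive upper bounds and labels, copied from the CASE expression.
AGE_BINS = [
    (15, "0 - 14 Years"), (25, "15 - 24 Years"), (35, "25 - 34 Years"),
    (45, "35 - 44 Years"), (55, "45 - 54 Years"), (65, "55 - 64 Years"),
    (75, "65 - 74 Years"), (85, "75 - 84 Years"), (95, "85 - 94 Years"),
]


def age_bin(age: int) -> str:
    """Return the first bin whose upper bound exceeds `age`."""
    for upper, label in AGE_BINS:
        if age < upper:
            return label
    return "95+ Years"


def last_status(manual: Optional[str], has_summary: bool, auto: Optional[str],
                has_credit_check: bool, has_request: bool) -> str:
    """First non-null candidate wins, in the same order as the COALESCE."""
    candidates = [
        manual,                                      # manual.schema_name
        "AutomatedSummary" if has_summary else None,
        auto,                                        # auto.schema_name
        "CreditChecked" if has_credit_check else None,
        "LoanRequested" if has_request else None,
    ]
    return next((c for c in candidates if c is not None), "UNKNOWN")


# A loan that was requested, credit-checked, and flagged for manual review.
print(last_status(None, False, "LoanApprovalNeeded", True, True))
print(age_bin(37))
```

The ordering encodes lifecycle progress from latest to earliest stage, so the status reported for a loan is always the furthest stage for which a Silver row exists.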
"""
Gold: build the loan-lifecycle table from Silver projections.
Full rebuild each run — Silver tables are small relative to Bronze.
"""
spark.sql("""
INSERT OVERWRITE loan_lakehouse.gold.loan_lifecycle
SELECT *,
CASE
WHEN age IS NULL THEN NULL
WHEN age < 15 THEN '0 - 14 Years'
WHEN age < 25 THEN '15 - 24 Years'
WHEN age < 35 THEN '25 - 34 Years'
WHEN age < 45 THEN '35 - 44 Years'
WHEN age < 55 THEN '45 - 54 Years'
WHEN age < 65 THEN '55 - 64 Years'
WHEN age < 75 THEN '65 - 74 Years'
WHEN age < 85 THEN '75 - 84 Years'
WHEN age < 95 THEN '85 - 94 Years'
ELSE '95+ Years'
END AS age_binned
FROM (
SELECT
lr.loan_request_id, lr.amount, lr.national_id, lr.name, lr.gender,
lr.age, lr.marital_status, lr.dependents, lr.level_of_education,
lr.employer_name, lr.job_title, lr.job_seniority, lr.income,
lr.address_street, lr.address_city, lr.address_region,
lr.address_country, lr.address_postal_code,
lr.loan_requested_timestamp, lr.loan_product_id,
cc.score, cc.credit_checked_timestamp,
auto.loan_automated_decision_timestamp,
manual.approver_name, manual.loan_manual_decision_timestamp,
summary.credit_score_summary, summary.income_and_employment_summary,
summary.loan_to_income_summary, summary.marital_status_and_dependents_summary,
summary.recommended_further_investigation,
summary.summarized_by, summary.summarized_at,
COALESCE(manual.schema_name,
CASE WHEN summary.stream IS NOT NULL THEN 'AutomatedSummary' END,
auto.schema_name,
CASE WHEN cc.stream IS NOT NULL THEN 'CreditChecked' END,
CASE WHEN lr.stream IS NOT NULL THEN 'LoanRequested' END,
'UNKNOWN') AS laststatus
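FROM loan_lakehouse.silver.loan_requested lr
FULL OUTER JOIN loan_lakehouse.silver.credit_checked cc
ON lr.stream = cc.stream
-- Join each later table on the first non-null stream key seen so far, so a
-- missing intermediate stage does not split one loan across several rows.
FULL OUTER JOIN loan_lakehouse.silver.loan_auto_decisioned auto
ON COALESCE(lr.stream, cc.stream) = auto.stream
FULL OUTER JOIN loan_lakehouse.silver.loan_manual_decisioned manual
ON COALESCE(lr.stream, cc.stream, auto.stream) = manual.stream
FULL OUTER JOIN loan_lakehouse.silver.automated_summary summary
ON COALESCE(lr.stream, cc.stream, auto.stream, manual.stream) = summary.stream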
) AS loan_lifecycle
""")
count = spark.table("loan_lakehouse.gold.loan_lifecycle").count()
print(f"Gold loan_lifecycle table refreshed: {count} loans")
Workflow: orchestrating the pipeline
In Workflows → Create Job, define a single job named loan_analytics_pipeline with the task graph:
bronze_ingest
  ├── silver_loan_requested
  ├── silver_credit_checked
  ├── silver_loan_auto_decisioned
  ├── silver_loan_manual_decisioned
  └── silver_automated_summary
        └── gold_loan_lifecycle
All five Silver tasks depend only on bronze_ingest and run in parallel; gold_loan_lifecycle depends on all five Silver tasks, not just the last one drawn above it. Schedule the job hourly with the Quartz cron expression 0 0 * * * ? (second 0, minute 0, every hour).
Configure each task as a notebook task, point it at the matching notebook path, and run it on the kurrent-ingest cluster.
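The dependency structure of the job can be checked with a short sketch that uses Python's standard-library graphlib module (Python 3.9 or later). This is not a Databricks API and does not create a job; it only models the task graph described above and prints which tasks become runnable together. The `dependencies` mapping and the `waves` list are illustrative names.

```python
from graphlib import TopologicalSorter

SILVER_TASKS = [
    "silver_loan_requested",
    "silver_credit_checked",
    "silver_loan_auto_decisioned",
    "silver_loan_manual_decisioned",
    "silver_automated_summary",
]

# Map each task to the set of tasks it depends on.
dependencies = {task: {"bronze_ingest"} for task in SILVER_TASKS}
dependencies["gold_loan_lifecycle"] = set(SILVER_TASKS)

sorter = TopologicalSorter(dependencies)
sorter.prepare()

waves = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())  # tasks whose dependencies are all done
    waves.append(ready)
    sorter.done(*ready)

for number, wave in enumerate(waves, start=1):
    print(f"wave {number}: {wave}")
```

The graph resolves into three waves: Bronze alone, then the five Silver tasks together, then Gold. The hourly run time is therefore roughly Bronze plus the slowest Silver task plus Gold, rather than the sum of all seven tasks.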
Why this minimises duplication
- Bronze carries the raw data JSON once, partitioned by event date. This is the durable Databricks-side audit copy.
- Silver carries only the typed fields actually used in Gold — no JSON column duplicated, no fields beyond what downstream consumes. Typically 80–90% smaller than Bronze.
- Gold carries one row per loan, not one row per event. The lifecycle table is small relative to the underlying event volume.
Together, the lakehouse footprint on the Databricks side is the raw events (once), the slim typed Silver projections, and the lifecycle table (once). KurrentDB remains the only place the events live in their original event-sourced form.
Recovery and replay
Because the entire pipeline is a deterministic function of kdb.records, recovery is straightforward. To rebuild from scratch:
-- Bronze and downstream rebuild
TRUNCATE TABLE loan_lakehouse.bronze.events;
TRUNCATE TABLE loan_lakehouse.silver.loan_requested;
TRUNCATE TABLE loan_lakehouse.silver.credit_checked;
TRUNCATE TABLE loan_lakehouse.silver.loan_auto_decisioned;
TRUNCATE TABLE loan_lakehouse.silver.loan_manual_decisioned;
TRUNCATE TABLE loan_lakehouse.silver.automated_summary;
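-- Gold is refilled by INSERT OVERWRITE, which needs the table to exist, so empty it rather than dropping it
TRUNCATE TABLE loan_lakehouse.gold.loan_lifecycle;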
UPDATE loan_lakehouse.ops.ingest_hwm
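SET last_position = 0 WHERE pipeline_name = 'kurrent_to_bronze';
Then run the workflow. The MERGE on log_position makes Bronze idempotent. Silver replays once each stream's checkpoint directory has been deleted: startingVersion is only read when a stream starts without a checkpoint, and after a TRUNCATE it should point past the truncating commit so that the pre-truncate change feed is not projected a second time. Gold is fully recomputed on every run anyway. This is the event-sourcing replay guarantee made concrete on the Databricks side.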
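The claim that the pipeline is a deterministic function of the log can be illustrated with a small sketch: replaying from position 0 yields the same per-stream state however the log is batched or in whatever order the rows are handed over. The event log below is made up (only the schema names come from the pipeline), and `replay` is a hypothetical helper, not notebook code.

```python
from typing import Dict, List

# Made-up event log for illustration; stream names are hypothetical.
EVENT_LOG: List[dict] = [
    {"log_position": 1, "stream": "loanRequest-1", "schema_name": "LoanRequested"},
    {"log_position": 2, "stream": "loanRequest-2", "schema_name": "LoanRequested"},
    {"log_position": 3, "stream": "loanRequest-1", "schema_name": "CreditChecked"},
    {"log_position": 4, "stream": "loanRequest-1", "schema_name": "LoanAutomaticallyApproved"},
    {"log_position": 5, "stream": "loanRequest-2", "schema_name": "CreditChecked"},
]


def replay(log: List[dict], batch_size: int) -> Dict[str, List[str]]:
    """Rebuild per-stream history from position 0, one batch at a time."""
    state: Dict[str, List[str]] = {}
    hwm = 0
    while True:
        # Same contract as the Bronze query: newer than hwm, ordered, limited.
        batch = sorted(
            (e for e in log if e["log_position"] > hwm),
            key=lambda e: e["log_position"],
        )[:batch_size]
        if not batch:
            return state
        for event in batch:
            state.setdefault(event["stream"], []).append(event["schema_name"])
        hwm = batch[-1]["log_position"]


full = replay(EVENT_LOG, batch_size=1000)
paged = replay(EVENT_LOG, batch_size=2)
shuffled = replay(list(reversed(EVENT_LOG)), batch_size=2)
print(full == paged == shuffled)
```

Because ordering comes from log_position rather than from arrival time, a rebuild after truncation converges on the same tables as the original incremental runs.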
Conclusion
This guide has demonstrated a complete medallion pipeline for incrementally ingesting event-sourced data from KurrentDB into Databricks:
- Bronze lands raw events via Arrow Flight SQL, keyed by log_position for idempotent MERGE.
- Silver projects each event type into typed columns, discarding the raw JSON to minimise duplication.
- Gold joins the Silver tables into a single loan-lifecycle view ready for BI and ML consumption.
- Recovery is deterministic — reset the high-water mark and replay from KurrentDB at any time.
Because KurrentDB is the system of record and the lakehouse is derived state, this architecture gives you the analytical power of Databricks without sacrificing the causal integrity, temporal ordering, and replay guarantees of an event-sourced system.
