Building a Property Insurance Claims Data Lakehouse with Airflow and Databricks
In this branch of our series on building property insurance claims data platforms, we're going to use the Databricks platform as our storage system. We need to process the incoming TPA bordereaux data each day, and we'll use Airflow pipelines to orchestrate the Python code that gets the data into the data lakehouse.
Since we're focused on managing budgets and accurately forecasting costs in property insurance, we want our claims data updated at least daily so that our chief claims officer can make the best-informed decisions possible. To do that, we're going to have to integrate bordereaux data with different schemas in our automated pipeline. Let's start by setting up a data lakehouse to store the incoming data once we have it converted and merged each day.
Building a Claims Delta Lake Table in Databricks
A Databricks Table is a structured dataset stored as files in cloud object storage and tracked through a catalog and metastore. Databricks offers two types: managed tables, which it controls fully within its storage environment, and unmanaged (external) tables, where storage and governance remain with the user.
Building your data platform around managed data tables has the following key advantages over unmanaged blob storage.
- Reliable Data with ACID Transactions
  - Prevents data corruption from concurrent writes.
  - Supports safe, multi-user ETL operations.
- Time Travel for Easy Recovery
  - Access past versions of data anytime.
  - Helps with audits, debugging, and rollbacks (see the short example after this list).
- Faster Queries and Lower Costs
  - Built-in optimizations like file compaction and indexing.
  - Speeds up analytics and reduces compute/storage usage.
This flexibility allows insurance data teams to balance control with performance, optimizing for both compliance and scale.
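To make the time-travel point concrete, here is a minimal sketch of how you might inspect and query an earlier version of the claims table we create later in this post. It assumes you are running in a Databricks notebook where a `spark` session is already available; the version number is just an example.

# Minimal time-travel sketch, assuming a Databricks notebook where `spark`
# is already defined and the claims table below already exists.

# List the table's version history (one row per commit).
spark.sql("DESCRIBE HISTORY claims_workspace.bronze.bordereaux_data").show()

# Query the table as it looked at an earlier version number.
previous = spark.sql(
    "SELECT * FROM claims_workspace.bronze.bordereaux_data VERSION AS OF 1"
)
previous.show()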
To begin building a modern data lakehouse on Databricks, you’ll need to create a Databricks account and provision a workspace. Current Databricks workspaces are designed to leverage Unity Catalog for centralized governance and support Serverless SQL for scalable, on-demand compute.
Within the workspace, set up a catalog and a namespace (schema) to logically organize your claims data. For structuring tables and data pipelines, the Medallion Architecture offers a useful framework—dividing data into Bronze (raw), Silver (cleaned), and Gold (aggregated) layers—to support scalable, modular claims processing and analytics.
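Before creating any tables, the catalog and schemas need to exist. Below is a minimal sketch of one way to create them, assuming you run it in a Databricks notebook attached to a Unity Catalog enabled workspace (the same statements can also be run directly in the SQL editor); the catalog and schema names match the ones used in the table definitions later in this post.

# Create the catalog and Medallion-style schemas used in this post.
# Assumes a Databricks notebook where `spark` is already available and the
# workspace is attached to a Unity Catalog metastore.
spark.sql("CREATE CATALOG IF NOT EXISTS claims_workspace")

for layer in ("bronze", "silver", "gold"):
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS claims_workspace.{layer}")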
Building a Data Table for Bordereaux Claims Data
The SQL CREATE TABLE statement to set up our managed data lakehouse table is shown below.
CREATE TABLE claims_workspace.bronze.bordereaux_data (
policy_number STRING,
months_insured INT,
has_claims BOOLEAN,
items_insured INT,
claim_reference STRING,
insured_name STRING,
policy_start_date TIMESTAMP,
date_of_loss TIMESTAMP,
date_reported TIMESTAMP,
claim_status STRING,
loss_type STRING,
paid_amount DOUBLE,
reserve_amount DOUBLE,
total_incurred DOUBLE,
claim_propensity DOUBLE
);
In the code listing below, I show a JSON-format sample of a row of bordereaux data so you can get a feel for the type of data that we'll be working with.
{
"policy_number": "POL1038",
"months_insured": 113,
"has_claims": true,
"items_insured": 4,
"claim_reference": "CLM1071",
"insured_name": "Company 1038",
"policy_start_date": "2023-10-02",
"date_of_loss": "2024-01-16",
"date_reported": "2024-02-15",
"claim_status": "Open",
"loss_type": "Property Damage",
"paid_amount": 11463.65,
"reserve_amount": 11962.45,
"total_incurred": 23426.10,
"claim_propensity": 0.5
}
The challenge, however, is that each partner TPA has its own column schema for the bordereaux data it sends over, so we'll need to do some translation work in our ingest pipeline.
Airflow Pipeline to Process Incoming Bordereaux Files
We need a way to run a data pipeline each day and pick up all of the files sent in via email from our partner TPAs. In the previous article in this series, we showed how to use Airflow to watch an email address, pick up bordereaux files as they come in, and save them to S3 for later processing.
Apache Airflow is an open-source workflow orchestration tool designed to programmatically author, schedule, and monitor data pipelines. For data platforms, Airflow provides a scalable and reliable way to manage complex dependencies across ETL jobs, analytics workflows, and machine learning pipelines. Its modular architecture and support for DAG-based task scheduling make it a critical component in ensuring data quality, operational consistency, and timely data delivery across the enterprise.
So we have one subdirectory per company in S3 holding the incoming daily bordereaux files, which need to be converted before they can be written to our data lakehouse table. The first stage of this Airflow work performs that conversion and then writes the merged data back to S3 for further processing.
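Concretely, the layout might look something like the listing below. The bucket and the merged-data prefix match the paths used later in the pipeline code; the TPA folder names, file names, and the schema configuration file name are placeholders.

s3://property-insurance-examples/test-bordereaux-data/
    schema_config.json                    (per-TPA schema mappings; placeholder name)
    tpa_alpha/20250422_bordereaux.csv     (placeholder TPA folder and file)
    tpa_beta/20250422_claims_export.csv
    daily_bordereaux_merged_data/         (merged output written by the DAG)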
DAG Step 1: Merging Multiple TPA Schemas Together
The first stage of our Airflow job uses a per-company schema configuration file to analyze each incoming file and then convert it to a normalized schema that is compatible with our data table.
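The exact contents of that configuration file aren't shown here, but based on how it is used in the code below, a hypothetical entry for a single TPA might look like the following (the folder name, source column names, and date format are placeholders). The top-level key matches the TPA's S3 subdirectory, "schema" maps the TPA's column names to our internal ones, and "date_format" tells pandas how to parse that TPA's date columns.

{
    "tpa_alpha": {
        "schema": {
            "PolicyNo": "policy_number",
            "MonthsOnRisk": "months_insured",
            "ClaimRef": "claim_reference",
            "InsuredName": "insured_name",
            "InceptionDate": "policy_start_date",
            "LossDate": "date_of_loss",
            "ReportedDate": "date_reported",
            "Status": "claim_status",
            "Paid": "paid_amount",
            "Reserve": "reserve_amount"
        },
        "date_format": "%d/%m/%Y"
    }
}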
...
def merge_incoming_s3_tpa_schemas():
    schema_conf_file_string = load_s3_file_as_text(bucket, subdirectory + schema_conf_file)
    data_company_schemas = json.loads(schema_conf_file_string)
    subdirs, subdirs_full = list_subdirectories(bucket, subdirectory)
    tpa_converted_df_list = []
    print("Processing Subdirectories:")
    for subdir, full_subdir in zip(subdirs, subdirs_full):
        print(full_subdir)
        file_list_subdir = list_files(bucket, full_subdir)
        for filename in file_list_subdir:
            print("\t" + filename)
            df_tmp = load_s3_file_to_dataframe(bucket, full_subdir + filename)
            print("Convert schema to standard internal schema: " + filename)
            df_new_claims_cols = df_tmp.rename(columns=data_company_schemas[subdir]["schema"])
            df_new_claims_cols['policy_start_date'] = pd.to_datetime(df_new_claims_cols['policy_start_date'], format=data_company_schemas[subdir]["date_format"])
            df_new_claims_cols['date_of_loss'] = pd.to_datetime(df_new_claims_cols['date_of_loss'], format=data_company_schemas[subdir]["date_format"])
            df_new_claims_cols['date_reported'] = pd.to_datetime(df_new_claims_cols['date_reported'], format=data_company_schemas[subdir]["date_format"])
            tpa_converted_df_list.append(df_new_claims_cols)
    df_all_bordereaux = pd.concat(tpa_converted_df_list)
    s3_key_write_merged_data = 'test-bordereaux-data/daily_bordereaux_merged_data/20250422_daily_merged.csv'
    write_df_to_s3_csv(df_all_bordereaux, bucket, s3_key_write_merged_data)

with DAG(
    'dbx_merge_incoming_tpa_schemas',
    default_args={
        "depends_on_past": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
    },
    description="Takes the incoming bordereaux data in different schemas in S3 and converts them to a single schema. Also merges them into a single csv.",
    schedule=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["s3", "bordereaux", "bronze"],
) as dag:
    task_read_file = PythonOperator(
        task_id='merge_tpa_schemas_python_task',
        python_callable=merge_incoming_s3_tpa_schemas
    )
The above code uses pandas to process each incoming company's specific schema (defined in the JSON configuration file): it renames the columns to our internal schema, parses the date columns, and concatenates all the converted dataframes before writing the merged result back to S3. In the next step, we take this result and load it into our lakehouse table.
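The S3 helper functions called above (load_s3_file_as_text, list_subdirectories, list_files, load_s3_file_to_dataframe, and write_df_to_s3_csv) are elided from the listing. Below is a minimal sketch of how a few of them might be implemented with boto3 and pandas; your own versions may differ.

import io

import boto3
import pandas as pd

s3 = boto3.client("s3")

def load_s3_file_as_text(bucket, key):
    # Read an S3 object and return its contents as a string.
    obj = s3.get_object(Bucket=bucket, Key=key)
    return obj["Body"].read().decode("utf-8")

def load_s3_file_to_dataframe(bucket, key):
    # Load a CSV object from S3 into a pandas DataFrame.
    return pd.read_csv(io.StringIO(load_s3_file_as_text(bucket, key)))

def write_df_to_s3_csv(df, bucket, key):
    # Serialize a DataFrame to CSV and upload it to S3.
    buffer = io.StringIO()
    df.to_csv(buffer, index=False)
    s3.put_object(Bucket=bucket, Key=key, Body=buffer.getvalue())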
DAG Step 2: Loading New Data Into Delta Lake Table
Once our pipeline has merged the converted bordereaux data, it needs to load that data into our data lakehouse table. In the Airflow code below, we can see the DAG task that uses the DatabricksSqlOperator to execute our saved load_s3_bordereaux_into_dbx_airflow.sql statement, which loads the data from S3 into the data table.
...
with DAG(
    'dbx_load_data_into_dbx_lakehouse',
    default_args={
        "depends_on_past": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
    },
    description="Loads data into Databricks Delta Lake Table from S3",
    schedule=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["s3", "bordereaux", "silver"],
) as dag:
    test_dbx_data_task = DatabricksSqlOperator(
        task_id="dbx_sql_data_insert_task",
        databricks_conn_id="databricks_connection",
        http_path=sql_endpoint_name,
        sql="load_s3_bordereaux_into_dbx_airflow.sql"
    )
The above Airflow operator executes the Databricks SQL below inside the Databricks platform, on a serverless SQL warehouse.
COPY INTO claims_workspace.bronze.bordereaux_data
FROM (
    SELECT
        policy_number,
        CAST(months_insured AS BIGINT) AS months_insured,
        has_claims,
        CAST(items_insured AS BIGINT) AS items_insured,
        claim_reference,
        insured_name,
        CAST(policy_start_date AS DATE) AS policy_start_date,
        CAST(date_of_loss AS DATE) AS date_of_loss,
        CAST(date_reported AS DATE) AS date_reported,
        claim_status,
        loss_type,
        paid_amount,
        reserve_amount,
        total_incurred,
        claim_propensity
    FROM 's3://property-insurance-examples/test-bordereaux-data/daily_bordereaux_merged_data/'
)
FILEFORMAT = CSV
FORMAT_OPTIONS ('sep' = ',', 'header' = 'true', 'inferSchema' = 'true')
COPY_OPTIONS ('force' = 'true', 'mergeSchema' = 'true')
The SQL statement pulls the converted and merged bordereaux data from the S3 location where our Airflow DAG wrote it and writes it into the Delta Lake table we created earlier.
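A few pieces of configuration are assumed by the DatabricksSqlOperator above: the databricks_connection Airflow connection, the http_path pointing at a SQL warehouse, and the location of the .sql file. The sketch below shows one plausible way to wire these up; the warehouse ID, workspace URL, and token are placeholders, so adjust them for your environment.

# Hypothetical configuration assumed by the DatabricksSqlOperator above.

# http_path identifying the SQL warehouse that runs the COPY INTO statement;
# the warehouse ID here is a placeholder.
sql_endpoint_name = "/sql/1.0/warehouses/<your-warehouse-id>"

# The "databricks_connection" Airflow connection can be created from the CLI;
# the host is your workspace URL and the password is a personal access token:
#
#   airflow connections add databricks_connection \
#       --conn-type databricks \
#       --conn-host https://<your-workspace>.cloud.databricks.com \
#       --conn-password <personal-access-token>
#
# Because the sql argument ends in ".sql", Airflow treats it as a template file
# and resolves it relative to the DAG file (or any template_searchpath you set),
# so load_s3_bordereaux_into_dbx_airflow.sql should live alongside the DAG.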
But the Pipeline Doesn't Do X...
The challenge with platform architecture examples is that it's impossible to cover every variation you may want to try. In this example we're skipping a few things, like "pipeline flexibility" and merging row updates as part of the data table update process. If we're missing a key feature you want to talk about, reach out and we'll see how we can help.
From Data Tables to Data Models
Our data lakehouse tables are a great starting point, but we want to standardize the knowledge work formulas over our data lakehouse in a way that is easily accessible to the claims team. To do that, we'll use the Cube.dev semantic layer to integrate with our Databricks claims data lakehouse and build standardized claims data models. This allows us to further refine the data in a data table into standardized claims information (the "information layer" in our knowledge work architecture) that is useful to the claims team.
Data models in a semantic layer add critical business context and structure on top of raw data tables in a lakehouse, enabling more consistent, governed, and user-friendly access to information. Unlike raw tables, semantic models define metrics, dimensions, and relationships in business terms, allowing teams to self-serve insights without needing to understand complex SQL or data schemas. This abstraction enhances data trust, accelerates decision-making, and ensures alignment across analytics, reporting, and AI applications.
In the next post we connect our Databricks data platform to the Cube.dev semantic layer and construct our claims data models for use in the claims team's knowledge work tools.
Next in Series
Building a Standard Claims Data Model With the Cube Semantic Layer and Databricks
Creating standard claims data models for claims knowledge work in property insurance companies.