
For more than two years now, data build tool (dbt) has been actively used at Wheely to manage its data warehouse. Over this time we have accumulated considerable experience: we are on a thorny path of trial and error toward perfection in Analytics Engineering.
This post is a tour of the Wheely data warehouse, illustrated with real examples and practices from our use of dbt.
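Data modeling comes first. A model should be understandable not only to engineers but also to business stakeholders, so we follow two principles: Keep it simple (KISS) and Don't repeat yourself (DRY).
The DWH is organized into layers, each with its own purpose: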

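Raw data (sources): data loaded by the ELT service, a 1:1 copy of the source systems. The flatten layer unpacks nested structures (JSON) into regular columns.
staging: basic preparation such as renaming columns, casting types and simple case expressions.
Intermediate: joins and aggregations that prepare data for the marts. A single intermediate model may combine 5-10 upstream models.
data marts: the final layer, consumed by Data Scientists / Business Users / BI tools. Marts are split into:
dimensions: descriptive business entities
facts: business events and transactions
looker: models prepared specifically for the BI tool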
The current size of the project, as reported by dbt:
Running with dbt=0.19.0
Found 273 models, 493 tests, 6 snapshots, 4 analyses, 532 macros, 7 operations, 8 seed files, 81 sources, 0 exposures
In other words:
273 models
493 tests, including not null, unique, foreign key and accepted values
6 snapshots for SCD (slowly changing dimensions)
532 macros
7 operations, including vacuum + analyze
81 sources
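Models are tagged by business domain: Marketing / Supply / Growth / B2B. Tags make it possible to run only part of the project, for example to reprocess late arriving data for a single domain.
To build all Marketing models together with everything upstream of them: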
dbt run -m +tag:marketing
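To make the `+` graph operator concrete, here is a minimal Python sketch of what `-m +tag:marketing` selects: every model carrying the tag plus all of its upstream ancestors, built in dependency order. The model names and the tiny DAG are hypothetical, and the code illustrates the selection semantics rather than dbt's own implementation.

```python
from collections import deque

# Hypothetical mini-DAG: model -> models it reads from via ref()
PARENTS = {
    "stg_requests": [],
    "stg_campaigns": [],
    "int_requests_joined": ["stg_requests"],
    "f_marketing_attribution": ["int_requests_joined", "stg_campaigns"],
    "f_finance_daily": ["int_requests_joined"],
}
# Hypothetical tags, as they would be set with +tags or config(tags=...)
TAGS = {"f_marketing_attribution": {"marketing"}}


def select_with_ancestors(tag):
    """Emulate `+tag:<tag>`: tagged models plus everything upstream of them."""
    selected = {model for model, tags in TAGS.items() if tag in tags}
    queue = deque(selected)
    while queue:
        model = queue.popleft()
        for parent in PARENTS.get(model, []):
            if parent not in selected:
                selected.add(parent)
                queue.append(parent)
    return selected


def build_order(models):
    """Depth-first topological sort: parents always come before children."""
    ordered, done = [], set()

    def visit(model):
        if model in done:
            return
        for parent in PARENTS.get(model, []):
            if parent in models:
                visit(parent)
        done.add(model)
        ordered.append(model)

    for model in sorted(models):
        visit(model)
    return ordered


selection = select_with_ancestors("marketing")
print(build_order(selection))
# ['stg_requests', 'int_requests_joined', 'stg_campaigns', 'f_marketing_attribution']
```

`f_finance_daily` shares an upstream model but is not selected, because the `+` operator only walks upward from the tagged models.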
The dbt project is organized into folders by layer and by source system:
.
|____staging
| |____webhook
| |____receipt_prod
| |____core
| |____wheely_prod
| |____flights_prod
| |____online_hours_prod
| |____external
| |____financial_service
|____marts
| |____looker
| |____dim
| |____snapshots
| |____facts
|____flatten
| |____webhook
| |____receipt_prod
| |____wheely_prod
| |____communication_prod
|____audit
|____sources
|____aux
| |____dq
| | |____marts
| | |____external
|____intermediate
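Performance deserves separate attention, because the physical layout of tables directly affects query speed. At Wheely the data warehouse runs on Amazon Redshift.
Take the key fact table, journeys (trips), as an example.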

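For join performance, both the distribution key and the sort key are set to the join column, request_id, so that the table lines up with the upstream models it is joined with. This allows Redshift to use a sort merge join:
Config for a sort merge join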
{{
config(
materialized='table',
unique_key='request_id',
dist="request_id",
sort="request_id"
)
}}
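When two tables share the same distribution key and sort key on the join column, matching rows already sit on the same slice in sorted order, so Redshift can merge them instead of building a hash table. The sketch below shows the merge-join algorithm itself in Python on lists already sorted by `request_id`. It illustrates the technique, not Redshift's internal code, and the rows are made up.

```python
def sort_merge_join(left, right, key):
    """Inner-join two lists of dicts that are both sorted by `key`."""
    result = []
    i = j = 0
    while i < len(left) and j < len(right):
        left_key, right_key = left[i][key], right[j][key]
        if left_key < right_key:
            i += 1  # advance the side with the smaller key
        elif left_key > right_key:
            j += 1
        else:
            # Find the run of equal keys on the right ...
            j_end = j
            while j_end < len(right) and right[j_end][key] == left_key:
                j_end += 1
            # ... and pair it with every left row that has the same key.
            while i < len(left) and left[i][key] == left_key:
                for match in right[j:j_end]:
                    result.append({**left[i], **match})
                i += 1
            j = j_end
    return result


# Made-up rows, both already sorted by request_id
journeys = [
    {"request_id": 1, "city": "london"},
    {"request_id": 2, "city": "moscow"},
    {"request_id": 4, "city": "dubai"},
]
payments = [
    {"request_id": 2, "amount": 10},
    {"request_id": 3, "amount": 5},
    {"request_id": 4, "amount": 7},
    {"request_id": 4, "amount": 8},
]
for row in sort_merge_join(journeys, payments, "request_id"):
    print(row)
```

Each input is read once, front to back. Keeping the data stored in join-key order is what makes this strategy cheap.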
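Users most often filter journeys by city, country, completed timestamp and service group. An interleaved sort key gives equal weight to each of these columns and reduces I/O for BI queries that filter on any combination of them:
Config with an interleaved sortkey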
{{
config(
materialized='table',
unique_key='request_id',
dist="request_id",
sort_type='interleaved',
sort=["completed_ts_loc"
, "city"
, "country"
, "service_group"
, "is_airport"
, "is_wheely_journey"]
)
}}
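An interleaved sort key is commonly explained with a Z-order (Morton) curve. The bits of each sort column are interleaved into a single key, so no column dominates the ordering the way the leading column of a compound key does. Redshift does not expose its exact encoding, so treat the following Python function as an illustration of the idea. Mapping columns such as city or service group to small integer codes is an assumption of the sketch.

```python
def z_order_key(values, bits=8):
    """Interleave the bits of several non-negative integers into one sort key.

    Bit b of the c-th value lands at position b * len(values) + c, so every
    column contributes equally to the ordering.
    """
    n = len(values)
    for v in values:
        if not 0 <= v < (1 << bits):
            raise ValueError(f"{v} does not fit into {bits} bits")
    key = 0
    for b in range(bits):
        for c, v in enumerate(values):
            key |= ((v >> b) & 1) << (b * n + c)
    return key


# Hypothetical (city_code, service_group_code) pairs on a 4x4 grid
points = [(x, y) for x in range(4) for y in range(4)]
print(sorted(points, key=lambda p: z_order_key(p, bits=2)))
```

Neighbouring keys stay close in both columns at once. This is why a filter on either column can still skip many blocks, whereas a compound key mainly helps filters on its leading column.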
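Models that don't need to be stored physically are materialized as views, which are computed at query time. For example, all staging models are views. This is configured once for the whole folder in dbt_project.yml: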
staging:
  +materialized: view
  +schema: staging
  +tags: ["staging"]
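Another option is ephemeral: such a model is not created in the database at all. Instead, dbt inlines its SQL as a CTE into the models that reference it.
Large, constantly growing tables are built incrementally. Instead of rebuilding the whole table on every run, dbt processes only the new rows (delta) and merges them into the existing table. The delta is defined by a condition in the where clause: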
{{
config(
materialized='incremental',
sort='metadata_timestamp',
dist='fine_id',
unique_key='id'
)
}}
with fines as (
select
fine_id
, city_id
, amount
, details
, metadata_timestamp
, created_ts_utc
, updated_ts_utc
, created_dt_utc
from {{ ref('stg_fines') }}
where true
-- on incremental runs, keep only fines that arrived after the last processed timestamp
{% if is_incremental() -%}
and metadata_timestamp > (select max(metadata_timestamp) from {{ this }})
{%- endif %}
),
...
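The incremental logic can be reproduced end to end with Python's built-in `sqlite3`. The first run creates the table. Later runs select only rows whose `metadata_timestamp` is newer than the maximum already loaded, then replace rows with the same unique key. This delete+insert merge is similar in spirit to how dbt handles `unique_key` on Redshift. The table layout is trimmed to a few hypothetical columns and the data is invented.

```python
import sqlite3


def run_incremental(conn):
    """Build `fines` from `stg_fines`: full build first, then delta + delete/insert."""
    exists = conn.execute(
        "select count(*) from sqlite_master where type = 'table' and name = 'fines'"
    ).fetchone()[0]
    if not exists:
        # First run (is_incremental() is false): build the whole table.
        conn.execute(
            "create table fines as "
            "select id, fine_id, amount, metadata_timestamp from stg_fines"
        )
        return
    # Incremental run: only rows that arrived after the latest loaded one.
    conn.execute(
        "create temp table fines_delta as "
        "select id, fine_id, amount, metadata_timestamp from stg_fines "
        "where metadata_timestamp > (select max(metadata_timestamp) from fines)"
    )
    # Merge on the unique key: drop old versions, then insert the new ones.
    conn.execute("delete from fines where id in (select id from fines_delta)")
    conn.execute(
        "insert into fines (id, fine_id, amount, metadata_timestamp) "
        "select id, fine_id, amount, metadata_timestamp from fines_delta"
    )
    conn.execute("drop table fines_delta")


conn = sqlite3.connect(":memory:")
conn.execute(
    "create table stg_fines (id integer, fine_id integer, amount real, metadata_timestamp integer)"
)
conn.executemany("insert into stg_fines values (?, ?, ?, ?)", [(1, 10, 5.0, 100), (2, 11, 7.0, 200)])
run_incremental(conn)  # initial full build

# Invented late data: a corrected version of fine 2 and a brand-new fine 3
conn.executemany("insert into stg_fines values (?, ?, ?, ?)", [(2, 11, 9.0, 300), (3, 12, 1.0, 300)])
run_incremental(conn)
print(conn.execute("select * from fines order by id").fetchall())
# [(1, 10, 5.0, 100), (2, 11, 9.0, 300), (3, 12, 1.0, 300)]
```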
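Choosing the right materialization, distribution and sort keys requires understanding how an MPP database works. This is a core skill for both a Data Engineer and a Data Warehouse Analyst.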
SQL + Jinja = Flexibility
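SQL remains the core language, and Jinja adds templating on top of it.
Each model goes through two stages, dbt compile & run. First, Jinja is rendered into plain SQL. Then the query is wrapped in DDL for the target warehouse: dbt generates the CREATE statement with the right physical options, such as clustered by / distributed by / sorted by. For example: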
Model code:
{{
config(
materialized='table',
dist="fine_id",
sort="created_ts_utc"
)
}}
with details as (
select
{{
dbt_utils.star(from=ref('fine_details_flatten'),
except=["fine_amount", "metadata_timestamp", "generated_number"]
)
}}
from {{ ref('fine_details_flatten') }}
where fine_amount > 0
)
select * from details
Compiled code:
with details as (
select
"id",
"fine_id",
"city_id",
"amount",
"description",
"created_ts_utc",
"updated_ts_utc",
"created_dt_utc"
from "wheely"."dbt_test_akozyr"."fine_details_flatten"
where fine_amount > 0
)
select * from details
Run code:
create table
"wheely"."dbt_test_akozyr"."f_chauffeurs_fines"
diststyle key distkey (fine_id)
compound sortkey(created_ts_utc)
as (
with details as (
select
"id",
"fine_id",
"city_id",
"amount",
"description",
"created_ts_utc",
"updated_ts_utc",
"created_dt_utc"
from "wheely"."dbt_test_akozyr"."fine_details_flatten"
where fine_amount > 0
)
select * from details
);
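Here is a rough Python equivalent of what `dbt_utils.star` contributes to the compiled query. Given the relation's column list, it emits every column except the excluded ones, quoted and comma-separated. The real macro first looks the columns up in the warehouse. Here the column list is hard-coded and hypothetical: it reproduces the compiled output above plus the three excluded columns. The parameter is named `exclude` because `except` is a Python keyword.

```python
def star(columns, exclude=()):
    """Quoted, comma-separated column list without the excluded columns.

    Exclusions are matched case-insensitively in this sketch.
    """
    excluded = {name.lower() for name in exclude}
    return ",\n".join(f'"{name}"' for name in columns if name.lower() not in excluded)


# Hypothetical column list of fine_details_flatten
columns = [
    "id", "fine_id", "city_id", "amount", "fine_amount", "description",
    "metadata_timestamp", "generated_number",
    "created_ts_utc", "updated_ts_utc", "created_dt_utc",
]
print(star(columns, exclude=["fine_amount", "metadata_timestamp", "generated_number"]))
```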
All the DDL and boilerplate code is generated by dbt, so the analyst writes only the select statement.
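Notice also that the source table is never hard-coded. The model refers to it as {{ ref('fine_details_flatten') }}.
ref() resolves to the right database and schema for the current environment. dbt also uses these references to build the dependency graph (DAG) and run models in the right order.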
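At Wheely, Jinja is also used to separate the dev / test / prod environments. Outside prod, models are built in the developer's personal schema (such as dbt_test_akozyr in the compiled code above). In prod, models land in their dedicated schemas. The schema name is chosen by a macro: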
{% macro generate_schema_name_for_env(custom_schema_name, node) -%}
{%- set default_schema = target.schema -%}
{%- if target.name == 'prod' and custom_schema_name is not none -%}
{{ custom_schema_name | trim }}
{%- else -%}
{{ default_schema }}
{%- endif -%}
{%- endmacro %}
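The macro's decision can be checked in isolation with a small Python mirror of it. The sketch includes a helper that renders a relation the way the compiled code above shows it. The target names (`dev`, `prod`), the prod default schema `analytics` and the custom schema `flatten` are assumptions for the example.

```python
def generate_schema_name_for_env(custom_schema_name, target_name, target_schema):
    """Python mirror of the macro: custom schemas are honoured only in prod."""
    if target_name == "prod" and custom_schema_name is not None:
        return custom_schema_name.strip()
    return target_schema


def render_relation(database, schema, identifier):
    """Quote each part the way the compiled SQL shows relations."""
    return ".".join(f'"{part}"' for part in (database, schema, identifier))


# Hypothetical targets: a developer's dev target and the prod target
for target_name, target_schema in [("dev", "dbt_test_akozyr"), ("prod", "analytics")]:
    schema = generate_schema_name_for_env("flatten", target_name, target_schema)
    print(target_name, render_relation("wheely", schema, "fine_details_flatten"))
```

The same model code therefore resolves to a personal schema during development and to a shared, named schema in production.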

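Macros
When the same logic is needed in many places, copy-paste quickly turns into a maintenance burden. At Wheely we stick to Do not repeat yourself and move such logic into macros.
For example, currency conversion: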
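-- currency conversion macro: multiplies the column by the rate columns aed / eur / gbp / rub / usd,
-- which must already be in scope (e.g. from a joined exchange-rates table); currency_code_column is not used in the body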
{% macro convert_currency(convert_column, currency_code_column) -%}
( {{ convert_column }} * aed )::decimal(18,4) as {{ convert_column }}_aed
, ( {{ convert_column }} * eur )::decimal(18,4) as {{ convert_column }}_eur
, ( {{ convert_column }} * gbp )::decimal(18,4) as {{ convert_column }}_gbp
, ( {{ convert_column }} * rub )::decimal(18,4) as {{ convert_column }}_rub
, ( {{ convert_column }} * usd )::decimal(18,4) as {{ convert_column }}_usd
{%- endmacro %}
Using it in a model:
select
...
-- price_details
, r.currency
, {{ convert_currency('price', 'currency') }}
, {{ convert_currency('transfer_min_price', 'currency') }}
, {{ convert_currency('discount', 'currency') }}
, {{ convert_currency('insurance', 'currency') }}
, {{ convert_currency('tips', 'currency') }}
, {{ convert_currency('parking', 'currency') }}
, {{ convert_currency('toll_road', 'currency') }}
, {{ convert_currency('pickup_charge', 'currency') }}
, {{ convert_currency('cancel_fee', 'currency') }}
, {{ convert_currency('net_bookings', 'currency') }}
, {{ convert_currency('gross_revenue', 'currency') }}
, {{ convert_currency('service_charge', 'currency') }}
...
from {{ ref('requests_joined') }} r
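To see what the macro expands to and how the result evaluates, the Python sketch below renders the same expression for each currency and runs it against an in-memory SQLite database. It assumes, as the macro implicitly does, that the rate columns come from an exchange-rates table joined on the row's currency. The table names and rates are invented, and `round(..., 4)` stands in for Redshift's `::decimal(18,4)` cast, which SQLite does not support.

```python
import sqlite3

CURRENCIES = ("aed", "eur", "gbp", "rub", "usd")


def convert_currency(column):
    """Render one converted column per target currency, like the Jinja macro.

    round(..., 4) replaces Redshift's ::decimal(18,4), which SQLite lacks.
    """
    return "\n, ".join(
        f"round({column} * {cur}, 4) as {column}_{cur}" for cur in CURRENCIES
    )


conn = sqlite3.connect(":memory:")
# Hypothetical tables: one request and the rates for converting from its currency
conn.execute("create table requests (id integer, currency text, price real)")
conn.execute("create table rates (currency text, aed real, eur real, gbp real, rub real, usd real)")
conn.execute("insert into requests values (1, 'GBP', 100.0)")
conn.execute("insert into rates values ('GBP', 5.0, 1.25, 1.0, 100.0, 1.5)")  # made-up rates

sql = f"""
select r.id
, {convert_currency('price')}
from requests r
join rates x on x.currency = r.currency
"""
print(sql)
print(conn.execute(sql).fetchall())
# [(1, 500.0, 125.0, 100.0, 10000.0, 150.0)]
```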
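With Jinja, macros can generate whole pieces of SQL logic, not just expressions. For example, a data quality macro that compares two columns: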
-- compare two columns
{% macro dq_compare_columns(src_column, trg_column, is_numeric=false) -%}
{%- if is_numeric == true -%}
{%- set src_column = 'round(' + src_column + ', 2)' -%}
{%- set trg_column = 'round(' + trg_column + ', 2)' -%}
{%- endif -%}
CASE
WHEN {{ src_column }} = {{ trg_column }} THEN 'match'
WHEN {{ src_column }} IS NULL AND {{ trg_column }} IS NULL THEN 'both null'
WHEN {{ src_column }} IS NULL THEN 'missing in source'
WHEN {{ trg_column }} IS NULL THEN 'missing in target'
WHEN {{ src_column }} <> {{ trg_column }} THEN 'mismatch'
ELSE 'unknown'
END
{%- endmacro %}
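The branch order in the CASE expression matters. In SQL, `NULL = NULL` is not true, so two missing values fall through to the `both null` branch. A Python mirror of the macro, with `None` standing in for NULL, makes the classification easy to test.

```python
def dq_compare(src, trg, is_numeric=False):
    """Python mirror of dq_compare_columns; None plays the role of SQL NULL."""
    if is_numeric:
        # Python's round() sends exact ties to the even digit, which may differ
        # from the warehouse's ROUND; irrelevant for the values used here.
        src = None if src is None else round(src, 2)
        trg = None if trg is None else round(trg, 2)
    # In SQL, NULL = NULL is not true, so "match" requires two non-null values.
    if src is not None and trg is not None and src == trg:
        return "match"
    if src is None and trg is None:
        return "both null"
    if src is None:
        return "missing in source"
    if trg is None:
        return "missing in target"
    if src != trg:
        return "mismatch"
    return "unknown"  # not reached for ordinary values; kept to mirror the CASE


print(dq_compare(1.001, 1.004, is_numeric=True), dq_compare(1.001, 1.004))
```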
Macros can even create UDFs (user-defined functions):
UDF
-- create UDFs that decode bitwise access flags into a human-readable status
{% macro create_udf() -%}
{% set sql %}
CREATE OR REPLACE FUNCTION {{ target.schema }}.f_bitwise_to_delimited(bitwise_column BIGINT, bits_in_column INT)
RETURNS VARCHAR(512)
STABLE
AS $$
# Convert column to binary, strip "0b" prefix, pad out with zeroes
if bitwise_column is not None:
    b = bin(bitwise_column)[2:].zfill(bits_in_column)[:bits_in_column+1]
    return b
else:
    return None
$$ LANGUAGE plpythonu
;
CREATE OR REPLACE FUNCTION {{ target.schema }}.f_decode_access_flags(access_flags INT, deleted_at TIMESTAMP)
RETURNS VARCHAR(128)
STABLE
AS $$
SELECT nvl(
DECODE($2, null, null, 'deleted')
, DECODE(LEN({{ target.schema }}.f_bitwise_to_delimited($1, 7))::INT, 7, null, 'unknown')
, DECODE({{ target.schema }}.f_bitwise_to_delimited($1, 7)::INT, 0, 'active', null)
, DECODE(SUBSTRING({{ target.schema }}.f_bitwise_to_delimited($1, 7), 1, 1), 1, 'end_of_life', null)
, DECODE(SUBSTRING({{ target.schema }}.f_bitwise_to_delimited($1, 7), 7, 1), 1, 'pending', null)
, DECODE(SUBSTRING({{ target.schema }}.f_bitwise_to_delimited($1, 7), 6, 1), 1, 'rejected', null)
, DECODE(SUBSTRING({{ target.schema }}.f_bitwise_to_delimited($1, 7), 5, 1), 1, 'blocked', null)
, DECODE(SUBSTRING({{ target.schema }}.f_bitwise_to_delimited($1, 7), 4, 1), 1, 'expired_docs', null)
, DECODE(SUBSTRING({{ target.schema }}.f_bitwise_to_delimited($1, 7), 3, 1), 1, 'partner_blocked', null)
, DECODE(SUBSTRING({{ target.schema }}.f_bitwise_to_delimited($1, 7), 2, 1), 1, 'new_partner', null)
)
$$ LANGUAGE SQL
;
{% endset %}
{% set table = run_query(sql) %}
{%- endmacro %}
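The decoding logic is easier to test outside the warehouse. The following Python functions mirror the two UDFs. The first pads the binary representation to seven characters and keeps one extra character, so an out-of-range value yields a string of the wrong length. The second returns the first matching status in the same order as the `nvl(...)` chain. The sketch assumes non-negative flag values. Positions count from the left and start at 1, as `SUBSTRING` does.

```python
# (position in the 7-character string, status), in the order of the nvl(...) chain
FLAG_POSITIONS = [
    (1, "end_of_life"),
    (7, "pending"),
    (6, "rejected"),
    (5, "blocked"),
    (4, "expired_docs"),
    (3, "partner_blocked"),
    (2, "new_partner"),
]


def bitwise_to_delimited(value, bits):
    """Binary string left-padded with zeros to `bits` characters (max bits + 1)."""
    if value is None:
        return None
    return bin(value)[2:].zfill(bits)[: bits + 1]


def decode_access_flags(access_flags, deleted_at=None):
    """Return the first matching status, mirroring f_decode_access_flags."""
    if deleted_at is not None:
        return "deleted"
    bits = bitwise_to_delimited(access_flags, 7)
    if bits is None or len(bits) != 7:
        return "unknown"  # NULL, or more than 7 significant bits
    if int(bits) == 0:
        return "active"
    for position, status in FLAG_POSITIONS:
        if bits[position - 1] == "1":  # SUBSTRING positions are 1-based
            return status
    return None  # not reached: a non-zero string always contains a '1'


for flags in (0, 1, 2, 3, 64, 200):
    print(flags, bitwise_to_delimited(flags, 7), decode_access_flags(flags))
```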
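Macros are also used to work with nested structures and with external tables, such as parquet files in S3.
Packages
A package is a reusable set of models, macros and tests that can be added to any project. dbt hub lists many ready-made packages.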

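Two dbt hooks from the logging package record the start and end of every model run. They are enabled for all models in dbt_project.yml: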
models:
  +pre-hook: "{{ logging.log_model_start_event() }}"
  +post-hook: "{{ logging.log_model_end_event() }}"

The dbt_date package generates a complete date dimension with a single call:
{{ dbt_date.get_date_dimension('2012-01-01', '2025-12-31') }}
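For reference, a date dimension is simply one row per calendar day with derived attributes. This Python sketch generates such rows for the same range. The column names are illustrative, not the exact columns produced by `dbt_date.get_date_dimension`, and the range includes both ends in this sketch.

```python
from datetime import date, timedelta


def date_dimension(start, end):
    """Yield one row per day from start to end, both inclusive in this sketch."""
    day, last = date.fromisoformat(start), date.fromisoformat(end)
    while day <= last:
        yield {
            "date_day": day,
            "day_of_week_iso": day.isoweekday(),  # 1 = Monday ... 7 = Sunday
            "month_of_year": day.month,
            "quarter_of_year": (day.month - 1) // 3 + 1,
            "year_number": day.year,
            "is_weekend": day.isoweekday() >= 6,
        }
        day += timedelta(days=1)


rows = list(date_dimension("2012-01-01", "2025-12-31"))
print(len(rows), rows[0])
```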

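dbt_external_tables brings a Lakehouse approach: data stored in S3 can be queried directly from the warehouse. For example, currency exchange rates fetched from the Open Exchange Rates API and saved as JSON: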
External data stored in S3 accessed with Redshift Spectrum
- name: external
  schema: spectrum
  tags: ["spectrum"]
  description: "External data stored in S3 accessed with Redshift Spectrum"
  tables:
    - name: currencies_oxr
      description: "Currency Exchange Rates fetched from OXR API https://openexchangerates.org"
      freshness:
        error_after: {count: 15, period: hour}
      loaded_at_field: timestamp 'epoch' + "timestamp" * interval '1 second'
      external:
        location: "s3://data-analytics.wheely.com/dwh/currencies/"
        row_format: "serde 'org.openx.data.jsonserde.JsonSerDe'"
      columns:
        - name: timestamp
          data_type: bigint
        - name: base
          data_type: varchar(3)
        - name: rates
          data_type: struct<aed:float8, eur:float8, gbp:float8, rub:float8, usd:float8>
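The `loaded_at_field` expression turns the epoch seconds stored in `timestamp` into a timestamp, and freshness is judged by how old the newest row is. Below is a Python sketch of that check with the 15-hour `error_after` threshold. Treating exactly 15 hours as still fresh is a choice of this sketch, and the dates are made up.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def source_freshness(max_epoch_seconds, now, error_after=timedelta(hours=15)):
    """'pass' or 'error' depending on how old the newest row is."""
    # Same conversion as: timestamp 'epoch' + "timestamp" * interval '1 second'
    loaded_at = EPOCH + timedelta(seconds=max_epoch_seconds)
    return "error" if now - loaded_at > error_after else "pass"


now = datetime(2021, 3, 1, 16, 0, tzinfo=timezone.utc)
latest = int(datetime(2021, 3, 1, 0, 0, tzinfo=timezone.utc).timestamp())
print(source_freshness(latest, now))  # newest row is 16 hours old -> "error"
```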
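Redshift tables also need regular maintenance with VACUUM + ANALYZE, commands inherited from PostgreSQL. VACUUM reclaims space and re-sorts rows, and ANALYZE refreshes table statistics for the query planner. This is automated with a dbt operation as well, so no dba is required: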
dbt run-operation redshift_maintenance --args '{include_schemas: ["staging", "flatten", "intermediate", "analytics", "meta", "snapshots", "ad_hoc"]}'
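One way such an operation can decide which tables need work is Redshift's `SVV_TABLE_INFO` system view (columns `schema`, `table`, `unsorted`, `stats_off`). The Python sketch below shows that idea with hypothetical 10% thresholds and made-up rows. It is not the code of the `redshift_maintenance` macro. Redshift does not allow VACUUM inside a transaction block, so statements like these have to be executed one by one.

```python
def maintenance_statements(table_info, include_schemas, unsorted_pct=10.0, stats_off_pct=10.0):
    """VACUUM / ANALYZE statements for tables in the included schemas.

    `table_info` rows mimic SVV_TABLE_INFO (schema, table, unsorted, stats_off);
    the 10% thresholds are hypothetical, and None counts as 0.
    """
    statements = []
    for row in table_info:
        if row["schema"] not in include_schemas:
            continue
        name = f'"{row["schema"]}"."{row["table"]}"'
        if (row["unsorted"] or 0) > unsorted_pct:
            statements.append(f"vacuum {name};")
        if (row["stats_off"] or 0) > stats_off_pct:
            statements.append(f"analyze {name};")
    return statements


# Made-up SVV_TABLE_INFO rows
table_info = [
    {"schema": "staging", "table": "stg_fines", "unsorted": 35.0, "stats_off": 2.0},
    {"schema": "analytics", "table": "f_chauffeurs_fines", "unsorted": None, "stats_off": 40.0},
    {"schema": "public", "table": "tmp_table", "unsorted": 90.0, "stats_off": 90.0},
]
for statement in maintenance_statements(table_info, ["staging", "analytics"]):
    print(statement)
```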

Running in production: dbt Cloud at Wheely
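dbt Cloud is a hosted service from the company behind dbt. It provides job scheduling, run history with logs, and a browser-based IDE.
A job combines an environment, a list of dbt commands and a trigger.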

Jobs can be triggered on a cron schedule or via webhook, for example kicked off from Airflow.
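Triggering a dbt Cloud job from an orchestrator such as Airflow boils down to one authenticated HTTP call. The sketch below builds, but does not send, that request with the standard library. It assumes the dbt Cloud v2 endpoint `POST /api/v2/accounts/{account_id}/jobs/{job_id}/run/` with a required `cause` field and token authentication. The account id, job id and token are placeholders, and the base URL may differ for your deployment, so check the current API docs.

```python
import json
import urllib.request


def build_job_trigger(account_id, job_id, token, cause,
                      base_url="https://cloud.getdbt.com"):
    """Prepare (but do not send) a request that triggers a dbt Cloud job run.

    The endpoint path and auth header follow the assumed v2 API described above.
    """
    url = f"{base_url}/api/v2/accounts/{account_id}/jobs/{job_id}/run/"
    payload = json.dumps({"cause": cause}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=payload,
        method="POST",
        headers={"Authorization": f"Token {token}", "Content-Type": "application/json"},
    )


# Placeholder ids and token
req = build_job_trigger(12345, 678, "<api-token>", "Kicked off from Airflow")
print(req.get_method(), req.full_url)
# urllib.request.urlopen(req) would actually send it
```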

Run results are sent to Slack, so the team learns about Production failures right away.

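dbt itself is open source, so dbt Cloud is not the only way to run it in production. Airflow, Prefect, Dagster or plain cron work too, and so does Github Actions.
At Wheely, this setup also simplifies onboarding.
We are hiring. Open positions: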
Head of Data Insights - https://wheely.com/ru/careers/4425384003
Product Analyst, Backoffice - https://wheely.com/ru/careers/4308521003
Product Analyst, Business - https://wheely.com/ru/careers/4425290003
Product Analyst, Chauffeur growth - https://wheely.com/ru/careers/4185132003
Product Analyst, Marketplace - https://wheely.com/ru/careers/4425328003
Product Analyst, Passenger growth - https://wheely.com/ru/careers/4194291003
More on these topics in the Telegram channel Technology Enthusiast: https://t.me/enthusiastech
Questions and comments about dbt are welcome!