A multi-tool for data warehouse management: the Wheely + dbt case

For more than two years now, data build tool (dbt) has been actively used at Wheely to manage its data warehouse. Over that time we have accumulated considerable experience, and we are still on a thorny path of trial and error toward excellence in analytics engineering.





, , dbt, , .





Wheely. dbt, , . .





-





We build the warehouse together with business stakeholders, and throughout the project we try to follow two principles: Keep it simple (KISS) and Don't repeat yourself (DRY).





. DWH , , ( ).





Data warehouse layer diagram

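The first layer is sources: data delivered by the ELT service and kept 1:1 with the source systems. Nested (JSON) structures are unpacked in a separate flatten step.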





staging : , , case. , .





Intermediate , . - , 5-10 . 





The data marts layer is what Data Scientists / Business Users / BI tools work with. It is split into:

  • dimensions

  • facts

  • looker: models prepared for the BI tool





120  :





Running with dbt=0.19.0
Found 273 models, 493 tests, 6 snapshots, 4 analyses, 532 macros, 7 operations, 8 seed files, 81 sources, 0 exposures
      
      



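In other words:

  • 273 models

  • 493 tests, including not null, unique, foreign key, accepted values

  • 6 snapshots for SCD (slowly changing dimensions)

  • 532 macros (including those from installed packages)

  • 7 operations, including vacuum + analyze

  • 81 sources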





, -. , Marketing / Supply / Growth / B2B. , late arriving data /.





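Tags make it easy to run only part of the project. For example, to build the Marketing models together with everything they depend on: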





dbt run -m +tag:marketing
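To make the selector more concrete, here is a minimal Python sketch of what `+tag:marketing` resolves to: every model carrying the tag plus all of its upstream parents. This is only an illustration of the idea, not dbt's actual implementation, and the model names are hypothetical.

```python
def select_with_parents(parents, tags, tag):
    """Return the models tagged with `tag` plus all of their ancestors."""
    selected = {model for model, model_tags in tags.items() if tag in model_tags}
    stack = list(selected)
    while stack:
        node = stack.pop()
        # Walk upstream: add every parent that has not been seen yet
        for parent in parents.get(node, ()):
            if parent not in selected:
                selected.add(parent)
                stack.append(parent)
    return selected


# Hypothetical project graph: model -> list of direct parents
parents = {
    "f_marketing_costs": ["int_costs_joined"],
    "int_costs_joined": ["stg_ad_spend", "stg_campaigns"],
    "f_supply_hours": ["stg_online_hours"],
}
# Hypothetical tags assigned to models
tags = {
    "f_marketing_costs": {"marketing"},
    "f_supply_hours": {"supply"},
}

marketing_run = select_with_parents(parents, tags, "marketing")
print(sorted(marketing_run))
```

The supply models are left out of the run because nothing tagged marketing depends on them.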
      
      



. . :





dbt project structure
.
|____staging
| |____webhook
| |____receipt_prod
| |____core
| |____wheely_prod
| |____flights_prod
| |____online_hours_prod
| |____external
| |____financial_service
|____marts
| |____looker
| |____dim
| |____snapshots
| |____facts
|____flatten
| |____webhook
| |____receipt_prod
| |____wheely_prod
| |____communication_prod
|____audit
|____sources
|____aux
| |____dq
| | |____marts
| | |____external
|____intermediate
      
      



At Wheely the data warehouse runs on Amazon Redshift.





, . . – journeys ().





Dependency chain of the journeys data mart

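To improve join performance on the heaviest models, starting from the sources, we give the joined tables the same distribution key and sort key, which lets Redshift use a sort merge join: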





– sort merge join
{{
   config(
       materialized='table',
       unique_key='request_id',
       dist="request_id",
       sort="request_id"
   )
}}
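Why sorting both sides on the join key helps can be shown with a small Python sketch of a merge join. It assumes both inputs are already sorted by a unique `request_id`; the sample rows are made up, and Redshift's real executor is of course far more involved.

```python
def merge_join(left, right, key):
    """Inner-join two lists of dicts that are already sorted by a unique `key`."""
    joined = []
    i = j = 0
    # Advance two cursors in lockstep: each input is scanned only once
    while i < len(left) and j < len(right):
        if left[i][key] < right[j][key]:
            i += 1
        elif left[i][key] > right[j][key]:
            j += 1
        else:
            joined.append({**left[i], **right[j]})
            i += 1
            j += 1
    return joined


# Made-up rows, both sorted by request_id
requests = [
    {"request_id": 1, "city": "London"},
    {"request_id": 2, "city": "Moscow"},
    {"request_id": 4, "city": "Paris"},
]
payments = [
    {"request_id": 2, "amount": 30},
    {"request_id": 3, "amount": 10},
    {"request_id": 4, "amount": 55},
]

result = merge_join(requests, payments, "request_id")
print(result)
```

No hashing and no re-sorting is needed at query time, which is the point of paying for the sort once when the table is built.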
      
      



Data marts are most often filtered by city, country, completed timestamp, service group. An interleaved sort key reduces I/O for such BI queries.





– interleaved sortkey
{{
   config(
       materialized='table',
       unique_key='request_id',
       dist="request_id",
       sort_type='interleaved',
       sort=["completed_ts_loc"
               , "city"
               , "country"
               , "service_group"
               , "is_airport"
               , "is_wheely_journey"]
   )
}}
      
      



Some layers are materialized as views. For the staging layer, for example, this is set once for the whole folder:





       staging:
           +materialized: view
           +schema: staging
           +tags: ["staging"]
      
      



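Another materialization is ephemeral: such a model is not created in the database at all, and dbt inlines its code as a CTE into the models that reference it.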





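Finally, incremental models process only the new or changed rows (the delta) instead of rebuilding the whole table on every run. The delta is selected with a where filter: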





{{
   config(
       materialized='incremental',
       sort='metadata_timestamp',
       dist='fine_id',
       unique_key='id'
   )
}}

with fines as (

   select 

         fine_id
       , city_id
       , amount
       , details
       , metadata_timestamp
       , created_ts_utc
       , updated_ts_utc
       , created_dt_utc 

   from {{ ref('stg_fines') }}
   where true
   -- filter fines arrived since last processed time
   {% if is_incremental() -%}
       and metadata_timestamp > (select max(metadata_timestamp) from {{ this }})
   {%- endif %} 

), 

...
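The filter above boils down to a high-water mark. The following Python sketch mimics that logic on plain lists, assuming integer timestamps and using `None` to stand for a target table that does not exist yet; it is an illustration, not how dbt executes the model.

```python
def incremental_rows(source_rows, target_rows, ts_key="metadata_timestamp"):
    """Pick the rows an incremental run would process."""
    if target_rows is None:
        # First run: the target does not exist, so everything is loaded
        return list(source_rows)
    high_water_mark = max((row[ts_key] for row in target_rows), default=None)
    if high_water_mark is None:
        # Empty target: in SQL, comparing against NULL selects no rows
        return []
    return [row for row in source_rows if row[ts_key] > high_water_mark]


source = [
    {"fine_id": 1, "metadata_timestamp": 100},
    {"fine_id": 2, "metadata_timestamp": 200},
    {"fine_id": 3, "metadata_timestamp": 300},
]
already_loaded = source[:2]

delta = incremental_rows(source, already_loaded)
print(delta)
```

The empty-target branch is worth noticing: an incremental table that exists but holds no rows will not load anything with this kind of filter until it is rebuilt in full.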
      
      



, MPP , Data Engineer Data Warehouse Analyst ( !).





SQL + Jinja = Flexibility

SQL , Jinja .





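Each model goes through two steps: dbt compile & run. First the Jinja is rendered into plain SQL, then dbt wraps the query in a CREATE statement with the options the target database needs: clustered by / distributed by / sorted by. For example: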





Model code:
{{
   config(
       materialized='table',
       dist="fine_id",
       sort="created_ts_utc"
   )
}} 

with details as (

  select
       {{
       dbt_utils.star(from=ref('fine_details_flatten'),
           except=["fine_amount", "metadata_timestamp", "generated_number"]
       )
       }}
   from {{ ref('fine_details_flatten') }}
   where fine_amount > 0

)

select * from details
      
      



Compiled code:
with details as (  

   select
  
       "id",
       "fine_id",
       "city_id",
       "amount",
       "description",
       "created_ts_utc",
       "updated_ts_utc",
       "created_dt_utc"

   from "wheely"."dbt_test_akozyr"."fine_details_flatten"
   where fine_amount > 0

)

select * from details
      
      



Run code:
create  table
   "wheely"."dbt_test_akozyr"."f_chauffeurs_fines"
   diststyle key distkey (fine_id)   
   compound sortkey(created_ts_utc)
 as (    

with details as (  

   select

       "id",
       "fine_id",
       "city_id",
       "amount",
       "description",
       "created_ts_utc",
       "updated_ts_utc",
       "created_dt_utc"

   from "wheely"."dbt_test_akozyr"."fine_details_flatten"
   where fine_amount > 0

)

select * from details

 );
      
      



, , dbt. boilerplate code . .





-, , ? , , {{ ref('fine_details_flatten') }}



– . . .





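Jinja also lets us separate the dev / test / prod environments at Wheely. The macro picks the schema a model is written to: custom schema names are used only in prod, while every other environment writes to the schema of the current target.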





:
{% macro generate_schema_name_for_env(custom_schema_name, node) -%}
 
   {%- set default_schema = target.schema -%}

   {%- if target.name == 'prod' and custom_schema_name is not none -%} 

       {{ custom_schema_name | trim }} 

   {%- else -%}

       {{ default_schema }} 

   {%- endif -%} 

{%- endmacro %}
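The same branching, restated as a small Python function for readers less used to Jinja. The function name mirrors the macro, but the explicit `target_name` and `target_schema` parameters are an assumption made for the sketch (in dbt they come from the `target` object).

```python
def generate_schema_name_for_env(custom_schema_name, target_name, target_schema):
    """Mirror of the macro: custom schemas apply in prod only."""
    if target_name == "prod" and custom_schema_name is not None:
        return custom_schema_name.strip()
    # dev / test (or no custom schema): fall back to the target's own schema
    return target_schema


prod_schema = generate_schema_name_for_env("staging", "prod", "analytics")
dev_schema = generate_schema_name_for_env("staging", "dev", "dbt_test_akozyr")
print(prod_schema, dev_schema)
```

With this rule a developer's run never touches the production schemas, whatever `+schema` a model declares.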
      
      



. , : , , , (-, ).





–

Macros save us from copy-pasting the same code over and over. At Wheely we stick to the Do not repeat yourself principle.





:
-- currency conversion macro
{% macro convert_currency(convert_column, currency_code_column) -%}
 
     ( {{ convert_column }} * aed )::decimal(18,4) as {{ convert_column }}_aed
   , ( {{ convert_column }} * eur )::decimal(18,4) as {{ convert_column }}_eur
   , ( {{ convert_column }} * gbp )::decimal(18,4) as {{ convert_column }}_gbp
   , ( {{ convert_column }} * rub )::decimal(18,4) as {{ convert_column }}_rub
   , ( {{ convert_column }} * usd )::decimal(18,4) as {{ convert_column }}_usd 

{%- endmacro %}
      
      



:
 select     

       ...

       -- price_details
       , r.currency
       , {{ convert_currency('price', 'currency') }}
       , {{ convert_currency('transfer_min_price', 'currency') }}
       , {{ convert_currency('discount', 'currency') }}
       , {{ convert_currency('insurance', 'currency') }}
       , {{ convert_currency('tips', 'currency') }}
       , {{ convert_currency('parking', 'currency') }}
       , {{ convert_currency('toll_road', 'currency') }}
       , {{ convert_currency('pickup_charge', 'currency') }}
       , {{ convert_currency('cancel_fee', 'currency') }}
       , {{ convert_currency('net_bookings', 'currency') }}
       , {{ convert_currency('gross_revenue', 'currency') }}
       , {{ convert_currency('service_charge', 'currency') }}     

       ... 

   from {{ ref('requests_joined') }} r   
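To see how much SQL each call stands for, the expansion can be reproduced with a few lines of Python. This is a sketch of the text the macro renders, with the unused `currency_code_column` argument left out; it is not part of the dbt project.

```python
CURRENCIES = ("aed", "eur", "gbp", "rub", "usd")


def convert_currency(convert_column):
    """Build the SQL fragment that one macro call renders."""
    lines = [
        f"( {convert_column} * {code} )::decimal(18,4) as {convert_column}_{code}"
        for code in CURRENCIES
    ]
    return "\n, ".join(lines)


rendered = convert_currency("price")
print(rendered)
```

Each call yields five converted columns, so the twelve calls in the model above stand for sixty hand-written expressions.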
      
      



, , Jinja. SQL-. - :





-- compare two columns
{% macro dq_compare_columns(src_column, trg_column, is_numeric=false) -%}
  
   {%- if is_numeric == true -%}
       {%- set src_column = 'round(' + src_column + ', 2)' -%}       
       {%- set trg_column = 'round(' + trg_column + ', 2)' -%}
   {%- endif -%} 

       CASE
           WHEN {{ src_column }} = {{ trg_column }} THEN 'match'
           WHEN {{ src_column }} IS NULL AND {{ trg_column }} IS NULL THEN 'both null'
           WHEN {{ src_column }} IS NULL THEN 'missing in source'
           WHEN {{ trg_column }} IS NULL THEN 'missing in target'
           WHEN {{ src_column }} <> {{ trg_column }} THEN 'mismatch'
           ELSE 'unknown'
       END

{%- endmacro %}
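The decision order of the CASE expression is easy to check outside the database. Below is a Python sketch of the same logic, with `None` standing in for SQL NULL; the function name is made up for the example.

```python
def dq_compare(src, trg, is_numeric=False):
    """Classify a pair of values the way the CASE expression does."""
    if is_numeric:
        # Compare numbers at two decimal places, keeping NULLs as they are
        src = round(src, 2) if src is not None else None
        trg = round(trg, 2) if trg is not None else None
    if src is not None and trg is not None and src == trg:
        return "match"
    if src is None and trg is None:
        return "both null"
    if src is None:
        return "missing in source"
    if trg is None:
        return "missing in target"
    return "mismatch"


print(dq_compare(10.001, 10.004, is_numeric=True))
print(dq_compare(10.001, 10.004))
print(dq_compare(None, 5))
```

Rounding before the comparison keeps tiny floating-point differences from being reported as data quality issues.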
      
      



UDF-:





UDF
-- create UDFs that decode bitwise access flags into a readable status
{% macro create_udf() -%}

{% set sql %}

       CREATE OR REPLACE FUNCTION {{ target.schema }}.f_bitwise_to_delimited(bitwise_column BIGINT, bits_in_column INT)
           RETURNS VARCHAR(512)
           STABLE
       AS $$
       # Convert column to binary, strip "0b" prefix, pad out with zeroes
       if bitwise_column is not None:
           b = bin(bitwise_column)[2:].zfill(bits_in_column)[:bits_in_column+1]
           return b
       else:
           return None
       $$ LANGUAGE plpythonu
       ;  

       CREATE OR REPLACE FUNCTION {{ target.schema }}.f_decode_access_flags(access_flags INT, deleted_at TIMESTAMP)
           RETURNS VARCHAR(128)
       STABLE
       AS $$
           SELECT nvl(
                         DECODE($2, null, null, 'deleted')
                       , DECODE(LEN(analytics.f_bitwise_to_delimited($1, 7))::INT, 7, null, 'unknown')
                       , DECODE(analytics.f_bitwise_to_delimited($1, 7)::INT, 0, 'active', null)
                       , DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 1, 1), 1, 'end_of_life', null)
                       , DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 7, 1), 1, 'pending', null)
                       , DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 6, 1), 1, 'rejected', null)
                       , DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 5, 1), 1, 'blocked', null)
                       , DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 4, 1), 1, 'expired_docs', null)
                       , DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 3, 1), 1, 'partner_blocked', null)
                       , DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 2, 1), 1, 'new_partner', null)
                   )
       $$ LANGUAGE SQL
       ;         

 {% endset %}

 {% set table = run_query(sql) %}

{%- endmacro %}
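The two functions can be tried out locally with a plain Python sketch. It follows the order of checks in the SQL function as I read it (deleted first, then length, then individual bits); the function names and the position-to-status mapping are lifted from the code above, everything else is an assumption of the sketch.

```python
def bitwise_to_delimited(value, bits):
    """Return `value` as a zero-padded binary string, or None for NULL."""
    if value is None:
        return None
    # bin() gives e.g. '0b101'; drop the prefix and left-pad with zeroes
    return bin(value)[2:].zfill(bits)


# Bit positions (1-based, left to right) in the order the SQL checks them
STATUS_BY_POSITION = [
    (1, "end_of_life"), (7, "pending"), (6, "rejected"), (5, "blocked"),
    (4, "expired_docs"), (3, "partner_blocked"), (2, "new_partner"),
]


def decode_access_flags(access_flags, deleted_at=None, bits=7):
    """Return the first status that applies, like the chain of DECODE calls."""
    if deleted_at is not None:
        return "deleted"
    flags = bitwise_to_delimited(access_flags, bits)
    if flags is None or len(flags) != bits:
        return "unknown"
    if int(flags) == 0:
        return "active"
    for position, status in STATUS_BY_POSITION:
        if flags[position - 1] == "1":
            return status
    return None


print(bitwise_to_delimited(5, 7))
print(decode_access_flags(0), decode_access_flags(1), decode_access_flags(64))
```

Because the checks are ordered, a record with several bits set is reported under the first matching status only.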
      
      



Macros also help with nested structures and with external tables over parquet files in S3.





–

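A package is a reusable set of macros, models and tests that can be plugged into a project. Many ready-made packages are published on dbt hub.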





With just 2 hooks, dbt logs the start and the end of every model run. The macros come from the logging package:





models:
   +pre-hook: "{{ logging.log_model_start_event() }}"
   +post-hook: "{{ logging.log_model_end_event() }}"
      
      



Monitoring dbt model deployments on the Redshift cluster

A calendar dimension can be generated with a single macro call:





{{ dbt_date.get_date_dimension('2012-01-01', '2025-12-31') }}
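For readers who have not met a calendar dimension before, this is roughly the shape of the result: one row per day with precomputed attributes. The Python sketch below is only an analogy; the column names are hypothetical, the real macro produces many more columns, and treating the end date as exclusive is an assumption.

```python
from datetime import date, timedelta


def date_dimension(start, end):
    """One row per day in [start, end), with a few derived attributes."""
    rows = []
    day = start
    while day < end:
        rows.append({
            "date_day": day,
            "year": day.year,
            "month": day.month,
            "day_of_week": day.isoweekday(),   # Monday = 1 ... Sunday = 7
            "is_weekend": day.isoweekday() >= 6,
        })
        day += timedelta(days=1)
    return rows


calendar = date_dimension(date(2012, 1, 1), date(2012, 1, 8))
print(len(calendar), calendar[0])
```

Joining facts to such a table replaces repeated date arithmetic in every report.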
      
      



Calendar dimension generated by a macro

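The dbt_external_tables package helps build a Lakehouse by exposing data stored in S3 to the warehouse. For example, currency exchange rates fetched from the Open Exchange Rates API in JSON format: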





External data stored in S3 accessed with Redshift Spectrum
 - name: external
     schema: spectrum
     tags: ["spectrum"]
     description: "External data stored in S3 accessed with Redshift Spectrum"
     tables:
       - name: currencies_oxr
         description: "Currency Exchange Rates fetched from OXR API https://openexchangerates.org"
         freshness:
           error_after: {count: 15, period: hour}
         loaded_at_field: timestamp 'epoch' + "timestamp" * interval '1 second'
         external:
           location: "s3://data-analytics.wheely.com/dwh/currencies/"
           row_format: "serde 'org.openx.data.jsonserde.JsonSerDe'"
         columns:
           - name: timestamp
             data_type: bigint
           - name: base
             data_type: varchar(3)
           - name: rates
             data_type: struct<aed:float8, eur:float8, gbp:float8, rub:float8, usd:float8>
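The `loaded_at_field` expression and the `error_after` rule translate into a short Python sketch: turn the epoch seconds into a timestamp, then compare its age with the 15 hour threshold. The helper names are made up and the sample timestamp is arbitrary; dbt performs the real check with a query against the source.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def loaded_at(epoch_seconds):
    """Same idea as: timestamp 'epoch' + "timestamp" * interval '1 second'."""
    return EPOCH + timedelta(seconds=epoch_seconds)


def is_stale(epoch_seconds, now, max_age=timedelta(hours=15)):
    """True when the latest load is older than the error_after threshold."""
    return now - loaded_at(epoch_seconds) > max_age


last_load = 1609459200  # 2021-01-01 00:00:00 UTC
print(loaded_at(last_load))
print(is_stale(last_load, datetime(2021, 1, 1, 16, 0, tzinfo=timezone.utc)))
```

If the rates file stops arriving, the freshness check fails before stale exchange rates leak into the converted amounts.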
      
      



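Another example is regular VACUUM + ANALYZE maintenance, which Redshift needs as a PostgreSQL-based database. It runs as a dbt operation, so no dba has to do it by hand.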





dbt run-operation redshift_maintenance --args '{include_schemas: ["staging", "flatten", "intermediate", "analytics", "meta", "snapshots", "ad_hoc"]}'
      
      



VACUUM + ANALYZE

Running in production: dbt Cloud at Wheely

dbt Cloud , dbt. , , , IDE ( !) .





: , , :





-, . , cron-, webhook. , - (kicked off from Airflow):





, . Slack Production-. .





dbt itself is open source, so dbt Cloud is not the only way to run it. Alternatives include Airflow, Prefect, Dagster, cron, or even Github Actions.





Wheely , , . onboarding.





. :





  • Head of Data Insights - https://wheely.com/ru/careers/4425384003





  • Product Analyst, Backoffice - https://wheely.com/ru/careers/4308521003





  • Product Analyst, Business - https://wheely.com/ru/careers/4425290003





  • Product Analyst, Chauffeur growth - https://wheely.com/ru/careers/4185132003





  • Product Analyst, Marketplace - https://wheely.com/ru/careers/4425328003





  • Product Analyst, Passenger growth - https://wheely.com/ru/careers/4194291003





.





, . - Technology Enthusiast – https://t.me/enthusiastech





, , , dbt !







