Je veux tout savoir sur le client! Ou comment enrichir les faits DWH secs avec des chemins numériques et des propriétés client d'Amplitude

Le référentiel d'entreprise de la Betting League a été créé bien avant l'introduction d' Amplitude . Il est principalement utilisé par les analystes et les chercheurs. Les produits et les spécialistes du marketing se sont tournés vers les analystes pour obtenir des analyses de l'entrepôt, car cela nécessite des compétences en programmation.







DWH Facts a toujours manqué de quelque chose d'une vision d'épicerie, numérique dans les produits qui espionneraient les clients et nous donneraient un aperçu de ses voies. Avec l'avènement d'Amplitude dans l'entreprise, nous avons commencé à comprendre la valeur des données accumulées dans le système et c'est très cool de les utiliser dans Amplitude lui-même, mais la symbiose des deux systèmes DWH et Amplitude n'a pas donné de repos. Nous avons bien sûr implémenté la mécanique du transfert de données d'Amplitude pour une analyse interne dans un entrepôt d'entreprise et élaboré des instructions pour configurer le transfert de données d'Amplitude vers DWH. Nous vous invitons également au webinaire Betting League et Adventum sur l'analyse et l'optimisation des conversions dans le produit .







image







Comment l'agrégation de données DWH peut vous aider



1. . DWH, .







2. . .







3. . , API . .







Amplitude DWH



Amplitude API . . . , , , . . , , UTC — , .







. Python, SQL . ! Amplitude , .







, — Amplitude . , CSV, ETL .







ETL — Extract, Transform, Load. , , DWH .







. , . , , .







Python 3.7 . , flow- (, , dag), , Windows. .bat ( ). , .







1.



# 
import os
import requests
import pandas as pd
import zipfile
import gzip
import time
from tqdm import tqdm
import pymssql
import datetime
import pyodbc
from urllib.parse import quote_plus
from sqlalchemy import create_engine, event
      
      





2.



, , . , .







#   
os.chdir("C:\Agents\AMPL\TEMP") #    
dts1 = time.time() #       
a = time.time() #  
now = datetime.datetime.now().strftime("%Y%m%d-%H%M%S") #    
      
      





3. API Amplitude



, (Settings => Project = > General).







#    API 
api_key = ''
secret_key = '  '
      
      





4. ()



, , . SQL , . yyyymmddThh (. . ). API , .







#     DWH (SQL)
server = " "
user = ""
password = ""

#     
conn = pymssql.connect(server, user, password, " ")
cursor = conn.cursor()
cursor.execute("   .    select")
      
      





5.



API Amplitude. .







#        
for row in cursor: 
    dt = row[0]
conn.close()   
      
      





6.



, . , , , , .







#   ,     
filename = 'AMPL_PROD_'+ dt + '_' + now

#  ,     \\  WIN
#      ,      os.chdir
working_dir = os.getcwd() + '\\'

      
      





7. SQL



SQL. , .







#    DWH (SQL). ,           
server = ' '
database = ' '
schema = ' '
table_to_export = ' '

#    DWH (SQL)
params = 'DRIVER= {SQL Server};SERVER='+server+';DATABASE='+database+';User='+user+';Password='+password+';'
quoted = quote_plus(params)
new_con = 'mssql+pyodbc:///?odbc_connect={}'.format(quoted)
      
      





8. Amplitude



Amplitude , , json .







#     API ,     json
class GetFile():

    def __init__(self, now, dt, api_key, secret_key, filename, working_dir):

        self.now = now
        self.dt = dt
        self.api_key = api_key
        self.secret_key = secret_key
        self.filename = filename
        self.working_dir = working_dir

    def response(self):
        """
           
        """
        print('   !', end='\n')
        count = 0
        while True:
            count += 1
            print(f' {count}.....', end='')
            try:
                response = requests.get('https://amplitude.com/api/2/export?start='+self.dt+'&end='+self.dt,
                                        auth=(self.api_key, self.secret_key),
                                        timeout=10)
                print('', end='\n') 
                print('1.    ', end='\n')
                break
            except:
                print('', end='\n')
                time.sleep(1)

        return response

    def download(self):
        '''
           
        '''
        with open(working_dir + self.filename + '.zip', "wb") as code:
            file = self.response()
            print('2.    .....', end='')           
            code.write(file.content)
        print('OK', end='\n')

    def extraction(self):
        '''
             
        '''
        z = zipfile.ZipFile(self.working_dir + self.filename + '.zip', 'r')
        z.extractall(path=self.working_dir + self.filename)
        print('3.         ' + self.filename)

    def getfilename(self):
        '''
            
        '''
        return ': {} \n : {}'.format(self.filename, self.working_dir + self.filename + '.zip')

def unzip_and_create_df(working_dir, filename):
        '''
         JSON.gz   json     (   )
         ,    .
        '''
        directory = working_dir + filename + '\\274440'
        files = os.listdir(directory)
        df = pd.DataFrame()
        print('  :')
        time.sleep(1)
        for i in tqdm(files):
            with gzip.open(directory + '\\' + i) as f:
                add = pd.read_json(f, lines=True)
            df = pd.concat([df, add], axis=0)
        time.sleep(1)    
        print('4. JSON         dataframe')
        return df

#    
file = GetFile(now, dt, api_key, secret_key, filename, working_dir)

#   (      )
file.download()

#  gz-   
file.extraction()

#   DataFrame    json.gz
adf = unzip_and_create_df(working_dir, filename)

      
      





9. ()



, . . , SQL. .







#    
print('5.    ,  , , .....', end='')

#   DWH        
#       -   
sql_query_columns = ("""
                        '             '
                    """)

settdf = pd.read_sql_query(sql_query_columns, new_con)

#   lower()  (= )   SAVE_COLUMN_NAME  dwh
#   , lower()       
settdf['SAVE_COLUMN_NAME'] = settdf['SAVE_COLUMN_NAME'].apply(lambda x: x.lower())
adf.columns = [''.join(j.title() for j in i.split('_')).lower() for i in adf.columns]

#   
needed_columns = [i for i in settdf['SAVE_COLUMN_NAME'].to_list()]

#     
needed_columns.append('DOWNLOAD_FILE_NAME')

#    DF c  
adf['DOWNLOAD_FILE_NAME'] = filename

#   ( , ,  )
adf.reset_index(inplace=True)

#    ( )   ,   
adf = adf.astype('unicode_').where(pd.notnull(adf), None)

#  DataFrame    
df_to_sql = adf[needed_columns]

#     
print('OK', end='\n')
      
      





10.



. .







#    DWH
#   
dts2 = time.time()
print('6.    ...', end='')

#      DWH
connection = pyodbc.connect(params)
engine = create_engine(new_con)

#   ()    DWH (   -  )
@event.listens_for(engine, 'before_cursor_execute')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
    if executemany:
        cursor.fast_executemany = True

#  None  RAM
df_to_sql.to_sql(table_to_export, engine, schema=schema, if_exists='append', chunksize=100000, index=False)

#    
connection.close() 
print('OK', end='\n')

dtf = time.time()
diff1, diff2 = [str(int((dtf - i) // 60)) + '  ' + str(int((dtf - i) % 60)) + ' ' for i in (dts1, dts2)]
print(f' : {diff1},   : {diff2}')
print(' ,   ')

      
      





11.



! . .







#    
#     
conn2 = pymssql.connect(server, user, password, "  ")
cursor2 = conn2.cursor()
query = "      ,  ")

#  
cursor2.execute(query)

#    
conn2.commit()
conn2.close()

      
      





12.



, . . , ? , ETL , .







print('  ')

#       ETL       
conn3 = pymssql.connect(server, user, password, "  ")
cursor3 = conn3.cursor()
query = " ETL   .  EXEC dbo.SP"

cursor3.execute(query)

conn3.commit()
conn3.close()
      
      





13.



, .







#      
b = time.time()
diff = b-a
minutes = diff//60
print('  : {:.0f} ()'.format( minutes))

      
      





. — , .







, ETL, . , - , .







, Amplitude, . s2s , .







20 17:00 . . , .








All Articles