Le référentiel d'entreprise de la Betting League a été créé bien avant l'introduction d' Amplitude . Il est principalement utilisé par les analystes et les chercheurs. Les produits et les spécialistes du marketing se sont tournés vers les analystes pour obtenir des analyses de l'entrepôt, car cela nécessite des compétences en programmation.
DWH Facts a toujours manqué de quelque chose d'une vision d'épicerie, numérique dans les produits qui espionneraient les clients et nous donneraient un aperçu de ses voies. Avec l'avènement d'Amplitude dans l'entreprise, nous avons commencé à comprendre la valeur des données accumulées dans le système et c'est très cool de les utiliser dans Amplitude lui-même, mais la symbiose des deux systèmes DWH et Amplitude n'a pas donné de repos. Nous avons bien sûr implémenté la mécanique du transfert de données d'Amplitude pour une analyse interne dans un entrepôt d'entreprise et élaboré des instructions pour configurer le transfert de données d'Amplitude vers DWH. Nous vous invitons également au webinaire Betting League et Adventum sur l'analyse et l'optimisation des conversions dans le produit .
Comment l'agrégation de données DWH peut vous aider
1. . DWH, .
2. . .
3. . , API . .
Amplitude DWH
Amplitude API . . . , , , . . , , UTC — , .
. Python, SQL . ! Amplitude , .
, — Amplitude . , CSV, ETL .
ETL — Extract, Transform, Load. , , DWH .
. , . , , .
Python 3.7 . , flow- (, , dag), , Windows. .bat ( ). , .
1.
#
import os
import requests
import pandas as pd
import zipfile
import gzip
import time
from tqdm import tqdm
import pymssql
import datetime
import pyodbc
from urllib.parse import quote_plus
from sqlalchemy import create_engine, event
2.
, , . , .
#
os.chdir("C:\Agents\AMPL\TEMP") #
dts1 = time.time() #
a = time.time() #
now = datetime.datetime.now().strftime("%Y%m%d-%H%M%S") #
3. API Amplitude
, (Settings => Project = > General).
# API
api_key = ''
secret_key = ' '
4. ()
, , . SQL , . yyyymmddThh (. . ). API , .
# DWH (SQL)
server = " "
user = ""
password = ""
#
conn = pymssql.connect(server, user, password, " ")
cursor = conn.cursor()
cursor.execute(" . select")
5.
API Amplitude. .
#
for row in cursor:
dt = row[0]
conn.close()
6.
, . , , , , .
# ,
filename = 'AMPL_PROD_'+ dt + '_' + now
# , \\ WIN
# , os.chdir
working_dir = os.getcwd() + '\\'
7. SQL
SQL. , .
# DWH (SQL). ,
server = ' '
database = ' '
schema = ' '
table_to_export = ' '
# DWH (SQL)
params = 'DRIVER= {SQL Server};SERVER='+server+';DATABASE='+database+';User='+user+';Password='+password+';'
quoted = quote_plus(params)
new_con = 'mssql+pyodbc:///?odbc_connect={}'.format(quoted)
8. Amplitude
Amplitude , , json .
# API , json
class GetFile():
def __init__(self, now, dt, api_key, secret_key, filename, working_dir):
self.now = now
self.dt = dt
self.api_key = api_key
self.secret_key = secret_key
self.filename = filename
self.working_dir = working_dir
def response(self):
"""
"""
print(' !', end='\n')
count = 0
while True:
count += 1
print(f' {count}.....', end='')
try:
response = requests.get('https://amplitude.com/api/2/export?start='+self.dt+'&end='+self.dt,
auth=(self.api_key, self.secret_key),
timeout=10)
print('', end='\n')
print('1. ', end='\n')
break
except:
print('', end='\n')
time.sleep(1)
return response
def download(self):
'''
'''
with open(working_dir + self.filename + '.zip', "wb") as code:
file = self.response()
print('2. .....', end='')
code.write(file.content)
print('OK', end='\n')
def extraction(self):
'''
'''
z = zipfile.ZipFile(self.working_dir + self.filename + '.zip', 'r')
z.extractall(path=self.working_dir + self.filename)
print('3. ' + self.filename)
def getfilename(self):
'''
'''
return ': {} \n : {}'.format(self.filename, self.working_dir + self.filename + '.zip')
def unzip_and_create_df(working_dir, filename):
'''
JSON.gz json ( )
, .
'''
directory = working_dir + filename + '\\274440'
files = os.listdir(directory)
df = pd.DataFrame()
print(' :')
time.sleep(1)
for i in tqdm(files):
with gzip.open(directory + '\\' + i) as f:
add = pd.read_json(f, lines=True)
df = pd.concat([df, add], axis=0)
time.sleep(1)
print('4. JSON dataframe')
return df
#
file = GetFile(now, dt, api_key, secret_key, filename, working_dir)
# ( )
file.download()
# gz-
file.extraction()
# DataFrame json.gz
adf = unzip_and_create_df(working_dir, filename)
9. ()
, . . , SQL. .
#
print('5. , , , .....', end='')
# DWH
# -
sql_query_columns = ("""
' '
""")
settdf = pd.read_sql_query(sql_query_columns, new_con)
# lower() (= ) SAVE_COLUMN_NAME dwh
# , lower()
settdf['SAVE_COLUMN_NAME'] = settdf['SAVE_COLUMN_NAME'].apply(lambda x: x.lower())
adf.columns = [''.join(j.title() for j in i.split('_')).lower() for i in adf.columns]
#
needed_columns = [i for i in settdf['SAVE_COLUMN_NAME'].to_list()]
#
needed_columns.append('DOWNLOAD_FILE_NAME')
# DF c
adf['DOWNLOAD_FILE_NAME'] = filename
# ( , , )
adf.reset_index(inplace=True)
# ( ) ,
adf = adf.astype('unicode_').where(pd.notnull(adf), None)
# DataFrame
df_to_sql = adf[needed_columns]
#
print('OK', end='\n')
10.
. .
# DWH
#
dts2 = time.time()
print('6. ...', end='')
# DWH
connection = pyodbc.connect(params)
engine = create_engine(new_con)
# () DWH ( - )
@event.listens_for(engine, 'before_cursor_execute')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
if executemany:
cursor.fast_executemany = True
# None RAM
df_to_sql.to_sql(table_to_export, engine, schema=schema, if_exists='append', chunksize=100000, index=False)
#
connection.close()
print('OK', end='\n')
dtf = time.time()
diff1, diff2 = [str(int((dtf - i) // 60)) + ' ' + str(int((dtf - i) % 60)) + ' ' for i in (dts1, dts2)]
print(f' : {diff1}, : {diff2}')
print(' , ')
11.
! . .
#
#
conn2 = pymssql.connect(server, user, password, " ")
cursor2 = conn2.cursor()
query = " , ")
#
cursor2.execute(query)
#
conn2.commit()
conn2.close()
12.
, . . , ? , ETL , .
print(' ')
# ETL
conn3 = pymssql.connect(server, user, password, " ")
cursor3 = conn3.cursor()
query = " ETL . EXEC dbo.SP"
cursor3.execute(query)
conn3.commit()
conn3.close()
13.
, .
#
b = time.time()
diff = b-a
minutes = diff//60
print(' : {:.0f} ()'.format( minutes))
. — , .
, ETL, . , - , .
, Amplitude, . s2s , .
20 17:00 . . , .