Bonjour, chers lecteurs! Il y a quelques jours, relisant le livre d'Anthony Molinaro «SQL. Une collection de recettes », dans l'un des chapitres, je suis tombé sur un sujet qui était consacré à la détermination du début et de la fin de la plage de valeurs consécutives. Après avoir lu brièvement le matériel, je me suis immédiatement rappelé que j'avais déjà rencontré cette question comme l'une des tâches de test, mais ensuite le sujet a été déclaré comme «La tâche de trouver des sessions». L'astuce de l'entretien technique n'était pas un examen du travail effectué, mais l'une des questions de l'intervieweur sur la façon d'obtenir des valeurs similaires en utilisant Spark. En préparant l'entretien, je ne savais pas que l'entreprise utilisait (ou peut-être pas ...) Apache Spark, et je n'ai donc pas collecté d'informations sur un nouvel outil pour moi à ce moment-là. Il ne restait plus qu'à émettre l'hypothèse que la solution souhaitée pourrait être comme un script,qui peut être écrit en utilisant la bibliothèque Pandas. Bien que très éloigné, j'atteins toujours la cible, mais je n'ai pas réussi à travailler dans cette organisation.
Pour être honnête, je tiens à noter qu'au fil des ans, j'ai fait peu de progrès dans l'apprentissage d'Apache Spark. Mais je souhaite toujours partager les meilleures pratiques avec les lecteurs, car de nombreux analystes ne sont pas du tout tombés sur cet outil, et d'autres peuvent avoir une interview similaire. Si vous êtes un professionnel Spark, vous pouvez toujours suggérer un code plus optimal dans les commentaires de l'article.
C'était un préambule, passons directement à l'analyse de ce sujet. Commençons par écrire un script SQL. Mais d'abord, créons une base de données et remplissons-la de valeurs. Puisqu'il s'agit d'un exemple de démonstration, je suggère d'utiliser SQLite. Cette base de données est inférieure à des «collègues de la boutique» plus puissants, mais ses capacités de développement de scripts nous suffisent pleinement. Pour automatiser les opérations ci-dessus, j'ai écrit le code suivant en Python.
#
import sqlite3
#
projects = [
('2020-01-01', '2020-01-02'),
('2020-01-02', '2020-01-03'),
('2020-01-03', '2020-01-04'),
('2020-01-04', '2020-01-05'),
('2020-01-06', '2020-01-07'),
('2020-01-16', '2020-01-17'),
('2020-01-17', '2020-01-18'),
('2020-01-18', '2020-01-19'),
('2020-01-19', '2020-01-20'),
('2020-01-21', '2020-01-22'),
('2020-01-26', '2020-01-27'),
('2020-01-27', '2020-01-28'),
('2020-01-28', '2020-01-29'),
('2020-01-29', '2020-01-30')
]
try:
#
con = sqlite3.connect("projects.sqlite")
#
cur = con.cursor()
#
cur.execute("""CREATE TABLE IF NOT EXISTS projects (
proj_id INTEGER PRIMARY KEY AUTOINCREMENT,
proj_start TEXT,
proj_end TEXT)""")
#
cur.executemany("INSERT INTO projects VALUES(NULL, ?,?)", projects)
#
con.commit()
#
cur.close()
except sqlite3.Error as err:
print(" ", err)
finally:
#
con.close()
print(" ")
. DBeaver. , SQL .
select
p3.proj_group,
min(p3.proj_start) as date_start,
max(p3.proj_end) as date_end,
julianday(max(p3.proj_end))-julianday( min(p3.proj_end))+1 as delta
from
(select
p2.*,
sum(p2.flag)over(order by p2.proj_id) as proj_group
from
(select
p.proj_id ,
p.proj_start,
p.proj_end,
case
when lag(p.proj_end)over(order by p.proj_id) = p.proj_start then 0 else 1
end as flag
from projects as p) as p2) as p3
group by p3.proj_group
, . . , : . , . , , lag. 0, 1. , . . , . . , ( julianday SQLite). . Spark.
, Apache Spark , Hadoop. Java, Scala R, Spark PySpark. . Google Colab, . - , . , .
Linux OpenJDK, Spark. . findspark. , .
SQLite , . , .
Spark , . , . -, , , -, . , “ Spark. ”, , , , .
, , SQL. : , ( datediff).
, . , - , , , SQL Spark. , , . .
from pyspark.sql.functions import lag
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Equivalent of Pandas.dataframe.shift() method
w = Window().partitionBy().orderBy(col("proj_id"))
df_dataframe = df.withColumn('lag', F.lag("proj_end").over(w))
#...
# Equivalent of SQL- CASE WHEN...THEN...ELSE... END
df_dataframe = df_dataframe.withColumn('flag',F.when(df_dataframe["proj_start"] == df_dataframe["lag"],0).otherwise(1))
#...
# Cumsum by column flag
w = Window().partitionBy().orderBy(col("proj_id"))
df_dataframe = df_dataframe.withColumn("proj_group", F.sum("flag").over(w))
#...
# Equivalent of SQL - GROUP BY
from pyspark.sql.functions import min, max
df_group = df_dataframe.groupBy("proj_group").agg(min("proj_start").alias("date_start"), \
max("proj_end").alias("date_end"))
df_group = df_group.withColumn("delta", F.datediff(df_group.date_end,df_group.date_start))
df_group.show()
.
, . . , “” , .
Même si vous n'avez jamais travaillé avec Spark auparavant, ce n'est pas une raison pour refuser le concours pour un poste vacant. Les bases de PySpark peuvent être maîtrisées en peu de temps, à condition que l'arrière-plan ait déjà une expérience de programmation à l'aide de la bibliothèque Pandas.
Les livres sur Spark ne manquent pas.
C'est tout. Toute la santé, bonne chance et réussite professionnelle!