In che modo AppsFlyer utilizza Apache Airflow per eseguire più di 3,5k compiti giornalieri

Alex Kruchkov
AWS
July 25, 2020

Secure Your Everything in the New Normal

AppsFlyer è essenzialmente una grande azienda di dati, riceviamo quotidianamente enormi quantità di dati dai nostri SDK, trasformiamo e normalizziamo questi dati e alla fine li presentiamo nella nostra dashboard con metriche e coorti diverse che sono rilevanti per i nostri utenti - piuttosto basilari, giusto?

Le cose diventano sempre più complesse quando si dà contesto al volume of the events, e nel nostro caso, stiamo parlando di oltre 90 miliardi di eventi al giorno e di circa 200 TB di dati giornalieri che vengono inseriti nel nostro sistema in AWS S3.

AppsFlyer Engineering is always hiring, just like our product’s always-on, 24/7 SLAs. >> Learn More

Per elaborare e calcolare tutti questi eventi, AppsFlyer mantiene circa 50 cluster Hadoop (vaniglia con un sistema di ridimensionamento automatico interno) che esegue oltre 3,5k + lavori Spark giornalieri che suddividono, tagliano e ruotano i dati per offrire ai nostri clienti i dati più precisi di cui hanno bisogno.

Una delle tecnologie chiave che consente e supporta questa scala di operazioni sui dati è Apache Airflow, che pianifica ed esegue tutti questi lavori, attraverso i vari cluster, pur essendo consapevole delle diverse caratteristiche di ciascun lavoro.

Che cos'è Apache Airflow in breve?

Dalla documentazione:

Airflow è una piattaforma per creare, pianificare e monitorare a livello di codice i flussi di lavoro.

Utilizzare Airflow per creare flussi di lavoro come DAG (Directed Acyclic Graphs) di attività. Lo scheduler Airflow esegue le attività su una serie di lavoratori mentre segue le dipendenze specificate. Le ricche utility della riga di comando semplificano l'esecuzione di interventi chirurgici complessi sui DAG. La ricca interfaccia utente semplifica la visualizzazione delle condutture in esecuzione in produzione, il monitoraggio dei progressi e la risoluzione dei problemi quando necessario.

Quando i flussi di lavoro sono definiti come codice, diventano più gestibili, verificabili, verificabili e collaborativi.

Pensa ad Airflow come a un server cron, ma con molte funzionalità come la progettazione di flussi di lavoro, che sono completamente personalizzabili con una grande interfaccia utente insieme alla registrazione centralizzata.

Architettura

Image for post
Image for post
Airflow Architetturaat AppsFlyer

Il flusso d'aria è cruciale per il core business di AppsFlyer in quanto è qui che viene eseguita la maggior parte delle attività ETL, così come molte altre. Con questo in mente, abbiamo dovuto adottare un primo approccio di stabilità e ci siamo assicurati che ogni parte di Airflow fosse altamente disponibile:

  • Per il DB di metadati, utilizziamo PostgreSQL che è gestito, sottoposto a backup e con una copia di backup in lettura tramite il servizio AWS RDS.
  • I server web di Airflow funzionano su più istanze, con a Consul load balancer davanti a loro per distribuire le richieste tra loro.
  • Lo scheduler è l'unico componente di Airflow che non è progettato per l'alta disponibilità, anche se siamo riusciti a aggirare il problema utilizzando Consul locks. Eseguiamo il processo di pianificazione avvolto con il comando lock. In questo modo, ci assicuriamo che sia attiva solo un'istanza dello scheduler e, se l'istanza fallisce, viene rilasciato il blocco che attiverà un altro server per riavviare il servizio.
  • Noi usiamo CeleryExecutor con un backend RabbitMQ come esecutore, in questo modo possiamo far girare più macchine worker, con uno scopo diverso (separato dalla coda). Inoltre, RabbitMQ è di per sé un cluster altamente disponibile.
  • Per essere sicuri che i lavoratori, lo scheduler e i server web siano in esecuzione con gli stessi file, abbiamo montato un NFS tra tutti i componenti. Questo NFS è l'obiettivo del lavoro di Jenkins che utilizziamo per distribuire eventuali modifiche (ne tratteremo più avanti).

Come accennato in precedenza, supportiamo circa 50 diversi cluster Hadoop e tutte le versioni Spark dall'1.6, per questo abbiamo creato il nostro SparkOperator che riceve i dati richiesti come parametro.

Image for post

Oltre all'operatore effettivo che esegue il lavoro Spark, abbiamo molti più operatori che vengono utilizzati per ridimensionare i nostri cluster, sia che si tratti di nodi spot EC2 o on-demand per essere in grado di eseguire l'attività effettiva.

Questo è il risultato dell'apprendimento che come sottoprodotto della gestione dei cluster Hadoop noi stessi, spesso scopriamo che uno dei cluster funziona male e deve reindirizzare tutti i lavori da quello malfunzionante a un cluster diverso. Per renderlo possibile, abbiamo creato un modo per sovrascrivere l'attuale parametro hadoop_cluster che l'utente inserisce in fase di esecuzione, questo viene fatto attraverso un dizionario che inseriamo come variabile Airflow che assomiglia a questo:

Image for post
Image for post
This dictionary notes that cluster “010” was changed to “110”, and every spark job that was supposed to run on “010” will run on “110” instead.

Operatore Docker

Suddividiamo le attività eseguite in due diversi carichi di lavoro:

  1. Attività che richiedono l'esecuzione di risorse esterne, ad esempio Spark in esecuzione in modalità cluster. Queste attività spark-submit sono in esecuzione con una quantità ridotta di risorse locali, poiché eseguiamo tutto in deploy-mode cluster.
  2. Attività che richiedono risorse locali, come elaborazione, memoria o archiviazione.

Questi due carichi di lavoro sono complessi quando vengono eseguiti su una singola istanza ed è per questo che abbiamo capito che dovevamo creare un isolamento completo. Fortunatamente, il fatto che usiamo CeleryExecutor introduce un ulteriore vantaggio. Abbiamo creato due set di lavoratori Airflow, in cui ognuno ascolta la propria coda. Un set di macchine è di tipo generale e il secondo ha più risorse di elaborazione.

Poiché AppsFlyer era già fortemente investito in servizi tecnici raggruppati all'interno di un container Docker, era un dato di fatto usare questo stack tecnologico già pronto in Airflow.

Una volta creato un repository in Jenkins, il codice viene automaticamente containerizzato e caricato in Artifactory. Da lì, utilizziamo un operatore Docker costruito internamente, che ci consente di ottenere ed eseguire questi container all'interno dei lavoratori Airflow. Un altro vantaggio che ne trarremo è il fatto che tutte le risorse e le dipendenze del codice richieste, come pacchetti, moduli e altre risorse, sono già impacchettate nell'immagine Docker e non è necessario installare nulla sui lavoratori loro stessi.

Docker Job operator, note that the queue parameter is different between AfDockerJobOperator and the SparkOperatorWithHook in the previous section

Variabili di Airflow

Le variabili Airflow sono un archivio di valori-chiave all'interno del database dei metadati di Airflow. AppsFlyer ama la flessibilità che offre un po 'troppo, lo usiamo per memorizzare parametri comuni per più lavori, specificando il numero di nodi in un cluster specifico (per gli operatori di ridimensionamento), riusabilità di vasetti comunemente usati ... Lo usiamo davvero ovunque . Ecco perché, non è una vera sorpresa che abbiamo avuto diversi problemi di produzione direttamente correlati ai cambiamenti delle variabili. Dove la cosa che ci manca, è il controllo, la convalida e la corretta CI adeguati per capire se la modifica della variabile rompe il DAG stesso.

Questa è stata un'altra opportunità per costruirlo da soli. Per eseguire questa attività, per prima cosa abbiamo aggiunto l'autenticazione LDAP al server web Airflow. In questo modo, ci ha permesso di separare gli utenti in base ai gruppi LDAP, abbiamo deciso che gli utenti regolari non avrebbero potuto visualizzare la pagina di amministrazione (filtro utente nella configurazione di Airflow) - ma ciò ha anche creato il sottoprodotto indesiderato dell'eliminazione della vista della pagina Variabili . Per rendere il flusso più intuitivo, abbiamo creato una nuova scheda nell'interfaccia utente e aggiunto una sezione di sola visualizzazione per le variabili.

Le modifiche alle variabili vengono apportate all'interno di un repository Git e viene invocata una pipeline di distribuzione che aggiorna il DB di metadati stesso con le ultime variabili. Il fatto che provenga da un repository Git ci consente di controllare, ripristinare e vedere facilmente le modifiche.

Flusso di lavoro di sviluppo

In AppsFlyer, miriamo a offrire allo sviluppatore un'esperienza più semplice e trasparente. Per fare ciò, il codice del nostro operatore si trova all'interno di una directory comune come la definizione stessa del DAG, per fornire la massima trasparenza del codice "infrastruttura".

Una volta che gli sviluppatori eseguono le modifiche sul proprio computer locale, possono creare un'immagine Docker Airflow che renderà le loro modifiche in tempo reale, per vedere che le loro dipendenze sono a posto e importa anche le variabili dal flusso d'aria di produzione, quindi può vedere correttamente i comandi renderizzati.

Dichiarazione di non responsabilità: nell'immagine Docker avviamo il server web solo perché non vogliamo che le cose vengano eseguite da un computer locale e potenzialmente abbiano un reale impatto sulla produzione.

Un'altra capacità che forniamo per verificare che tutto funzioni come dovrebbe, è eseguire test sul repository locale. Usando uno script, cariciamo l'intero repository locale in a DagBag oggetto. In questo modo, abbiamo tutti i frammenti analizzati come un albero delle dipendenze, proprio come li vede lo scheduler. Durante l'importazione vengono rilevati errori e il test fallirà.

 
  
#!/usr/bin/env python

from airflow.models import DagBag
import sys,os

if len(sys.argv) == 1:
    print "Dag folder needs to be provided as a parameter"
    sys.exit(2)

print "==== Testing DAG loading ===="

db = DagBag(sys.argv[1])

if len(db.import_errors) > 0:
    print "There have been import errors, the following dag files are broken:"
    print db.import_errors
    sys.exit(1)

for dag in sorted(db.dags):
    print "========== Show structure for DAG {}".format(dag)
    db.dags[dag].tree_view()

print "===== DAG Loading done ===="

 

Dopo aver superato con successo i test locali, gli sviluppatori eseguono un lavoro Jenkins, che essenzialmente esegue gli stessi test, come un meccanismo di sicurezza. Dopo che la build è stata eseguita correttamente, quando necessario, vengono aggiornate anche le variabili nella dashboard di Airflow. Gli stessi DAG vengono caricati su un bucket S3 e quindi trasferiti sul server NFS in modo che i vari componenti Airflow visualizzino contemporaneamente il codice aggiornato.

Per le principali modifiche all'infrastruttura, abbiamo una configurazione separata e più completa che include un cluster di flusso d'aria di prova. Ciò ci consente di testare qualsiasi modifica sostanziale dell'infrastruttura o una modifica sostanziale apportata a uno degli operatori distribuendola da una filiale operativa al cluster Airflow di prova, prima di implementare l'operazione Airflow di produzione.

Avviso e monitoraggio

La definizione di avvisi e monitoraggio adeguati è una delle cose più importanti con qualsiasi componente AppsFlyer. Oltre a monitorare le metriche di base dell'host di CPU, memoria, utilizzo del disco, ... Abbiamo anche creato dashboard basati sul DB di metadati di Airflow utilizzando l'origine dati Grafana PostgreSQL per avere analisi sull'esito positivo / negativo dei lavori.

Per quanto riguarda gli Alert, abbiamo creato alcuni hack che ci semplificano la vita:

  • Avviso di mixin: ogni operatore che creiamo eredita dall'avviso di mixin che abbiamo creato. Questo è il modo in cui definiamo gli avvisi sui lavori non riusciti. Ogni lavoro è configurato con una politica di avviso predefinita e quando un lavoro fallisce, attiviamo questa politica di avviso per avvisare il proprietario del lavoro, che recuperiamo dal campo Proprietario nel DAG / lavoro livello.

Our predefined alerting policies
This is how it looks on the job level
  • Airflow SLA Alerts-
    Sebbene Airflow abbia un proprio SLA per impostazione predefinita, non soddisfa sufficientemente le nostre esigenze. Mentre Airflow misura il suo SLA con il tempo e può eseguire un callback personalizzato sulla violazione, su AppsFlyer, vogliamo eseguire controlli SLA su parametri aggiuntivi, ad esempio se il DAG non è stato nemmeno eseguito o se più lavori (che vengono eseguiti ogni ora) fallito. Per fare ciò, abbiamo scritto il nostro sistema che controlla il DB dei metadati e verifica se sono state violate le regole SLA predefinite. Questo flusso copre anche lo scenario in cui Airflow è completamente inattivo e non ne eravamo a conoscenza.
Types of SLA checks we can define per each task

In conclusione, AppsFlyer ama davvero Airflow. La sua semplicità ci permette di essere versatili come vogliamo. Il ridimensionamento è semplice e i costi di manutenzione sono davvero bassi.

Abbiamo ancora molta strada da fare con la corretta gestione delle autorizzazioni, il miglioramento del flusso CI / CD e alcune altre funzionalità a cui stiamo pensando, ma Airflow ci semplifica la vita e ti incoraggio a dare un'occhiata se stai cercando una soluzione solida per orchestrare i flussi di lavoro di pianificazione delle tue operazioni su larga scala.

Sentiti ringraziamenti a Elad Leev, Sharone Zitzman, Barak Gitsis, and Moshe Derri. 

Alex Kruchkov

Technology geek who is constantly eager in learning more new things. Trying to make cool stuff for AppsFlyer's Data Platform group for the past few years.

Related Posts

Newsletter ItalyClouds.com

Thank you! Your submission has been received!

Oops! Something went wrong while submitting the form