A complete Apache Airflow tutorial: building data pipelines with Python

Apache Airflow has become the de facto library for pipeline orchestration in the Python ecosystem. It has gained popularity, in contrast to similar solutions, thanks to its simplicity and extensibility. In this article, I will attempt to outline its main concepts and give you a clear understanding of when and how to use it.

Why and when should I consider Airflow?

Imagine that you want to build a machine learning pipeline that consists of several steps such as:

  1. Read an image dataset from a cloud-based storage

  2. Process the images

  3. Train a deep learning model with the downloaded images

  4. Upload the trained model in the cloud

  5. Deploy the model

How would you schedule and automate this workflow? Cron jobs are a simple solution but they come with many problems. Most importantly, they won’t allow you to scale effectively. On the other hand, Airflow offers the ability to schedule and scale complex pipelines easily. It also enables you to automatically re-run them after failure, manage their dependencies and monitor them using logs and dashboards.

Before we build the aforementioned pipeline, let’s understand the basic concepts of Apache Airflow.

What is Airflow?

Apache Airflow is a tool for authoring, scheduling, and monitoring pipelines. As a result, it is an ideal solution for ETL and MLOps use cases. Example use cases include:

  • Extracting data from many sources, aggregating them, transforming them, and storing them in a data warehouse

  • Extracting insights from data and displaying them in an analytics dashboard

  • Training, validating, and deploying machine learning models

Key components

When installing Airflow in its default edition, you will see four different components.

  1. Webserver: The webserver is Airflow’s user interface (UI), which allows you to interact with it without the need for a CLI or an API. From there one can execute and monitor pipelines, create connections with external systems, inspect their datasets, and much more.

  2. Executor: Executors are the mechanism by which pipelines run. There are many different types that run pipelines locally, in a single machine, or in a distributed fashion. A few examples are LocalExecutor, SequentialExecutor, CeleryExecutor and KubernetesExecutor.

  3. Scheduler: The scheduler is responsible for executing different tasks at the correct time, re-running pipelines, backfilling data, ensuring task completion, etc.

  4. PostgreSQL: A database where all pipeline metadata is stored. This is typically Postgres, but other SQL databases are supported too.



The easiest way to install Airflow is using docker compose. You can download the official docker compose file from here:

$ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.1/docker-compose.yaml'

Note that Airflow also resides on PyPI and can be installed using pip.

Basic concepts of Airflow

In order to get started with Airflow, one has to be familiar with its main concepts, which can be a little tricky. So let’s try to demystify them.

DAGs

All pipelines are defined as directed acyclic graphs (DAGs). Any time we execute a DAG, an individual run is created. Each DAG run is separate from the others and carries a status describing the execution stage of the DAG. This means that the same DAG can be executed many times in parallel.


[Figure: example of a DAG]

To instantiate a DAG, you can use the DAG function directly or as a context manager, as follows:

from datetime import datetime, timedelta

from airflow import DAG

with DAG(
    "mlops",
    default_args={
        "retries": 1,
    },
    schedule=timedelta(days=1),
    start_date=datetime(2023, 1, 1),
) as dag:
    ...  # tasks are defined inside this block

The context manager accepts some global variables regarding the DAG and some default arguments. The default arguments are passed into all tasks and can be overridden on a per-task basis. The complete list of parameters can be found in the official docs.

In this example, we define that the DAG will start on 1/1/2023 and will be executed each day. The retries argument ensures that a task will be re-run once after a possible failure.
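
The effect of the retries argument can be pictured with a small plain-Python sketch. It does not use Airflow: run_with_retries and flaky_task are hypothetical names, and the real scheduler additionally waits for the configured retry delay and records every attempt. The sketch only shows that retries=1 means at most two attempts in total.

```python
def run_with_retries(task, retries=1):
    """Call task(); on failure, try again up to `retries` more times.

    Returns (result, attempts). Re-raises the last error if every
    attempt fails.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            return task(), attempts
        except Exception:
            if attempts > retries:
                raise


calls = {"count": 0}


def flaky_task():
    # Hypothetical task: fails on its first call, succeeds afterwards.
    calls["count"] += 1
    if calls["count"] == 1:
        raise RuntimeError("transient failure")
    return "done"


result, attempts = run_with_retries(flaky_task, retries=1)
print(result, attempts)
```

A task that keeps failing would raise after the second attempt, which is the point where Airflow would mark the task instance as failed.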

Tasks

Each node of the DAG represents a Task, meaning an individual piece of code. Each task may have some upstream and downstream dependencies. These dependencies express how tasks are related to each other and in which order they should be executed. Whenever a new DAG run is initialized, all tasks are initialized as Task instances. This means that each Task instance is a specific run of the given task.


[Figure: a more complex DAG]
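
How upstream dependencies determine the execution order can be sketched with Python’s standard graphlib module (Python 3.9+). This is only an illustration of the graph idea, not how Airflow’s scheduler is implemented; the task names are borrowed from the machine learning pipeline in the introduction.

```python
from graphlib import CycleError, TopologicalSorter

# Each key is a task; its value is the set of upstream tasks it depends on.
upstream = {
    "read_images": set(),
    "preprocess": {"read_images"},
    "fit_model": {"preprocess"},
    "upload_model": {"fit_model"},
    "deploy_model": {"upload_model"},
}


def is_acyclic(graph):
    """Return True if the dependency graph has no cycles."""
    try:
        list(TopologicalSorter(graph).static_order())
        return True
    except CycleError:
        return False


# A valid execution order: every task appears after its upstream tasks.
order = list(TopologicalSorter(upstream).static_order())
print(order)
print(is_acyclic({"a": {"b"}, "b": {"a"}}))
```

The second check shows why the "acyclic" part matters: two tasks that depend on each other have no valid execution order.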

Operators

Operators can be viewed as templates for predefined tasks because they encapsulate boilerplate code and abstract much of their logic. Some common operators are BashOperator, PythonOperator, MySqlOperator and S3FileTransformOperator. As you can tell, operators help you define tasks that follow a specific pattern. For example, the MySqlOperator creates a task to execute a SQL query and the BashOperator executes a bash script.

Operators are defined inside the DAG context manager as below. The following code creates two tasks, one to execute a bash command and one to execute a MySQL query.

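from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.mysql.operators.mysql import MySqlOperator

with DAG(
    "tutorial"
) as dag:
    # Runs the shell command `date`.
    task1 = BashOperator(
        task_id="print_date",
        bash_command="date",
    )
    # Executes the SQL stored in the given file.
    task2 = MySqlOperator(
        task_id="load_table",
        sql="/scripts/load_table.sql",
    )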

Task dependencies

To form the DAG’s structure, we need to define dependencies between the tasks. One way is to use the >> symbol as shown below:

task1 >> task2 >> task3

Note that one task may have multiple dependencies:

task1 >> [task2, task3]

The other way is through the set_downstream and set_upstream functions:

t1.set_downstream([t2, t3])
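
To see why the two notations are equivalent, here is a toy model of tasks in plain Python. The Task class below is a hypothetical stand-in, not Airflow’s own class; it only assumes that >> is implemented through Python’s operator overloading on top of set_downstream and set_upstream.

```python
def _as_list(tasks):
    # Accept either a single task or a list of tasks.
    return tasks if isinstance(tasks, list) else [tasks]


class Task:
    """Toy stand-in for an Airflow task; it only tracks dependencies."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()
        self.downstream = set()

    def set_downstream(self, others):
        for other in _as_list(others):
            self.downstream.add(other.task_id)
            other.upstream.add(self.task_id)

    def set_upstream(self, others):
        for other in _as_list(others):
            other.set_downstream(self)

    def __rshift__(self, others):
        # a >> b calls a.set_downstream(b); returning the right-hand
        # side is what makes chains such as a >> b >> c work.
        self.set_downstream(others)
        return others

    def __rrshift__(self, others):
        # Handles [a, b] >> c, where the left-hand side is a list.
        self.set_upstream(others)
        return self


t1, t2, t3, t4 = (Task(f"task{i}") for i in range(1, 5))
t1 >> [t2, t3] >> t4
print(sorted(t1.downstream), sorted(t4.upstream))
```

In this sketch, task2 and task3 both run after task1, and task4 waits for both of them.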

XComs

XComs, or cross communications, are responsible for communication between tasks. XCom objects can push or pull data between tasks. More specifically, they push data into the metadata database, from which other tasks can pull it. That’s why there is a limit to the amount of data that can be passed through them. However, if one needs to transfer large data, they can use suitable external data storage such as object storage or NoSQL databases.


[Figure: XComs]

Take a look at the following code. The two tasks communicate via XComs using the ti argument (short for task instance). The train_model task pushes the model_path into the metadata database, from which it is pulled by the deploy_model task.

from airflow import DAG
from airflow.operators.python import PythonOperator

dag = DAG(
    'mlops_dag',
)

def train_model(ti):
    # train_and_save_model is a placeholder for your own training code.
    model_path = train_and_save_model()
    ti.xcom_push(key='model_path', value=model_path)

def deploy_model(ti):
    model_path = ti.xcom_pull(key='model_path', task_ids='train_model')
    # deploy_trained_model is a placeholder for your own deployment code.
    deploy_trained_model(model_path)

train_model_task = PythonOperator(
    task_id='train_model',
    python_callable=train_model,
    dag=dag
)

deploy_model_task = PythonOperator(
    task_id='deploy_model',
    python_callable=deploy_model,
    dag=dag
)

train_model_task >> deploy_model_task
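
The push and pull mechanics can be mimicked with a toy in-memory store. ToyXComStore and the run ids are hypothetical and stand in for the XCom table of the metadata database; the sketch only assumes that values are addressed by DAG run, task id and key, which is why a value pushed in one run is not visible in another.

```python
class ToyXComStore:
    """In-memory stand-in for the XCom table in the metadata database."""

    def __init__(self):
        self._rows = {}

    def push(self, run_id, task_id, key, value):
        # A later push with the same address overwrites the earlier value.
        self._rows[(run_id, task_id, key)] = value

    def pull(self, run_id, task_id, key):
        # Returns None when nothing was pushed under this address.
        return self._rows.get((run_id, task_id, key))


store = ToyXComStore()
store.push("run_2023_01_01", "train_model", "model_path", "/models/resnet.pt")

same_run = store.pull("run_2023_01_01", "train_model", "model_path")
other_run = store.pull("run_2023_01_02", "train_model", "model_path")
print(same_run, other_run)
```

Since every value ends up as a database row, only small things such as paths, ids or metrics belong here, not the model itself.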

Taskflow

The Taskflow API is an easy way to define a task using the Python decorator @task. If all the task’s logic can be written in Python, then a simple annotation can define a new task. Taskflow automatically manages dependencies and communication between tasks.

Using the Taskflow API, we can initialize a DAG with the @dag decorator. Here is an example:

from datetime import datetime

from airflow.decorators import dag, task

@dag(
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily'
)
def mlops():
    @task
    def load_data():
        ...
        return df

    @task
    def preprocessing(data):
        ...
        return data

    @task
    def fit(data):
        return None

    # Passing one task's output to the next defines the dependencies.
    df = load_data()
    data = preprocessing(df)
    model = fit(data)

dag = mlops()

Note that dependencies between tasks are implied by each function’s arguments. Here we have a simple chaining order, but things can get much more complex. The Taskflow API also solves the communication problem between tasks, so there is limited need to use XComs directly.
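
How function arguments can imply dependencies is easier to see with a toy decorator. The sketch below is plain Python and not Airflow’s implementation: task, Ref and edges are hypothetical names, and the function bodies are never executed. Calling a decorated function only returns a reference to its future output and records an edge for every such reference it receives.

```python
import functools

edges = []  # (upstream_name, downstream_name) pairs, recorded at call time


class Ref:
    """Placeholder for the future output of a task."""

    def __init__(self, name):
        self.name = name


def task(func):
    @functools.wraps(func)
    def wrapper(*args):
        # Every argument that is a Ref came from another task, so it
        # implies an upstream -> downstream edge.
        for arg in args:
            if isinstance(arg, Ref):
                edges.append((arg.name, func.__name__))
        return Ref(func.__name__)

    return wrapper


@task
def load_data():
    ...


@task
def preprocessing(data):
    ...


@task
def fit(data):
    ...


model = fit(preprocessing(load_data()))
print(edges)
```

The recorded edges form the same chain that would otherwise be written as load_data >> preprocessing >> fit.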

Scheduling

Scheduling jobs is one of the core features of Airflow. This can be done using the schedule_interval argument (named schedule since Airflow 2.4), which receives a cron expression, a datetime.timedelta object, or a predefined preset such as @hourly, @daily, etc. A more flexible approach is to use the recently added timetables, which let you define custom schedules using Python.

Here is an example of how to use the schedule_interval argument. The DAG below will be executed daily.

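from datetime import datetime

from airflow.decorators import dag

@dag(
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily',
    catchup=False
)
def my_dag():
    pass

my_dag()  # calling the decorated function registers the DAG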

Two very important concepts you need to understand regarding scheduling are backfilling and catchup.

Once we define a DAG, we set up a start date and a schedule interval. If catchup=True, Airflow will create DAG runs for all schedule intervals from the start date until the current date. If catchup=False, Airflow will skip the missed intervals and schedule runs only from the most recent interval onward.
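
The difference between the two catchup settings can be sketched in plain Python. This is a simplified model and not the scheduler’s code: logical_dates is a hypothetical helper, the "current time" is fixed by hand, and the sketch assumes that a run is created once its interval has fully elapsed.

```python
from datetime import datetime, timedelta


def logical_dates(start_date, now, interval, catchup):
    """List the start of every fully elapsed interval since start_date.

    With catchup=False only the most recent elapsed interval is kept.
    """
    dates = []
    current = start_date
    while current + interval <= now:
        dates.append(current)
        current += interval
    return dates if catchup else dates[-1:]


start = datetime(2023, 1, 1)
now = datetime(2023, 1, 4, 12, 0)  # assumed current time for the example
day = timedelta(days=1)

print(logical_dates(start, now, day, catchup=True))
print(logical_dates(start, now, day, catchup=False))
```

With catchup enabled the sketch yields three runs (January 1, 2 and 3), while with catchup disabled only the run for January 3 remains.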

Backfilling extends this idea by enabling us to create past runs from the CLI irrespective of the value of the catchup parameter:

$ airflow dags backfill -s <START_DATE> -e <END_DATE> <DAG_NAME>


[Figure: DAG runs]

Connections and Hooks

Airflow provides an easy way to configure connections with external systems or services. Connections can be created using the UI, as environment variables, or through a config file. They usually require a URL, authentication info and a unique id. Hooks are an API that abstracts communication with these external systems. For example, we can define a PostgreSQL connection through the UI as follows:


[Figure: defining a connection in the Airflow UI]

And then use the PostgresHook to establish the connection and execute our queries:

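from airflow.providers.postgres.hooks.postgres import PostgresHook

pg_hook = PostgresHook(postgres_conn_id='postgres_custom')
conn = pg_hook.get_conn()
cursor = conn.cursor()
cursor.execute('create table _mytable (ModelID int, ModelName varchar(255))')
conn.commit()  # persist the new table
cursor.close()
conn.close()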
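
Connections can also be supplied as environment variables, as mentioned above. Airflow reads variables named AIRFLOW_CONN_<CONN_ID> whose value is a connection URI. The sketch below uses only the standard library to show which pieces of information such a URI carries. The credentials and host are made up, and Airflow performs its own parsing, so the field names in the dictionary are illustrative.

```python
from urllib.parse import urlsplit

# Hypothetical value of the AIRFLOW_CONN_POSTGRES_CUSTOM environment
# variable, which would define the connection id "postgres_custom".
uri = "postgres://airflow_user:s3cret@db.example.com:5432/models"

parts = urlsplit(uri)
connection = {
    "conn_type": parts.scheme,
    "login": parts.username,
    "password": parts.password,
    "host": parts.hostname,
    "port": parts.port,
    "schema": parts.path.lstrip("/"),
}
print(connection)
```

Keeping credentials in the environment rather than in DAG files means the hook code above stays the same across development and production.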

Advanced concepts

To keep this tutorial as self-contained as possible, I need to mention a few more advanced concepts. I won’t go into many details for each one of them, but I highly urge you to check them out if you want to master Airflow.

  • Branching: Branching allows you to divide a task into many different tasks, either for conditioning your workflow or for splitting the processing. The most common way is the BranchPythonOperator.

  • Task Groups: Task Groups help you organize your tasks in a single unit. It’s a great tool for simplifying your graph view and for repeating patterns.

  • Dynamic Dags: Dags and tasks can also be constructed in a dynamic way. Since Airflow 2.3, dags and tasks can be created at runtime, which is ideal for parallel and input-dependent tasks. Jinja templates are also supported by Airflow and are a very helpful addition to dynamic dags.

  • Unit tests and logging: Airflow has dedicated functionality for running unit tests and logging information
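
As an illustration of the branching item above, here is the kind of function a BranchPythonOperator could use as its python_callable: it returns the task_id of the branch to follow, and the tasks on the other branches are skipped. The task ids and the accuracy threshold are hypothetical; in a real DAG the accuracy would typically arrive through op_kwargs or an XCom.

```python
def choose_next_task(accuracy, threshold=0.9):
    """Return the task_id of the branch that should run next."""
    if accuracy >= threshold:
        return "deploy_model"
    return "retrain_model"


print(choose_next_task(0.95))
print(choose_next_task(0.42))
```

Keeping the decision in a small pure function like this also makes it easy to unit test without starting Airflow.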

Airflow best practices

Before we see a hands-on example, let’s discuss a few best practices that most practitioners use.

  1. Idempotency: DAGs and tasks should be idempotent. Re-executing the same DAG run with the same inputs should always have the same effect as executing it once.

  2. Atomicity: Tasks should be atomic. Each task should be responsible for a single operation and independent from the other tasks

  3. Incremental filtering: Each DAG run should process only a batch of data, supporting incremental extracting and loading. That way possible failures won’t affect the entire dataset.

  4. Top-level code: Top-level code should be avoided if it’s not for creating operators or dags because it will affect performance and loading time. All code should be inside tasks, including imports, database access, and heavy computations

  5. Complexity: DAGs should be kept as simple as possible because high complexity may impact performance or scheduling
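
Idempotency and incremental filtering often go together, which a small plain-Python sketch can show. The warehouse dictionary and load_partition are hypothetical stand-ins for a partitioned table and a load task: each run writes only the batch for its own date, and it overwrites instead of appending, so re-running it changes nothing.

```python
def load_partition(warehouse, logical_date, rows):
    """Idempotent load: replace the partition that belongs to logical_date.

    Overwriting (instead of appending) means that re-running the same
    DAG run leaves the warehouse in the same state as a single run.
    """
    warehouse[logical_date] = list(rows)
    return warehouse


warehouse = {}  # toy stand-in for a table partitioned by date
batch = [{"image": "cat.png"}, {"image": "dog.png"}]

load_partition(warehouse, "2023-01-01", batch)
load_partition(warehouse, "2023-01-01", batch)  # re-run of the same DAG run
print(len(warehouse["2023-01-01"]))
```

An append-based load would have doubled the rows on the second call, which is exactly the kind of side effect that makes retries and backfills unsafe.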

Example of an Airflow pipeline

To demonstrate all the aforementioned concepts, let’s go back to the example workflow mentioned at the beginning of this article. We will develop a pipeline that trains a model and deploys it in Kubernetes. More specifically, the DAG will consist of 5 tasks:

  1. Read images from an AWS S3 bucket

  2. Preprocess the images using PyTorch

  3. Fine-tune a ResNet model with the downloaded images

  4. Upload the model in S3

  5. Deploy the model in a Kubernetes Cluster

Note that I will not include all the specific details and the necessary code, only the parts that are related to Airflow.

First, let’s start by defining the DAG. As you can see, the pipeline will execute once a day. In case of failure, there will be a single retry after one hour. Moreover, there will be no catchup despite the fact that the pipeline is supposed to start two days ago.

import datetime

from airflow import DAG
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'retries': 1,
    'retry_delay': datetime.timedelta(hours=1),
}

dag = DAG(
    'resnet_model',
    default_args=default_args,
    description='A simple DAG to demonstrate Airflow with PyTorch and Kubernetes',
    schedule_interval='@daily',
    catchup=False
)

The first task is responsible for reading the images from AWS S3. To accomplish that, we can use the S3Hook. We first define the reading functionality in a function and then the corresponding PythonOperator. Note that here I use the default AWS connection, but in most cases, you will need to define your own.

from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

def read_images_from_s3(**kwargs):
    s3_conn = S3Hook(aws_conn_id='aws_default')
    images = []
    # Collect the key of every object in the bucket.
    for obj in s3_conn.get_bucket('mybucket').objects.all():
        images.append(obj.key)
    # Share the list of keys with downstream tasks through XCom.
    kwargs['ti'].xcom_push(key='images', value=images)

read_images = PythonOperator(
    task_id='read_images',
    python_callable=read_images_from_s3,
    provide_context=True,
    dag=dag
)

Next in line are the transform and fit functions. I won’t include them here in their entirety because they are mostly standard PyTorch code.

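import torch

def preprocess_images(**kwargs):
    # Pull the image keys pushed by the read_images task.
    images = kwargs['ti'].xcom_pull(task_ids='read_images', key='images')
    # ... PyTorch preprocessing that produces train_images (omitted)
    kwargs['ti'].xcom_push(key='train_images', value=train_images)

def fit_model(**kwargs):
    # Pull the preprocessed images pushed by the preprocess task.
    train_images = kwargs['ti'].xcom_pull(task_ids='preprocess', key='train_images')
    # ... PyTorch fine-tuning that produces model (omitted)
    torch.save(model, 'trained_model.pt')

preprocess = PythonOperator(
    task_id='preprocess',
    python_callable=preprocess_images,
    provide_context=True,
    dag=dag
)

# The operator reuses the name fit_model; python_callable is bound to the
# function above before the name is reassigned.
fit_model = PythonOperator(
    task_id='fit_model',
    python_callable=fit_model,
    provide_context=True,
    dag=dag
)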

Once the model is trained, we need to upload it to S3 so we can load it and serve requests. This can be done using the LocalFilesystemToS3Operator, which reads a file from the local file system and uploads it to S3.

from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator

# Uploads the locally saved model file to the given bucket and key.
upload_model = LocalFilesystemToS3Operator(
    task_id='upload_model',
    filename='trained_model.pt',
    dest_bucket='my-model-bucket',
    dest_key='trained_model.pt',
    dag=dag
)

The final step is to create a Kubernetes pod and serve the model. The best way to achieve that is by using the KubernetesPodOperator. Assuming that we have a deployment script that handles the model loading and serving (which we will not analyze here), we can do something as follows:

from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

deploy_model = KubernetesPodOperator(
    namespace='default',
    image='myimage:latest',
    name='deploy-model',
    task_id='deploy_model',
    cmds=['python', 'deploy.py'],
    # model is not defined in this snippet; it stands for the argument
    # (for example the model location) that deploy.py expects.
    arguments=[model],
    is_delete_operator_pod=True,
    hostnetwork=False,
    dag=dag
)

The KubernetesPodOperator uses the Kubernetes API to launch a pod in a Kubernetes cluster and execute the deployment script.

Once we have defined all tasks, we simply need to create their dependencies and form the DAG. This is as simple as:

read_images >> preprocess >> fit_model >> upload_model >> deploy_model

And that’s all. This DAG will be initialized by Airflow and can be monitored through the UI. The scheduler will be responsible for executing the tasks in the correct order and at the proper time.

Conclusion

Apache Airflow is a great data engineering tool in my honest opinion. Sure, it has some shortcomings, but it can also be very flexible and scalable. If you want to dive deeper, I have two resources to suggest:

  1. A course by IBM on Coursera: ETL and Data Pipelines with Shell, Airflow and Kafka. By the way, the entire certification on data engineering by IBM is pretty great.

  2. Data Engineering with AWS Nanodegree from AWS in Udacity. The 4th module in particular focuses heavily on Airflow.

Let us know if you’d like to see more tutorials on popular data engineering libraries. If you’re new to AI Summer, don’t forget to follow us on Twitter or Linkedin to stay updated with our latest articles.

