Enter Airflow. Your new workflow management platform.
A couple of years ago, in Scaling Effectively: when Kubernetes met Celery, I wrote about my own implementation of a workflow engine using Flask, Celery, and Kubernetes. I had considered the available solutions, including Airflow, but with no satisfying option in sight I decided to implement my own framework. Since then, Airflow has come a long way. Here is why I switched to Airflow:
Scalable
With the right setup, the one we are about to build, Airflow is both scalable and cost-efficient.
Batteries Included
Though the UI is not perfect, it is one of Airflow’s core strengths, and in this case a picture is worth a thousand words.
Battle Tested
With so many companies using Airflow, I can rest assured knowing it is going to continuously improve.
🔥 Disposable Infrastructure
Using Helm and a few premade commands, we can destroy and redeploy the entire infrastructure easily.
🚀 Cost-Efficient Execution
We use Kubernetes as the tasks’ execution engine. The Airflow scheduler runs each task in a new pod and deletes it upon completion, allowing us to scale with the workload while using a minimal amount of resources.
🔩 Decoupled Orchestration
Another great advantage of using Kubernetes as the task runner is decoupling orchestration from execution. You can read more about it in .
🏃 Dynamically Updated Workflows
We use Git-Sync sidecar containers. These let us update workflows through Git alone; there is no need to redeploy Airflow on every workflow change. See the sketch below for the relevant Helm values.
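For illustration only, here is how Git-Sync is enabled with the official apache-airflow Helm chart; the chart used by the template may expose different values, and the release name, repo URL, and branch here are placeholders:
> helm upgrade --install airflow apache-airflow/airflow \
    --set dags.gitSync.enabled=true \
    --set dags.gitSync.repo=https://github.com/your-org/your-dags.git \
    --set dags.gitSync.branch=main
So which executor and operator combination should we pick? Here is how the main options compare:
CeleryExecutor + KubernetesPodOperator (recommended)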
➕ Decoupling of orchestration and execution.
➖ Extra pods for Celery workers, Redis, and Flower monitoring.
KubernetesExecutor + WhateverOperator
➕ No extra pods.
➖ Weak decoupling: we would have to define execution code and dependencies inside the DAGs.
KubernetesExecutor + KubernetesPodOperator
➕ No extra pods.
➕ Decoupling of orchestration and execution.
➖ Unsupported: currently causes recursive pod startup, as each task pod tries to launch yet another pod.
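Whichever combination you choose, the executor is selected through Airflow’s configuration. The template takes care of this for you, but for reference it maps to a single environment variable (or the matching executor key in the [core] section of airflow.cfg):
> export AIRFLOW__CORE__EXECUTOR=CeleryExecutor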
Prerequisites
> brew install kubectl
> brew install helm
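To confirm both tools are installed and on your PATH:
> kubectl version --client
> helm version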
Setup
> cookiecutter https://github.com/talperetz/scalable-airflow-template
> make deploy
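Once the deploy finishes, you can sanity-check that the Airflow pods are up; this assumes the chart installs into an airflow namespace, as the example DAG below does:
> kubectl get pods -n airflow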
An Example Workflow
from datetime import datetime, timedelta

from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2015, 6, 1),
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

example_workflow = DAG('kube-operator',
                       default_args=default_args,
                       schedule_interval=timedelta(days=1),
                       catchup=False)  # don't backfill runs since the 2015 start_date

with example_workflow:
    def pod_task(name):
        # Each task runs in a fresh pod that is deleted once it finishes.
        return KubernetesPodOperator(namespace='airflow',
                                     image="ubuntu:16.04",
                                     cmds=["bash", "-cx"],
                                     # bash -c expects the whole command as a single string
                                     arguments=["echo hello world"],
                                     labels={'runner': 'airflow'},
                                     name=name,
                                     task_id=name,
                                     is_delete_operator_pod=True,
                                     hostnetwork=False)

    t1, t2, t3, t4 = [pod_task('pod{}'.format(i)) for i in range(1, 5)]

    t1 >> [t2, t3] >> t4
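The last line wires up the dependencies: pod1 runs first, pod2 and pod3 then run in parallel, and pod4 runs once both are done. To smoke-test a single task without involving the scheduler, assuming the Airflow 1.10 CLI that matches the contrib import above:
> airflow test kube-operator pod1 2019-01-01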
If you enjoyed this post, feel free to share it 📤
and if you’re interested in posts to come, make sure to follow me on