In this post, we set up Apache Airflow as part of the preparation to convert our ETL code into a full-fledged data pipeline.
This is the third post in my series, Towards Open Options Chains: A Data Pipeline for Collecting Options Data at Scale.
Installing Airflow is straightforward. We install from the Python Package Index (PyPI) with pip, following the official installation instructions. Specifically, we use the installation code from the section "Installing Airflow with extras and providers", with only the postgres dependency. To keep our environment clean, we install Airflow in a dedicated environment. I use Conda, but you can use any dependency manager to achieve the same thing.
# Pin the Airflow version and the matching constraints file for our Python version
AIRFLOW_VERSION=2.2.3
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
# Create and activate new Conda environment
conda create -n airflow
conda activate airflow
pip install "apache-airflow[postgres]==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
The airflow folder should now be created in /root/. We will now change some configurations. We will use the LocalExecutor, which can run tasks as sub-processes in parallel; the default SequentialExecutor can only run tasks one at a time. We will also point Airflow's metadata database at our Postgres instance and disable the example DAGs.
Look for these variables in /root/airflow/airflow.cfg and change them accordingly:
# sql_alchemy_conn = sqlite:////root/airflow/airflow.db - Comment this out
sql_alchemy_conn = postgresql+psycopg2://postgres:<your postgres password>@localhost/postgres
# load_examples = True - Comment this out
load_examples = False
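The executor setting lives in the same file. The commented line below assumes the default entry that ships with airflow.cfg:
# executor = SequentialExecutor - Comment this out
executor = LocalExecutor
With the configuration in place, we can check the connection to Postgres, initialise the metadata database, and create an admin user: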
# Check connection to Postgres
airflow db check
# Initialise database with Postgres
airflow db init
# Create new admin user
airflow users create -u <username here; consider using "admin"> -p <password here> -f <first name> -l <last name> -r Admin -e <your email, or a bogus one>
Finally, start the scheduler and the webserver (ideally in separate terminal sessions):
# Start the scheduler
airflow scheduler
# Start the webserver
airflow webserver
In a browser window, navigate to the URL provided in the terminal. This should be similar to http://localhost:8080. Log in with the username and password you chose previously, and you'll be directed to the Airflow UI:
Airflow lets you manage settings via its UI. We will need to configure some simple variables and connections.
You can probably already guess that Airflow variables behave like environment variables. So far, we have two values worth storing: (1) the API key for the TD Ameritrade (TDA) API, and (2) your database password. We will create an Airflow variable for each of them.
In the UI, navigate to Admin on the top menu bar, and select Variables in the dropdown menu. Click the add icon (+) to add a new variable. I've used the keys API_KEY and DB_PASSWORD, respectively.
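If you prefer not to click through the UI, the same variables can be set with the Airflow CLI; the values below are placeholders for your own key and password:
airflow variables set API_KEY '<your TDA API key>'
airflow variables set DB_PASSWORD '<your postgres password>'
We can then retrieve these values in our code: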
from airflow.models import Variable
# Get variables
API_KEY = Variable.get('API_KEY')
DB_PASSWORD = Variable.get('DB_PASSWORD')
Earlier on, we used psycopg2 to connect Python to our Postgres database. However, since we're using Airflow to run the ETL job, we can leverage Airflow's hooks. Hooks simplify the code needed to interact with other services (e.g., databases). See below for a simple comparison for our use case:
# The psycopg2 way
import pandas as pd
import psycopg2 as pg2
from airflow.models import Variable

DB_PASSWORD = Variable.get('DB_PASSWORD')

conn = pg2.connect(host='localhost', database='optionsdata',
                   user='postgres', password=DB_PASSWORD)
query = '...'
with conn.cursor() as cursor:
    cursor.execute(query)
    df = pd.DataFrame(cursor.fetchall(), columns=...)
conn.close()
# Using Airflow Hooks
from airflow.providers.postgres.hooks.postgres import PostgresHook
pg_hook = PostgresHook(postgres_conn_id='postgres_optionsdata')
query = '...'
df = pg_hook.get_pandas_df(query)
The hook approach has two advantages. First, it achieves the same thing in fewer lines of code, and we no longer have to repeat the connection boilerplate every time we query the optionsdata database. Second, we don't need the host, database, user, or password on hand - we can just use the connection ID.
To create the connection, navigate to Admin on the top menu bar, and select Connections in the dropdown menu. Click the add icon (+) to add a new connection. Fill in the details accordingly, selecting Postgres as the connection type, and entering the database name in the "schema" field.
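Alternatively, the connection can be created from the Airflow CLI. The sketch below assumes the setup we've used so far - a local Postgres instance on the default port 5432, the postgres user, and the optionsdata database - with the connection ID matching the one in the hook example above:
airflow connections add 'postgres_optionsdata' \
    --conn-type 'postgres' \
    --conn-host 'localhost' \
    --conn-schema 'optionsdata' \
    --conn-login 'postgres' \
    --conn-password '<your postgres password>' \
    --conn-port 5432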