In this post, we will build on our work in Part II: Foundational ETL Code and Part III: Getting Started with Airflow by converting our ETL pipeline into a Directed Acyclic Graph (DAG), which comprises the tasks and dependencies for the pipeline on Airflow.
This is the fourth post in my series, Towards Open Options Chains: A Data Pipeline for Collecting Options Data at Scale.
First, we already have our template: the ETL job we’ve written so far! That is, (1) query the TD Ameritrade (TDA) API, (2) process the response into the required format, and (3) load it into Postgres. We separate the ETL steps into different tasks in the DAG, because dumping all our code into one function may be problematic. Suppose that step 3 fails in the pipeline. By then, we’ve already retrieved data from the API and processed it, but if all three steps were a single task, Airflow would re-run the entire thing. That’s a waste of resources and time.
Second, we write in the flexibility to collect data on new tickers. Consider this scenario: our DAG is currently configured to collect data on FB, and we would now like to switch over to gold (GDX). Our scripts have hardcoded the FB ticker in the extract and load steps, and we don’t have a GDX table yet. To resolve this, we use a TICKER variable in all our steps, and add a new task at the very beginning of the pipeline to create a table for the ticker if it does not already exist. When we change the TICKER we want to collect data on, our DAG will run without throwing any errors resulting from the relation (table) not existing in Postgres.
In this first block, we import the necessary libraries to make our code work. The first two imports pertain to time. datetime is used to specify dates and durations, and pendulum is used to define timezones, which are essential for scheduling the workflows at the correct time.
# Imports
import pendulum
from datetime import datetime, timedelta
from airflow.models import DAG, Variable
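# In Airflow 2, PythonOperator lives in airflow.operators.python
# (airflow.operators.python_operator is the deprecated Airflow 1 path)
from airflow.operators.python import PythonOperator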
from airflow.providers.postgres.hooks.postgres import PostgresHook
DAG: For defining a no-code AI chatbot that automatically downloads options data, asks how much money you want to make, trades on your brokerage to make that amount for you, and keeps 5% commissions so it can expand and take over the world. Yea, no. It defines a DAG.
Variable: If you recall from Part III: Getting Started with Airflow, we created “environment variables” in Airflow. This class allows you to access them.
PythonOperator: Operators are the basic building blocks of Airflow DAGs. They contain the logic for a single task. The PythonOperator is an Operator that runs Python code. There are many others like BashOperator for running bash scripts, S3FileTransformOperator for working with AWS S3, and even a PostgresOperator for interacting with a PostgreSQL database.
PostgresHook: As explained in Part III, Hooks simplify the code needed to interact with other services. Under the hood, Operators use Hooks for interactivity.
You may have noticed that most of the imports are for Airflow functions. But where are the others? Where’s numpy? Where’s pandas?
First, we create the default_args dictionary, which we will pass to the DAG definition. There are many more settings, but the ones I wanted to set explicitly are shown below. Pay close attention to the start_date. We use datetime to set the date, and explicitly specify a timezone with assistance from pendulum. This is crucial for us because the market operates on US Eastern time, i.e. where Wall Street is. This refers to Eastern Standard Time (EST) in autumn/winter (UTC-05:00) and Eastern Daylight Time (EDT) in spring/summer (UTC-04:00). By specifying America/New_York in our start_date, we make this DAG timezone-aware. There are also benefits downstream when we specify the DAG’s schedule interval.
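To see what a timezone-aware date buys us, here is a minimal sketch. It uses the standard library's zoneinfo module as a stand-in for pendulum (an assumption on my part, so that the snippet runs without Airflow installed). The same New York wall-clock time maps to different UTC offsets in winter and in summer.

```python
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo  # standard library (Python 3.9+), stand-in for pendulum

ny = ZoneInfo('America/New_York')

# Same wall-clock time, one date in winter (EST) and one in summer (EDT)
winter = datetime(2022, 1, 7, 7, 30, tzinfo=ny)
summer = datetime(2022, 7, 7, 7, 30, tzinfo=ny)

winter_offset = winter.utcoffset()
summer_offset = summer.utcoffset()

print(winter.tzname(), winter_offset)
print(summer.tzname(), summer_offset)
```

Because the offset is looked up per date, a schedule expressed in New York time follows the daylight saving switch without any manual change.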
# Set arguments
us_east_tz = pendulum.timezone('America/New_York')
default_args = {
    'owner': 'chrischow',
    'start_date': datetime(2022, 1, 7, 7, 30, tzinfo=us_east_tz),
    'retries': 1,
    'retry_delay': timedelta(minutes=1)
}
# Set variables
TICKER = 'FB'
# Get variables
API_KEY = Variable.get('API_KEY')
DB_PASSWORD = Variable.get('DB_PASSWORD')
Next, we set the ticker that we want to collect data on. We will set only one ticker for now. We’ll change this later on when we convert the DAG into a dynamic one.
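Finally, we retrieve the variables that we set via the Airflow UI.
Next, we define the DAG. As you can see, we load in the default_args from before, and add a description and tags.
We also specify the schedule_interval using a cron expression (an online cron expression editor is handy for experimenting). Reading the five fields of */30 8-21 * * 1-5 from left to right, the expression means: every 30 minutes (*/30), during hours 8 through 21 (8-21), on every day of the month (*), in every month (*), from Monday to Friday (1-5).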
Note that we specify 8am - 9pm. We can get away with this because we set the correct timezone earlier on. This is a big gotcha. If we hadn’t set the timezone and instead used Singapore time, we would need two DAGs: one to run from 8pm to midnight (AM in New York) and one to run from midnight to 9am (PM in New York). That is for spring/summer, when New York (EDT) is 12 hours behind Singapore. We would then have to manually shift the times by an hour once autumn arrives and the gap widens to 13 hours.
dag = DAG(
    dag_id=f'get_options_data_{TICKER}',
    default_args=default_args,
    description=f'ETL for {TICKER} options data',
    schedule_interval='*/30 8-21 * * 1-5',
    catchup=False,
    tags=['finance', 'options', TICKER]
)
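As a sanity check on the schedule, the sketch below enumerates the run times that the cron expression implies for a single trading day and converts the first and last of them to Singapore time. It is a hand-written illustration of this one expression, not a general cron parser; matches_schedule is a hypothetical helper, and zoneinfo stands in for pendulum.

```python
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

ny = ZoneInfo('America/New_York')
sg = ZoneInfo('Asia/Singapore')

def matches_schedule(dt):
    """Hypothetical check for '*/30 8-21 * * 1-5' only (not a general cron parser)."""
    return (
        dt.minute % 30 == 0        # */30: minutes 0 and 30
        and 8 <= dt.hour <= 21     # 8-21: hours 8 through 21, inclusive
        and dt.weekday() < 5       # 1-5: Monday to Friday
    )

# Walk through Friday, 7 Jan 2022 (New York time) in 30-minute steps
day_start = datetime(2022, 1, 7, 0, 0, tzinfo=ny)
slots = [day_start + timedelta(minutes=30 * i) for i in range(48)]
runs = [dt for dt in slots if matches_schedule(dt)]

first_run, last_run = runs[0], runs[-1]
print(len(runs))
print(first_run.astimezone(sg).strftime('%a %H:%M'))
print(last_run.astimezone(sg).strftime('%a %H:%M'))
```

The last run of the New York day falls on the next calendar day in Singapore, which is why a DAG scheduled in Singapore time would have to be split in two.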
The last setting is catchup. This becomes a problem when we set a start date (from default_args) in the past, but trigger the DAG now. If we require the DAG to catch up, Airflow will trigger the DAG for every interval it has not been run for since the last execution date. For example, if the start date was 1 Jan 2021, the last execution date was 1 Jan 2021, and we trigger the DAG one year later on 1 Jan 2022, the Airflow scheduler would create and execute DAG runs to make up for the whole damn year of 2021, until 1 Jan 2022. For safety, we turn catchup off. We don’t need it anyway.
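To get a feel for the scale of such a catchup, the rough sketch below counts how many schedule slots fall between 1 Jan 2021 and 1 Jan 2022 under the same cron expression. It is only a back-of-envelope count (Airflow's own bookkeeping of intervals is more involved), and count_schedule_slots is a hypothetical helper.

```python
from datetime import date, timedelta

RUNS_PER_DAY = 14 * 2  # hours 8 through 21 inclusive, two runs per hour

def count_schedule_slots(start, end):
    """Count slots of '*/30 8-21 * * 1-5' from start (inclusive) to end (exclusive)."""
    total = 0
    day = start
    while day < end:
        if day.weekday() < 5:  # Monday to Friday
            total += RUNS_PER_DAY
        day += timedelta(days=1)
    return total

missed = count_schedule_slots(date(2021, 1, 1), date(2022, 1, 1))
print(missed)
```

That is several thousand runs, each of which would hit the API for data that is no longer current.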
In Part II, we wrote most of the code we need to define all the tasks in our data pipeline. But we still have to figure out how to transfer data between tasks. Inside a Python runtime, all outputs from any function can be accessed by any other function. Inside an Airflow DAG, each Operator saves no outputs. Creation of temporary files is handled inside the Python callable. Therefore, unlike in the Python runtime, we need to explicitly save data in a staging area, or find a way to pass data around.
Fortunately, Airflow has XComs, or cross-communications, that enable Tasks to communicate with one another [3]. An XCom comprises a key to identify it, the task ID and DAG ID it came from, and a serialisable value. In a way, XComs abstract away the staging process for you. Thanks to Postgres, the maximum size for an XCom is 1GB (another reason why I chose it). In contrast, the limit for MySQL is a measly 64KB. Using SQLite would have given us 2GB, but as a database engine, it does not have the robust features that Postgres does.
You might be wondering why we didn’t simply use a temporary directory as a staging area. The reason is that if this solution is upgraded into a distributed one, we can’t be confident that all tasks in the same DAG run will be completed on the same machine. If one task is on machine A, and a task that depends on the first task is on machine B, it will not have access to the temporary folder created on machine A. Besides, if you’re running this solution locally, it won’t hurt to use XComs.
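To make the mechanics concrete, here is a toy stand-in for the XCom store. FakeTaskInstance is purely hypothetical and is not how Airflow implements XComs; it only mimics the idea that values are serialised (here, as JSON) on push and deserialised on pull, which is why they must be serialisable.

```python
import json

class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's task instance; illustration only."""
    _store = {}  # shared "metadata database": (task_id, key) -> JSON string

    def __init__(self, task_id):
        self.task_id = task_id

    def xcom_push(self, key, value):
        # Serialise on the way in; non-serialisable values raise TypeError
        FakeTaskInstance._store[(self.task_id, key)] = json.dumps(value)

    def xcom_pull(self, key, task_ids):
        # Deserialise on the way out, one value per requested task ID
        return [json.loads(FakeTaskInstance._store[(t, key)]) for t in task_ids]

extract_ti = FakeTaskInstance('extract')
transform_ti = FakeTaskInstance('transform')

extract_ti.xcom_push(key='raw_data', value={'symbol': 'FB', 'bid': 1.25})
pulled = transform_ti.xcom_pull(key='raw_data', task_ids=['extract'])[0]
print(pulled)

try:
    extract_ti.xcom_push(key='bad', value={1, 2, 3})  # a set is not JSON-serialisable
except TypeError as err:
    print('Cannot push:', err)
```

Note that pulling with a list of task IDs returns a list of values, hence the [0] at the end.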
The first task is to create a table based on a ticker, if the table does not already exist. Only two lines of code are required: (1) connect to Postgres via the PostgresHook, and (2) create the table using the same SQL statement as in Part II.
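Since the ticker ends up inside the SQL statement as the table name, it may be worth validating it before it gets there. The sketch below is one possible guard; safe_table_name is a hypothetical helper, and the pattern is an assumption about which tickers we expect (symbols containing dots or dashes would need a different mapping).

```python
import re

# Assumed rule: letters, digits and underscores only, not starting with a digit
_VALID_NAME = re.compile(r'[A-Za-z_][A-Za-z0-9_]*')

def safe_table_name(ticker):
    """Return the ticker unchanged if it is safe to use as a table name."""
    if not _VALID_NAME.fullmatch(ticker):
        raise ValueError(f'Unsafe table name: {ticker!r}')
    return ticker

print(safe_table_name('FB'))
try:
    safe_table_name('FB; DROP TABLE GDX')
except ValueError as err:
    print(err)
```

A call such as safe_table_name(ticker) could then wrap the ticker wherever it is formatted into a query.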
# Function to create table
def create_table(ticker):
    # Define Postgres hook
    pg_hook = PostgresHook(postgres_conn_id='postgres_optionsdata')
    # Create table if it doesn't exist
    pg_hook.run(f"""
        CREATE TABLE IF NOT EXISTS {ticker} (
            put_call VARCHAR(5) NOT NULL,
            symbol VARCHAR(32) NOT NULL,
            description VARCHAR(64) NOT NULL,
            bid DOUBLE PRECISION,
            ask DOUBLE PRECISION,
            last DOUBLE PRECISION,
            bid_size INTEGER,
            ask_size INTEGER,
            last_size INTEGER,
            high_price DOUBLE PRECISION,
            low_price DOUBLE PRECISION,
            open_price DOUBLE PRECISION,
            close_price DOUBLE PRECISION,
            total_volume INTEGER,
            quote_time BIGINT,
            volatility DOUBLE PRECISION,
            delta DOUBLE PRECISION,
            gamma DOUBLE PRECISION,
            theta DOUBLE PRECISION,
            vega DOUBLE PRECISION,
            rho DOUBLE PRECISION,
            open_interest INTEGER,
            time_value DOUBLE PRECISION,
            theoretical_value DOUBLE PRECISION,
            strike_price DOUBLE PRECISION,
            expiration_date BIGINT,
            dte INTEGER,
            PRIMARY KEY (symbol, quote_time)
        )
    """)
This is where XComs come into play. To use XComs, we need to add the argument ti or task_instance. Airflow passes this task instance object into the callables, thereby giving us access to XComs through it. As you can see in the final line of the code, we use the ti object to push the raw data in Python dictionary format (because it’s serialisable) as an XCom.
# Function to get data from TDA API
def extract_options_data_from_tda(ticker, ti):
    # Import modules
    import json
    import requests
    # Configure dates: take the current time in US Eastern time
    # (utcnow().replace(tzinfo=...) would mislabel UTC wall-clock time as Eastern)
    start_date = datetime.now(tz=us_east_tz)
    end_date = start_date + timedelta(days=45)
    # Configure request
    headers = {'Authorization': ''}
    params = (
        ('apikey', API_KEY),
        ('symbol', ticker),
        ('contractType', 'PUT'),
        ('strikeCount', '50'),
        ('range', 'ALL'),
        ('fromDate', start_date),
        ('toDate', end_date),
    )
    # Get data (the URL needs its scheme, otherwise requests raises MissingSchema)
    response = requests.get(
        'https://api.tdameritrade.com/v1/marketdata/chains',
        headers=headers,
        params=params
    )
    data = json.loads(response.content)
    # Push XCOM
    ti.xcom_push(key='raw_data', value=data)
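One detail worth making explicit is the date window sent to the API. The sketch below builds the fromDate and toDate values as plain date strings instead of datetime objects. build_date_window is a hypothetical helper, and the yyyy-MM-dd format is an assumption about what the API accepts that should be checked against the TDA documentation.

```python
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo  # stand-in for pendulum in this sketch

def build_date_window(now, days=45):
    """Return (from_date, to_date) strings covering the next `days` days."""
    start = now.date()
    end = start + timedelta(days=days)
    return start.isoformat(), end.isoformat()

now = datetime(2022, 1, 7, 7, 30, tzinfo=ZoneInfo('America/New_York'))
from_date, to_date = build_date_window(now)
print(from_date, to_date)
```

Passing strings also keeps the request parameters independent of how a datetime object happens to be rendered in the URL.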
The code below should look familiar. In this function, we have only one argument: ti. Near the top of the function definition, we use it to pull the data from the previous task that was stored as an XCom.
# Function to transform data
def transform_options_data(ti):
    # Import modules
    import pandas as pd
    # Pull XCOM
    data = ti.xcom_pull(key='raw_data', task_ids=['extract_options_data_from_tda'])[0]
    # Define columns
    columns = ['putCall', 'symbol', 'description', 'exchangeName', 'bid', 'ask',
               'last', 'mark', 'bidSize', 'askSize', 'bidAskSize', 'lastSize',
               'highPrice', 'lowPrice', 'openPrice', 'closePrice', 'totalVolume',
               'tradeDate', 'tradeTimeInLong', 'quoteTimeInLong', 'netChange',
               'volatility', 'delta', 'gamma', 'theta', 'vega', 'rho', 'openInterest',
               'timeValue', 'theoreticalOptionValue', 'theoreticalVolatility',
               'optionDeliverablesList', 'strikePrice', 'expirationDate',
               'daysToExpiration', 'expirationType', 'lastTradingDay', 'multiplier',
               'settlementType', 'deliverableNote', 'isIndexOption', 'percentChange',
               'markChange', 'markPercentChange', 'mini', 'inTheMoney', 'nonStandard']
    # Extract puts data: flatten {expiry date: {strike: [contracts]}} into one list
    puts = []
    dates = list(data['putExpDateMap'].keys())
    for date in dates:
        strikes = data['putExpDateMap'][date]
        for strike in strikes:
            puts += data['putExpDateMap'][date][strike]
    # Convert to dataframe
    puts = pd.DataFrame(puts, columns=columns)
    # Select columns
    puts = puts[['putCall', 'symbol', 'description', 'bid', 'ask', 'last', 'bidSize',
                 'askSize', 'lastSize', 'highPrice', 'lowPrice', 'openPrice',
                 'closePrice', 'totalVolume', 'quoteTimeInLong', 'volatility', 'delta',
                 'gamma', 'theta', 'vega', 'rho', 'openInterest', 'timeValue',
                 'theoreticalOptionValue', 'strikePrice', 'expirationDate',
                 'daysToExpiration']]
    # Convert floats
    def conv_num(x):
        return pd.to_numeric(x.astype(str).str.replace('NaN|nan', '', regex=True))
    # Note: the column is 'theoreticalOptionValue' (a misspelt name raises a KeyError)
    for col in ['bid', 'ask', 'last', 'highPrice', 'lowPrice', 'openPrice',
                'closePrice', 'volatility', 'delta', 'gamma', 'theta', 'vega',
                'rho', 'timeValue', 'theoreticalOptionValue', 'strikePrice']:
        puts[col] = conv_num(puts[col])
    # Specifically for puts delta: make it positive
    puts['delta'] = -puts['delta']
    # Convert strings
    def conv_str(x):
        return x.astype(str)
    for col in ['putCall', 'symbol', 'description']:
        puts[col] = conv_str(puts[col])
    # Convert integers
    def conv_int(x):
        return x.astype(int)
    for col in ['bidSize', 'askSize', 'lastSize', 'totalVolume', 'quoteTimeInLong',
                'openInterest', 'expirationDate', 'daysToExpiration']:
        puts[col] = conv_int(puts[col])
    # Fill missing values
    puts = puts.fillna(-99)
    # Rename columns
    puts = puts.rename(columns={
        'putCall': 'put_call',
        'bidSize': 'bid_size',
        'askSize': 'ask_size',
        'lastSize': 'last_size',
        'highPrice': 'high_price',
        'lowPrice': 'low_price',
        'openPrice': 'open_price',
        'closePrice': 'close_price',
        'totalVolume': 'total_volume',
        'quoteTimeInLong': 'quote_time',
        'openInterest': 'open_interest',
        'timeValue': 'time_value',
        'theoreticalOptionValue': 'theoretical_value',
        'strikePrice': 'strike_price',
        'expirationDate': 'expiration_date',
        'daysToExpiration': 'dte',
    })
    # Push XCOM
    ti.xcom_push(key='transformed_data', value=puts.to_dict('records'))
After processing the data, we do the same thing as we did in Task 2: we push the data as an XCom. Note that XCom values must be serialisable. Hence, we convert the data into a Python dictionary before pushing.
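The nested loop in the transform step is easier to follow on a small example. The payload below is made up for illustration (real responses carry many more fields per contract), and flatten_puts is a hypothetical helper that mirrors the loop using plain Python.

```python
import json

def flatten_puts(data):
    """Flatten {expiry: {strike: [contracts]}} into one list of contract dicts."""
    puts = []
    for strikes in data['putExpDateMap'].values():
        for contracts in strikes.values():
            puts += contracts
    return puts

# Made-up payload with the same nesting as the API response
sample = {
    'putExpDateMap': {
        '2022-01-14:7': {
            '300.0': [{'symbol': 'FB_011422P300', 'bid': 1.0}],
            '305.0': [{'symbol': 'FB_011422P305', 'bid': 1.5}],
        },
        '2022-01-21:14': {
            '300.0': [{'symbol': 'FB_012122P300', 'bid': 2.0}],
        },
    }
}

records = flatten_puts(sample)
print(len(records))

# A list of dicts survives a JSON round trip, so it can be pushed as an XCom
round_tripped = json.loads(json.dumps(records))
print(round_tripped == records)
```

This list-of-dicts shape is the same one that to_dict('records') produces from a dataframe.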
Finally, we use the function below to load the data into Postgres. The steps are a combination of what we’ve seen before. We (1) create a PostgresHook to connect to Postgres, (2) pull the XCom from the previous task, (3) convert it back into a dataframe, (4) prepare the SQL query for inserting it into Postgres, and (5) run the query. Note that the INSERT query has been updated to use the specified ticker.
# Function to load data
def load_data_into_postgres(ticker, ti):
    # Import modules
    import pandas as pd
    # Define Postgres hook
    pg_hook = PostgresHook(postgres_conn_id='postgres_optionsdata')
    # Pull XCOM
    puts = ti.xcom_pull(key='transformed_data', task_ids=['transform_options_data'])[0]
    puts = pd.DataFrame(puts)
    # Prepare insert query
    col_str = ', '.join(puts.columns.tolist())
    query_insert = f"INSERT INTO {ticker} ({col_str}) VALUES %s ON CONFLICT DO NOTHING"
    # Convert to rows
    rows = list(puts.itertuples(index=False, name=None))
    # Insert one row at a time; rows that already exist are skipped
    for row in rows:
        pg_hook.run(query_insert % str(row))
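Formatting each row into the SQL string works for this data, but it relies on Python's tuple representation happening to be valid SQL. A possible alternative is to use query parameters. The sketch below demonstrates the idea with the standard library's sqlite3 module as a stand-in for Postgres (an assumption made so that the snippet runs anywhere); the table and rows are made up.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute("""
    CREATE TABLE fb (
        symbol TEXT NOT NULL,
        quote_time INTEGER NOT NULL,
        bid REAL,
        PRIMARY KEY (symbol, quote_time)
    )
""")

rows = [
    ('FB_011422P300', 1641558600000, 1.0),
    ('FB_011422P305', 1641558600000, 1.5),
    ('FB_011422P300', 1641558600000, 9.9),  # duplicate key: should be skipped
]

# Placeholders let the driver handle quoting; duplicates are silently ignored
conn.executemany(
    "INSERT INTO fb (symbol, quote_time, bid) VALUES (?, ?, ?) "
    "ON CONFLICT (symbol, quote_time) DO NOTHING",
    rows,
)
conn.commit()

stored = conn.execute("SELECT symbol, bid FROM fb ORDER BY symbol").fetchall()
print(stored)
```

The duplicate row is dropped and the first value for that key is kept, which is the behaviour we want when the same quote is collected twice.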
with dag:
    # Define operators
    task0_create_table = PythonOperator(
        task_id='create_table',
        python_callable=create_table,
        op_kwargs={'ticker': TICKER}
    )
    task1_extract = PythonOperator(
        task_id='extract_options_data_from_tda',
        python_callable=extract_options_data_from_tda,
        op_kwargs={'ticker': TICKER}
    )
    task2_transform = PythonOperator(
        task_id='transform_options_data',
        python_callable=transform_options_data
    )
    task3_load = PythonOperator(
        task_id='load_data_into_postgres',
        python_callable=load_data_into_postgres,
        op_kwargs={'ticker': TICKER}
    )
    # Set up dependencies
    task0_create_table >> task1_extract >> task2_transform >> task3_load
Recall that at the start of this post, we defined TICKER='FB'. We then used this in our DAG definition and Operators. The problem with the way this code is written now is that we would need one nearly identical script per ticker we would like to monitor. The only difference would be the contents of the TICKER variable.
To not repeat ourselves, we will refactor the code so that we’ll be creating dynamic DAGs, which are adaptive pipelines that change based on the inputs we give them. This code should replace the relevant sections previously presented.
First, we specify a list of tickers that we want to monitor (TICKERS). Then, we wrap the DAG definition and operators in a function that takes the ticker and default_args as arguments. When this function is called, it dynamically builds the DAG for the given ticker, defines its operators and their dependencies, and returns the DAG.
# List of tickers
TICKERS = ['FB', 'GOOG']
# Function to create DAG
def create_dag(ticker, default_args):
    dag = DAG(
        dag_id=f'get_options_data_{ticker}',
        default_args=default_args,
        description=f'ETL for {ticker} options data',
        schedule_interval='*/30 8-21 * * 1-5',
        catchup=False,
        tags=['finance', 'options', ticker]
    )
    with dag:
        # Define operators
        task0_create_table = PythonOperator(
            task_id='create_table',
            python_callable=create_table,
            op_kwargs={'ticker': ticker}
        )
        task1_extract = PythonOperator(
            task_id='extract_options_data_from_tda',
            python_callable=extract_options_data_from_tda,
            op_kwargs={'ticker': ticker}
        )
        task2_transform = PythonOperator(
            task_id='transform_options_data',
            python_callable=transform_options_data
        )
        task3_load = PythonOperator(
            task_id='load_data_into_postgres',
            python_callable=load_data_into_postgres,
            op_kwargs={'ticker': ticker}
        )
        # Set up dependencies
        task0_create_table >> task1_extract >> task2_transform >> task3_load
    return dag
# Create DAGs
for ticker in TICKERS:
    globals()[f'get_options_data_{ticker}'] = create_dag(ticker, default_args)
Thereafter, we loop through the list of tickers, registering each dynamic DAG in the dictionary of global variables through globals(). In the Airflow UI, we should see one DAG for each ticker.
Now, we have a single script that defines the standard data pipeline for collecting options data on a specified list of tickers. Changes to the script will change all the pipelines for the respective tickers.
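The globals() trick is plain Python, so it can be tried without Airflow. In this sketch, create_pipeline is a hypothetical stand-in for create_dag that returns a dictionary instead of a DAG object.

```python
TICKERS = ['FB', 'GOOG']

def create_pipeline(ticker):
    """Hypothetical stand-in for create_dag: returns a plain dict, not a DAG."""
    return {
        'dag_id': f'get_options_data_{ticker}',
        'tags': ['finance', 'options', ticker],
    }

# Register one module-level variable per ticker, as the DAG file does
for ticker in TICKERS:
    globals()[f'get_options_data_{ticker}'] = create_pipeline(ticker)

registered = sorted(name for name in globals() if name.startswith('get_options_data_'))
print(registered)
```

Each ticker ends up with its own module-level name, which is what allows Airflow to pick up the DAGs when it parses the file. Adding a ticker to the list is then the only change needed to get a new pipeline.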
# In one bash terminal, run:
airflow scheduler
# In a separate bash terminal, run:
airflow webserver
Then, click on the DAG ID (get_options_data_FB, for example) to see more details about the DAG.
# Start Postgres
sudo service postgresql start
# Activate environment
conda activate airflow
# Start the Airflow Scheduler
airflow scheduler
# To shut it down, hit Ctrl+C
# Start the Airflow Webserver
airflow webserver
# To shut it down, hit Ctrl+C
The Airflow UI should now be available at http://localhost:8080.