
Orchestrate Data with Apache Airflow

Description

This tutorial explains the following:

  1. How to use Apache Airflow to source Bitcoin market data from CoinCap.io.
  2. How to stage Bitcoin market data into Yellowbrick.
  3. How to build summary KPIs using an ELT approach with SQL transformations.

Note: Review the CoinCap Terms of Service. Data from CoinCap is for personal use only.

Learning Objectives

After completing this tutorial, you will understand how to do the following:

  • Connect Apache Airflow to Yellowbrick.
  • Orchestrate data pipelines to insert and transform data using SQL in Yellowbrick.

Prerequisites

Before starting, ensure you have a running Apache Airflow environment and access to a Yellowbrick instance with permission to create schemas and tables.

Step-by-Step Guide

1. Create Base Tables

Go to the Yellowbrick Query Editor (or use an IDE such as DBeaver) and run the following SQL statements to create a schema and two tables:

sql
CREATE SCHEMA IF NOT EXISTS coincap_bitcoin;

CREATE TABLE IF NOT EXISTS coincap_bitcoin.bitcoin_price_tracker (
    id VARCHAR(10),
    timestamp TIMESTAMP,
    rank INTEGER,
    symbol VARCHAR(10),
    name VARCHAR(255),
    supply NUMERIC,
    max_supply NUMERIC,
    market_cap_usd NUMERIC,
    volume_usd_24_hr NUMERIC,
    price_usd NUMERIC,
    change_percent_24_hr NUMERIC,
    vwap_24_hr NUMERIC,
    explorer VARCHAR(255)
) DISTRIBUTE ON (timestamp);

CREATE TABLE IF NOT EXISTS coincap_bitcoin.bitcoin_kpi (
    timestamp TIMESTAMP,
    current_price NUMERIC(18, 0),
    price_change_percentage NUMERIC(38, 2),
    supply NUMERIC(18, 0),
    max_supply NUMERIC(18, 0),
    percentage_of_max_supply_mined NUMERIC(38, 2),
    volatility NUMERIC(38, 2),
    moving_average_10_days NUMERIC(18, 0),
    market_dominance NUMERIC(38, 2),
    vwap_24_hr NUMERIC(18, 0),
    vwap_crossing VARCHAR(10)
) DISTRIBUTE ON (timestamp);

2. Configure Airflow

Add the Yellowbrick Connection

Use the Airflow UI to create a PostgreSQL connection:

Note: Yellowbrick uses the PostgreSQL interface and drivers.

Alternatively, create the connection from the command line:

bash
airflow connections add ybconnection \
    --conn-type postgres \
    --conn-host myinstance.aws.yellowbrickcloud.com \
    --conn-login myusername \
    --conn-password mypassword \
    --conn-port 5432

Note: Replace myusername and mypassword with your Yellowbrick credentials.
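Because Yellowbrick speaks the PostgreSQL wire protocol, the same connection fields map directly onto a standard libpq-style DSN that any PostgreSQL driver (psycopg2, for example) can use. The helper below is an illustrative sketch, not part of Airflow; the dbname default is a placeholder you should replace with your own database:

```python
# Build a libpq-style DSN from the same fields used in the Airflow connection.
# The dbname value is a placeholder; substitute your own Yellowbrick database.
def yellowbrick_dsn(host, login, password, port=5432, dbname="mydatabase"):
    return (
        f"host={host} port={port} dbname={dbname} "
        f"user={login} password={password}"
    )

print(yellowbrick_dsn("myinstance.aws.yellowbrickcloud.com", "myusername", "mypassword"))
```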

3. Install Pre-Created DAGs

Download the DAG files from this repository and place them in your Airflow dags/ directory. The repository includes the following files:

  • airflow_dag_coincap_to_yb.py
  • airflow_dag_bitcoin_kpi.py

DAG Overview

File 1: coincap_to_yb_dag

This file performs the following tasks:

  • Fetches Bitcoin price data from the CoinCap API.
  • Uploads the data to the bitcoin_price_tracker table.
  • Triggers bitcoin_kpi_dag upon successful data upload.
  • Schedule: runs every 5 minutes.
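The upload step amounts to flattening CoinCap's JSON response into the columns of bitcoin_price_tracker. The helper below is a sketch, not the actual DAG code; it assumes CoinCap's documented /v2/assets/bitcoin response shape (camelCase keys, numeric fields returned as strings, a millisecond timestamp at the top level):

```python
from datetime import datetime, timezone

def coincap_to_row(payload):
    """Map a CoinCap /v2/assets/bitcoin response to a bitcoin_price_tracker row.

    CoinCap returns numeric fields as strings under camelCase keys; this
    converts them to the snake_case, typed columns of the staging table.
    """
    asset = payload["data"]

    def num(key):
        value = asset.get(key)
        return float(value) if value is not None else None

    return {
        "id": asset["id"],
        "timestamp": datetime.fromtimestamp(payload["timestamp"] / 1000, tz=timezone.utc),
        "rank": int(asset["rank"]),
        "symbol": asset["symbol"],
        "name": asset["name"],
        "supply": num("supply"),
        "max_supply": num("maxSupply"),
        "market_cap_usd": num("marketCapUsd"),
        "volume_usd_24_hr": num("volumeUsd24Hr"),
        "price_usd": num("priceUsd"),
        "change_percent_24_hr": num("changePercent24Hr"),
        "vwap_24_hr": num("vwap24Hr"),
        "explorer": asset.get("explorer"),
    }
```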

File 2: bitcoin_kpi_dag

This file performs the following tasks:

  • Calculates Bitcoin KPIs using data from Yellowbrick.
  • Runs on-demand (no fixed schedule).
  • Depends on coincap_to_yb_dag execution.
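One of the columns this DAG fills is vwap_crossing (a VARCHAR(10) in bitcoin_kpi). The tutorial does not spell out the rule behind it; one plausible definition, sketched here as an assumption, classifies the latest price relative to the 24-hour VWAP:

```python
def vwap_crossing(price_usd, vwap_24_hr):
    """Classify the current price relative to the 24-hour VWAP.

    The exact rule used by bitcoin_kpi_dag is not shown in this tutorial;
    this is one plausible definition for the vwap_crossing column.
    """
    if price_usd is None or vwap_24_hr is None:
        return None
    if price_usd > vwap_24_hr:
        return "above"
    if price_usd < vwap_24_hr:
        return "below"
    return "at"
```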

KPI Calculations

The following metrics are calculated and stored in bitcoin_kpi:

  • Average Price: Average Bitcoin price over different intervals.
  • Price Volatility: Measures volatility using standard deviation and other statistical methods.
  • Price Change Percentage: Percentage change in price over 24 hours.
  • Price Momentum: Measures price-change momentum using methods such as linear regression or moving averages.
  • Trading Volume: Bitcoin trading volume over time.
  • Market Cap: Market capitalization based on price and circulating supply.
  • Dominance Ratio: Bitcoin's market cap share relative to the entire cryptocurrency market.
  • Price Correlation: Correlation between Bitcoin and other financial assets.
  • Support/Resistance Levels: Identifies potential reversal points in price trends.
  • Moving Averages: Simple and exponential moving averages for trend analysis.
  • RSI (Relative Strength Index): Identifies overbought or oversold market conditions.
  • Price Oscillators: Tracks indicators such as MACD and Stochastic Oscillator.
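Two of the metrics above, moving averages and volatility, have direct textbook formulas (the KPI DAG computes them in SQL over bitcoin_price_tracker). A pure-Python sketch of the same arithmetic, for reference:

```python
from statistics import mean, pstdev

def moving_average(prices, window):
    """Simple moving average over the most recent `window` observations."""
    return mean(prices[-window:])

def volatility(prices):
    """Population standard deviation of the price series."""
    return pstdev(prices)
```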

4. Activate the DAGs

Using the Airflow UI

Go to the Airflow interface and activate both DAGs.

Using the CLI

Run the following bash commands:

bash
airflow dags unpause coincap_to_yb_dag
airflow dags unpause bitcoin_kpi_dag

5. Validate Success

Use SQL statements to verify data ingestion and KPI calculations:

sql
SELECT * FROM coincap_bitcoin.bitcoin_price_tracker;
SELECT * FROM coincap_bitcoin.bitcoin_kpi;

6. Cleanup

Deactivate the DAGs

Use the Airflow UI or CLI to pause the DAGs:

bash
airflow dags pause coincap_to_yb_dag
airflow dags pause bitcoin_kpi_dag

Drop the Tables

Run the following SQL statement to clean up your database:

sql
DROP SCHEMA coincap_bitcoin CASCADE;

Next Steps: Explore other Yellowbrick tutorials to expand your data pipeline orchestration skills.