Apache Airflow is a workflow system for managing and scheduling data pipelines.
Requirements: Docker, Docker Compose, PostgreSQL, Git

## 1. Set up the development environment

Open your terminal and run the following commands:

- `git clone [email protected]:gridu/AIRFLOW-Wiktor-Morski.git`
- `cd AIRFLOW-Wiktor-Morski`
- `python3 -m venv venv`
- `source venv/bin/activate`
- `python -m pip install "apache-airflow[celery, postgres]==2.5.0" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.5.0/constraints-3.7.txt"`

You can now open the `AIRFLOW-Wiktor-Morski` directory with your code editor of choice, with the virtual environment activated, and start developing.
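
If you want to double-check that the pinned release landed in the virtual environment, a quick sanity check (assuming the venv is still activated) is:

```python
# Prints the installed Airflow version; it should show 2.5.0 for this setup.
import airflow

print(airflow.__version__)
```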

## 2. Run Airflow services in Docker

Open your terminal and run the following commands:

- Initialize the database: `docker compose up airflow-init`
- Run the Airflow services in the background: `docker compose up -d`
- Get the Postgres container ID and check whether the services are healthy yet (see the health-check sketch after this list): `docker ps`
- Open Postgres: `docker exec -it POSTGRES_CONTAINER_ID psql -U airflow`
- Create the database: `CREATE DATABASE test;`
- Run `\l` to check that the database was created correctly, and `\q` to quit.
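
Besides watching `docker ps`, you can also poll the webserver's `/health` endpoint to see when the scheduler and metadatabase are up. This is only a convenience sketch; it assumes the webserver is published on `localhost:8080`, as in the standard Airflow docker-compose setup:

```python
# Poll the Airflow webserver /health endpoint until every component
# (metadatabase, scheduler) reports "healthy". Assumes the webserver is
# reachable on localhost:8080, the default port mapping.
import time

import requests


def wait_until_healthy(url: str = "http://localhost:8080/health", timeout: int = 300) -> None:
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            components = requests.get(url, timeout=5).json()
            if all(c.get("status") == "healthy" for c in components.values()):
                print("Airflow is healthy:", components)
                return
        except requests.RequestException:
            pass  # webserver not accepting connections yet
        time.sleep(10)
    raise TimeoutError("Airflow services did not become healthy in time")


if __name__ == "__main__":
    wait_until_healthy()
```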

In the Airflow webserver UI:

- When all services are healthy, navigate to `localhost:8080` in your web browser.
- Authenticate with login `airflow` and password `airflow`.
- Create new connections (Admin -> Connections):
  - For the FileSensor task:
    - Connection Id: `fs_default`
    - Connection Type: `File (path)`
  - For the Postgres tasks:
    - Connection Id: `postgres_localhost`
    - Host: `host.docker.internal`
    - Schema: `test`
    - Login: `airflow`
    - Password: `airflow`
    - Port: `5432`
- Create new variables (Admin -> Variables); see the sketch after this list for how these connections and variables are referenced in DAG code:
  - (Optional) For the FileSensor task:
    - Key: `path_to_run_file`
    - Value: `path/to/run` (must include `run` at the end, since you specify both the path and the file itself)
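
The connection IDs and the variable above are referenced by ID/key from the DAG code. The repo's actual task definitions may differ; the snippet below is only an illustrative sketch (with made-up task IDs and a placeholder query) of how a FileSensor, a Postgres query, and the optional `path_to_run_file` variable typically pick them up:

```python
# Illustrative only: shows how IDs configured in the Airflow UI are usually
# consumed in DAG code. Task IDs and the SQL are placeholders, not the
# repo's actual tasks.
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.sensors.filesystem import FileSensor

with DAG(
    dag_id="connection_usage_example",
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
):
    # FileSensor resolves its base path from the "fs_default" connection;
    # the optional variable overrides the watched path and must end in "run".
    wait_for_run_file = FileSensor(
        task_id="wait_for_run_file",
        fs_conn_id="fs_default",
        filepath=Variable.get("path_to_run_file", default_var="run"),
        poke_interval=30,
    )

    @task
    def query_test_db():
        # PostgresHook pulls host/schema/credentials from "postgres_localhost".
        hook = PostgresHook(postgres_conn_id="postgres_localhost")
        return hook.get_records("SELECT 1;")

    wait_for_run_file >> query_test_db()
```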

## 3. Run the DAGs

All DAGs can be run from the web UI: first Unpause a DAG, then Trigger it.

- update_table_dag_1, update_table_dag_2, update_table_dag_3 (jobs_dag.py):
  - After the DAG finishes its tasks, you can check whether the table was correctly inserted into the `test` database using DBeaver or by running the following commands:
    - `docker exec -it POSTGRES_CONTAINER_ID psql -U airflow`
    - `\c test`
    - `SELECT * FROM table_dag_[number here];`
  - Here `[number here]` is 1, 2, or 3 depending on the DAG you've run; you should see the content of the table.
- trigger_dag (trigger_dag.py): needs the extra setup steps described below:
  - To run this DAG you have to put a `run` file in the `AIRFLOW-Wiktor-Morski/dags` folder, or in the `path/to/` location you specified in 2. Run Airflow services in Docker -> Create new variables (Admin -> Variables) -> (Optional) For FileSensor task. This DAG will trigger update_table_dag_1 (so make sure in the Airflow web UI that it's Unpaused).
  - (Optional) This DAG will send an alert to your Slack workspace (see the sketch at the end of this section). To configure this you have to:
    - Create your workspace: https://slack.com/get-started#/
    - Create an application and name it as you want: https://api.slack.com/
    - After creation you will see basic information about your application. In the 'Add features and functionality' section click 'Permissions', then under 'Scopes' in the 'Bot' section add the OAuth scope `chat:write.public`. After installing the app you will see a generated token that you need to save to Vault.
    - Create a channel: `apache-airflow-connecting-to-slacks-api`
    - Add the variable to HashiCorp Vault:
      - `docker exec -it VAULT_CONTAINER_ID sh`
      - `vault login ZyrP7NtNw0hbLUqu7N3IlTdO`
      - `vault secrets enable -path=airflow -version=2 kv`
      - `vault kv put airflow/variables/slack_token value=YOUR_SLACK_TOKEN`
- simple_etl (simple_etl_dag.py) and example_subdag_operator (subdag_vs_taskgroup.py):
  - You can simply check what these DAGs do and how they work in the web UI.
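
For the optional Slack alert, the secret written to Vault at `airflow/variables/slack_token` surfaces as the Airflow variable `slack_token` once the HashiCorp Vault secrets backend is enabled with mount point `airflow` (this project's docker-compose setup is assumed to configure that). The sketch below is not the repo's actual callback; it only shows one way such an alert can be posted with `slack_sdk`, assuming that package is installed (it ships with the Slack provider):

```python
# Illustrative sketch of posting a Slack alert from a DAG task or callback.
# Assumes the Vault secrets backend is configured (mount point "airflow",
# variables path "variables") so Variable.get("slack_token") returns the
# token stored above, and that slack_sdk is installed.
from airflow.models import Variable
from slack_sdk import WebClient


def send_slack_alert(message: str) -> None:
    token = Variable.get("slack_token")  # resolved from Vault via the secrets backend
    client = WebClient(token=token)
    # chat:write.public lets the bot post to a public channel it has not joined.
    client.chat_postMessage(
        channel="#apache-airflow-connecting-to-slacks-api",
        text=message,
    )


if __name__ == "__main__":
    send_slack_alert("trigger_dag finished")
```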

## 4. Clean up

- Run `docker-compose down --volumes --remove-orphans` in the `AIRFLOW-Wiktor-Morski` directory.
- Remove the entire directory: `rm -rf AIRFLOW-Wiktor-Morski`