从零开始学习Apache Airflow的使用步骤
Apache Airflow 作为一款功能强大的开源工作流管理工具,它专为数据工程师、数据科学家以及数据分析师设计,用于创建、调度、监控以及管理复杂的计算和数据处理工作流。这些工作流可以包含各种任务,比如数据提取(ETL)、数据转换、数据加载(ETL 的 L 部分)、机器学习模型训练与评估等。下面,我们将详细介绍 Apache Airflow 的使用步骤,帮助感兴趣的朋友快速上手。
1. 安装 Apache Airflow
你可以通过以下命令来安装 Airflow:
pip install apache-airflow
建议使用虚拟环境来管理 Airflow 的依赖项。
2. 初始化数据库
Airflow 需要一个数据库来存储任务执行状态和其他元数据信息。初始化数据库的命令:
airflow db init
3. 创建用户
你需要创建一个管理员账户以访问 Airflow 的 web 界面:
airflow users create \ --username admin \ --password admin \ --firstname Firstname \ --lastname Lastname \ --role Admin \ --email admin@example.com
4. 启动 Airflow Scheduler 和 Web Server
Airflow 包含一个调度器(Scheduler
)和一个 Web 服务器(Web Server
),你需要分别启动这两个服务。
启动调度器:
airflow scheduler
启动 Web Server:
airflow webserver
Web Server 默认在 localhost:8080
上运行,你可以通过浏览器访问它。
5. 创建 DAG(有向无环图)
在 Airflow 中,工作流是通过 DAG(Directed Acyclic Graph)来定义的。一个简单的 DAG 例子如下:
from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime def my_task(): print("This is a task") default_args = { 'start_date': datetime(2023, 9, 1), 'retries': 1 } with DAG( 'my_dag', default_args=default_args, schedule_interval='@daily' ) as dag: task = PythonOperator( task_id='my_task', python_callable=my_task )
- DAG 是用 Python 定义的,
default_args
包含任务的默认参数。 - PythonOperator 用于执行 Python 函数。
6. 设置任务依赖
你可以通过设置任务的依赖来定义任务的执行顺序。例如:
task1 >> task2 # task1 先执行,task2 后执行
7. 将 DAG 放入 DAGs 文件夹
将你定义的 DAG 文件保存到 Airflow 的 DAGs 文件夹中。这个文件夹的位置通常是 $AIRFLOW_HOME/dags/
,或者你可以在 airflow.cfg
文件中配置。
8. 监控 DAG
访问 Airflow 的 Web 界面,你可以看到所有定义的 DAG,查看它们的执行状态,手动触发执行,并监控各个任务的日志。
9. 常见 Airflow 操作
触发 DAG:
airflow dags trigger my_dag
列出 DAG:
airflow dags list
查看任务状态:
airflow tasks list my_dag
Airflow 是一个强大的调度和工作流管理工具,适合处理复杂的数据管道和任务依赖。
Apache Airflow 凭借其强大的功能和灵活的扩展性,成为了许多企业数据管道和机器学习工作流的首选工具。通过遵循上述步骤,你可以快速上手并开始使用 Apache Airflow 来管理你的数据工作流。
码云笔记 » 从零开始学习Apache Airflow的使用步骤