三分钟上手apache顶级任务编排系统airflow

文章目录

[隐藏]

  • 编辑dag文件
    • 参数设置
  • 定义案例任务
    • 定义任务执行内容
    • 定义任务间关系
  • 加载修改过的airflow任务编排文件
  • 启动任务
  • 查看任务执行状态
  • 查看任务执行输出
  • 查看jinjia模板渲染结果
  • 删除任务
  • 一些注意的点

airflow已经成为了任务编排系统的事实标准,使用和terraform一样的代码及配置的任务开发方式。

airflow使用python作为开发语言,非常简单易学、容易上手。

完整案例代码已上传github:https://github.com/neatlife/myairflow

获取airflow实例

可以使用docker一键启动

git clone https://github.com/puckel/docker-airflow  cd docker-airflow  docker-compose -f docker-compose-LocalExecutor.yml up -d  

访问ip:8080查看效果

可以看到airflow已经可用了

编辑dag文件

这个dag文件就是用来定义任务和任务之间的先后、依赖关系的。

参数设置

参考:https://airflow.apache.org/tutorial.html#default-arguments

其中几个比较重要的参数如下:

代码实现如下 vim mydag.py

from airflow import DAG  from airflow.operators.bash_operator import BashOperator  from datetime import datetime, timedelta    default_args = {      "start_date": datetime(2019, 7, 10),      "retries": 1,      "retry_delay": timedelta(minutes=5),  }    dag = DAG("mydag", default_args=default_args, schedule_interval=timedelta(1))  

上面的”schedule_interval=timedelta(1)”代表一天触发一次这个任务,也可以使用crontab的语法,参考:https://airflow.apache.org/scheduler.html#dag-runs

这个新的dag文件需要使用python执行这个脚本导入,可以使用下面的命令进行导入

docker-compose -f docker-compose-LocalExecutor.yml exec webserver bash  python dags/mydag.py  

刷新这个web界面,就可以看到这个新加的mydag任务了

这个airflow的任务加载比较慢,如果显示状态和上面不同需要多等待一会儿

定义案例任务

这里定义三个任务,分别echo出1, 2, 3,关系如下

代码实现如下

定义任务执行内容

t1 = BashOperator(task_, bash_command="echo 1", dag=dag)  t2 = BashOperator(task_, bash_command="echo 2", dag=dag)  t3 = BashOperator(task_, bash_command="echo 3", dag=dag)  

上面使任务使用airflow执行的bash,airflow还可以执行很多输入,完整列表参考:https://airflow.apache.org/_api/airflow/operators/index.html#submodules

定义任务间关系

t2.set_upstream(t1)  t3.set_upstream(t1)  
加载修改过的airflow任务编排文件

首次修改这个dag文件,airflow回自动加载,点击”Refresh”按钮可以手动加载这个配置文件

操作效果如下

可以看到这个echo1, echo2, echo3的依赖关系定义成功了

启动任务

先启用这个任务,然后点击执行按钮,操作效果如下

也可以只启用这个任务,然后等这个airflow按照设定的时间,自动触发,手动触发可以快速验证

可以看到任务已经开始执行了

查看任务执行状态

这个任务的执行状态用不同的颜色进行表示

可以看到所有的任务都已经成功执行完成了

查看任务执行输出

单击 “Browser” > “Task Instance”可以查看所有被执行的任务列表

单击具体的task id就可以进入到task执行情况详情页面了,单击详情页的”Log”就可以看到任务的日志输出了,操作效果如下:

查看jinjia模板渲染结果

如果使用了这个airflow的模板功能,可以在任务执行详情页面,查看这个被执行任务的模板渲染结果,操作效果如下:

删除任务

需要先删除dag定义文件,比如 rm dags/mydag.py,然后在web界面上删除

一些注意的点

这个airflow依赖非常的多,因为没到1.0版本,存在一些bug,所以建议使用docker启动,主要的dag任务编排功能使用是ok的,其它功能可以先了解下

airflow可以使用模板功能生成脚本,参考:https://airflow.apache.org/tutorial.html#templating-with-jinja

jinjia模板语法参考:http://jinja.pocoo.org/docs/dev/

airflow默认以utc时区运行,如果需要计算正确的时间,需要把时间进行时区转换,核心代码如下

#将本地时间转换为utc时间,再设置为start_date  tz = pytz.timezone('Asia/Shanghai')  dt = datetime(2018, 7, 26, 12, 20, tzinfo=tz)  utc_dt = dt.astimezone(pytz.utc).replace(tzinfo=None)  

原文出处:juejin -> https://juejin.im/post/5d2805f2f265da1bb56524d1

本站所发布的一切资源仅限用于学习和研究目的;不得将上述内容用于商业或者非法用途,否则,一切后果请用户自负。本站信息来自网络,版权争议与本站无关。您必须在下载后的24个小时之内,从您的电脑中彻底删除上述内容。如果您喜欢该程序,请支持正版软件,购买注册,得到更好的正版服务。如果侵犯你的利益,请发送邮箱到 [email protected],我们会很快的为您处理。
超哥软件库 » 三分钟上手apache顶级任务编排系统airflow