Airflow
前置条件
- 当前环境已经安装 pip
- 当前环境已经安装 ByteHouse CLI,mac 环境可直接使用 homebrew 安装。其他环境安装方式参考文档
brew install bytehouse-cli
安装 Airflow
在本地环境或虚拟机中安装 Airflow,本教程使用 pip 进行安装。其他安装方式也可以参考 Apache Airflow 官网教程
# airflow needs a home, ~/airflow is the default,
# but you can lay foundation somewhere else if you prefer
# (optional)
export AIRFLOW_HOME=~/airflow
AIRFLOW_VERSION=2.1.3
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
# For example: 3.6
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
注:上述命令如安装不成功,可以通过 pip3 install 进行安装
安装完毕后,可以通过运行 airflow info
命令,查看已安装完毕的环境信息。
初始化 Airflow
参考如下配置,对 Airflow 的 webserver 和 scheduler 进行初始化
# initialize the database
airflow db init
airflow users create \
--username admin \
--firstname admin \
--lastname admin \
--role Admin \
--email admin
# start the web server, default port is 8080
# or modify airflow.cfg set web_server_port
airflow webserver --port 8080
启动完毕后,访问 http://localhost:8080/,输入创建时填写的用户名和密码,即可登录到 Airflow 控制台。
打开一个新的 terminal 终端,启动 airflow scheduler ,之后刷新 http://localhost:8080/ 页面
# start the scheduler
# open a new terminal or else run webserver with ``-D`` option to run it as a daemon
airflow scheduler
# visit localhost:8080 in the browser and use the admin account you just
# created to login. Enable the example_bash_operator dag in the home page
配置 YAML
通过 cd ~/airflow
进入到 airflow 目录,打开 airflow.cfg 进行 yaml 配置,按需要调整配置。
# 默认 sqlite, 可以改成mysql
sql_alchemy_conn = mysql+pymysql://airflow:[email protected]:8080/airflow
# 是否开启webserver认证
# authenticate = False
# airflow scheduler -D 启动scheduler失败,关闭alchemy poool
# https://github.com/apache/airflow/issues/10055
sql_alchemy_pool_enabled = False
# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository. This path must be absolute.
dags_folder = /home/admin/airflow/dags
实现 DAG 管理任务
在 airflow 路径下新建 dags 目录,并创建 test_bytehouse.py 文件
~/airflow
mkdir dags
cd dags
nano test_bytehouse.py
编写 test_bytehouse.py,通过 BashOperator 调用 ByteHouse 进行数据导入导出。
from datetime import timedelta
from textwrap import dedent
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG(
'test_bytehouse',
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
start_date=days_ago(1),
tags=['example'],
) as dag:
tImport = BashOperator(
task_id='ch_import',
depends_on_past=False,
bash_command='$Bytehouse_HOME/bytehouse-cli -cf /root/bytehouse-cli/conf.toml "INSERT INTO korver.cell_towers_1 FORMAT csv INFILE \'/opt/bytehousecli/data.csv\' "',
)
tSelect = BashOperator(
task_id='ch_select',
depends_on_past=False,
bash_command='$Bytehouse_HOME/bytehouse-cli -cf /root/bytehouse-cli/conf.toml -q "select * from korver.cell_towers_1 limit 10 into outfile \'/opt/bytehousecli/dataout.csv\' format csv "'
)
tSelect >> tImport
在当前目录下,运行下 python test_bytehouse.py
没有出现异常说明script没有编译上的错误。
刷新页面,在 web UI 中可以看到名为 test_bytehouse 新的 DAG
运行 DAG
在 airflow 命令行中,查看 DAG 列表并依次测试 airflow 中执行查询和导入的任务。
# prints the list of tasks in the "test_bytehouse" DAG
[root@VM-64-47-centos dags]# airflow tasks list test_bytehouse
ch_import
ch_select
# prints the hierarchy of tasks in the "test_bytehouse" DAG
[root@VM-64-47-centos dags]# airflow tasks list test_bytehouse --tree
<Task(BashOperator): ch_select>
<Task(BashOperator): ch_import>
运行完毕后,在对应的 ByteHouse 账户中,可以看到数据查询,导入成功。
Updated almost 2 years ago