Airflow 连接 ByteHouse CLI 实现数据导入及查询


  • 当前环境已经安装 pip
  • 当前环境已经安装 ByteHouse CLI,mac 环境可直接使用 homebrew 安装。其他环境安装方式参考文档
brew install bytehouse-cli

安装 Airflow

在本地环境或虚拟机中安装 Airflow,本教程使用 pip 进行安装。其他安装方式也可以参考 Apache Airflow 官网教程

# airflow needs a home, ~/airflow is the default,
# but you can lay foundation somewhere else if you prefer
# (optional)
export AIRFLOW_HOME=~/airflow

PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
# For example: 3.6
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

注:上述命令如安装不成功,可以通过 pip3 install 进行安装

安装完毕后,可以通过运行 airflow info 命令,查看已安装完毕的环境信息。

初始化 Airflow

参考如下配置,对 Airflow 的 webserver 和 scheduler 进行初始化

# initialize the database
airflow db init

airflow users create \
    --username admin \
    --firstname admin \
    --lastname admin \
    --role Admin \
    --email admin 
# start the web server, default port is 8080
# or modify airflow.cfg set web_server_port 
airflow webserver --port 8080

启动完毕后,访问 http://localhost:8080/,输入创建时填写的用户名和密码,即可登录到 Airflow 控制台。


打开一个新的 terminal 终端,启动 airflow scheduler ,之后刷新 http://localhost:8080/ 页面

# start the scheduler
# open a new terminal or else run webserver with ``-D`` option to run it as a daemon
airflow scheduler
# visit localhost:8080 in the browser and use the admin account you just
# created to login. Enable the example_bash_operator dag in the home page


通过 cd ~/airflow 进入到 airflow 目录,打开 airflow.cfg 进行 yaml 配置,按需要调整配置。

# 默认 sqlite, 可以改成mysql
sql_alchemy_conn = mysql+pymysql://airflow:[email protected]:8080/airflow

# 是否开启webserver认证
# authenticate = False

# airflow scheduler -D 启动scheduler失败,关闭alchemy poool
sql_alchemy_pool_enabled = False

# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository. This path must be absolute.
dags_folder = /home/admin/airflow/dags

实现 DAG 管理任务

在 airflow 路径下新建 dags 目录,并创建 文件

mkdir dags
cd dags

编写,通过 BashOperator 调用 ByteHouse 进行数据导入导出。

from datetime import timedelta
from textwrap import dedent

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
with DAG(
    description='A simple tutorial DAG',
) as dag:
    tImport  = BashOperator(
        bash_command='/home/admin/bytehousecli/bytehouse-cli -cf /home/admin/bytehousecli/conf.toml -q "INSERT INTO test.product FORMAT csv INFILE \'/home/admin/bytehousecli/data.csv\' "',

    tSelect  = BashOperator(
        bash_command='/home/admin/bytehousecli/bytehouse-cli -cf /home/admin/bytehousecli/conf.toml -q "select * from test.product limit 10 into outfile \'/home/admin/bytehousecli/data.csv\' format csv "',
    tSelect >> tImport

在当前目录下,运行下 python 没有出现异常说明script没有编译上的错误。

刷新页面,在 web UI 中可以看到名为 test_bytehouse 新的 DAG


运行 DAG

在 airflow 命令行中,查看 DAG 列表并依次测试 airflow 中执行查询和导入的任务。

# print the list of active DAGs
airflow dags list

# prints the list of tasks in the "test_bytehouse" DAG
airflow tasks list test_bytehouse

# prints the hierarchy of tasks in the "test_bytehouse" DAG
airflow tasks list test_bytehouse --tree

# command layout: command subcommand dag_id task_id date
# testing task ch_select 
airflow tasks test test_bytehouse ch_select 2021-09-27
# testing task ch_import
airflow tasks test test_bytehouse ch_import 2021-09-27
# testing whole dag
airflow dags test test_clickhouse 2021-09-26

运行完毕后,在对应的 ByteHouse 账户中,可以看到数据查询,导入成功。


