Using Airflow with the ByteHouse CLI to Import and Query Data

Prerequisites

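  • pip is installed in the current environment.
  • The ByteHouse CLI is installed in the current environment. On macOS it can be installed directly with Homebrew, as shown below; for other environments, refer to the installation documentation.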
brew install bytehouse-cli

Install Airflow

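Install Airflow on your local machine or in a virtual machine. This tutorial installs it with pip; for other installation methods, see the official Apache Airflow documentation.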

# airflow needs a home, ~/airflow is the default,
# but you can lay foundation somewhere else if you prefer
# (optional)
export AIRFLOW_HOME=~/airflow

AIRFLOW_VERSION=2.1.3
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
# For example: 3.6
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

Note: if the pip install command above fails, try running it with pip3 install instead.
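
The constraint URL in the install commands is assembled from two values: the Airflow version and the major.minor version of the local Python interpreter. As a minimal sketch of the same derivation in Python (the function names constraint_url and current_python_version are illustrative and not part of Airflow), the following reproduces what the shell variables compute:

```python
import sys


def current_python_version() -> str:
    """Return the interpreter's major.minor version, e.g. "3.6"."""
    return f"{sys.version_info.major}.{sys.version_info.minor}"


def constraint_url(airflow_version: str, python_version: str) -> str:
    """Build the constraints-file URL passed to `pip install --constraint`."""
    return (
        "https://raw.githubusercontent.com/apache/airflow/"
        f"constraints-{airflow_version}/constraints-{python_version}.txt"
    )


if __name__ == "__main__":
    # Prints the URL that matches the interpreter running this script.
    print(constraint_url("2.1.3", current_python_version()))
```

Printing the URL before installing is a quick way to check that the detected Python version is the one you expect, which is a common cause of a failed install.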

After installation, run the airflow info command to view details of the installed environment.

Initialize Airflow

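Use the commands below to initialize the Airflow database, create an admin user, and start the webserver; the scheduler is started in a later step.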

# initialize the database
airflow db init

airflow users create \
    --username admin \
    --firstname admin \
    --lastname admin \
    --role Admin \
    --email admin

# start the web server, default port is 8080
# (or set web_server_port in airflow.cfg instead)
airflow webserver --port 8080

Once the webserver is running, open http://localhost:8080/ and log in to the Airflow console with the username and password you set when creating the user.


Open a new terminal, start the Airflow scheduler, and then refresh the http://localhost:8080/ page.

# start the scheduler
# open a new terminal or else run webserver with ``-D`` option to run it as a daemon
airflow scheduler
# visit localhost:8080 in the browser and use the admin account you just
# created to login. Enable the example_bash_operator dag in the home page

Configure Airflow

Run cd ~/airflow to enter the Airflow directory, open airflow.cfg (an INI-style configuration file), and adjust the settings as needed.

# Defaults to SQLite; can be changed to MySQL
sql_alchemy_conn = mysql+pymysql://airflow:[email protected]:8080/airflow

# Whether to enable webserver authentication
# authenticate = False

# If `airflow scheduler -D` fails to start the scheduler, disable the SQLAlchemy pool
# https://github.com/apache/airflow/issues/10055
sql_alchemy_pool_enabled = False


# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository. This path must be absolute.
dags_folder = /home/admin/airflow/dags
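
Because dags_folder must be an absolute path, it can help to check the edited file before restarting Airflow. The following is a minimal sketch using Python's standard configparser module. It assumes the settings live under a [core] section, as they do in Airflow 2.1.x (the snippet above does not show section headers), and the function name check_core_settings and the SAMPLE_CFG text are illustrative only:

```python
import configparser
import posixpath

# Illustrative excerpt; a real airflow.cfg contains many more options.
SAMPLE_CFG = """
[core]
dags_folder = /home/admin/airflow/dags
sql_alchemy_pool_enabled = False
"""


def check_core_settings(cfg_text: str) -> dict:
    """Parse airflow.cfg-style text and report on two [core] options."""
    # interpolation=None: treat '%' characters in values literally.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    dags_folder = parser.get("core", "dags_folder")
    return {
        "dags_folder": dags_folder,
        "dags_folder_is_absolute": posixpath.isabs(dags_folder),
        # Assumed default when the option is absent: pool enabled.
        "pool_enabled": parser.getboolean(
            "core", "sql_alchemy_pool_enabled", fallback=True
        ),
    }


if __name__ == "__main__":
    print(check_core_settings(SAMPLE_CFG))
```

To check a real file, read its contents (for example from ~/airflow/airflow.cfg) and pass the text to check_core_settings.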

Create a DAG to Manage Tasks

Create a dags directory under the Airflow home directory, and create a file named test_bytehouse.py inside it.

cd ~/airflow
mkdir dags
cd dags
nano test_bytehouse.py

Write test_bytehouse.py so that it uses BashOperator to call the ByteHouse CLI for data import and export.

from datetime import timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
with DAG(
    'test_bytehouse',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(1),
    tags=['example'],
) as dag:
    
    tImport = BashOperator(
        task_id='ch_import',
        depends_on_past=False,
        bash_command='/home/admin/bytehousecli/bytehouse-cli -cf /home/admin/bytehousecli/conf.toml -q "INSERT INTO test.product FORMAT csv INFILE \'/home/admin/bytehousecli/data.csv\' "',
    )

    tSelect = BashOperator(
        task_id='ch_select',
        depends_on_past=False,
        bash_command='/home/admin/bytehousecli/bytehouse-cli -cf /home/admin/bytehousecli/conf.toml -q "select * from test.product limit 10 into outfile \'/home/admin/bytehousecli/data.csv\' format csv "',
    )
    
    tSelect >> tImport
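
The bash_command strings in the DAG mix Python string quoting with shell quoting, which makes them easy to get wrong when the SQL itself contains quotes. As a hedged sketch of one way to assemble such a command, the helper below uses the standard shlex module. The -cf and -q flags are the ones used in the DAG above; the function name bytehouse_command is hypothetical and not part of the ByteHouse CLI or Airflow:

```python
import shlex


def bytehouse_command(cli_path: str, conf_path: str, query: str) -> str:
    """Return a shell-safe command line that runs one query via the CLI."""
    parts = [cli_path, "-cf", conf_path, "-q", query]
    # shlex.quote escapes each argument so the shell passes it through intact,
    # including the single quotes around the file path inside the SQL.
    return " ".join(shlex.quote(part) for part in parts)


if __name__ == "__main__":
    command = bytehouse_command(
        "/home/admin/bytehousecli/bytehouse-cli",
        "/home/admin/bytehousecli/conf.toml",
        "INSERT INTO test.product FORMAT csv INFILE '/home/admin/bytehousecli/data.csv'",
    )
    print(command)
```

The returned string could be passed as the bash_command argument of a BashOperator in place of a hand-escaped literal.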

In the current directory, run python test_bytehouse.py. If no exception is raised, the script has no syntax or import errors.

Refresh the page. A new DAG named test_bytehouse appears in the web UI.


Run the DAG

From the Airflow command line, list the DAGs, then test the query task and the import task in turn.

# print the list of active DAGs
airflow dags list

# prints the list of tasks in the "test_bytehouse" DAG
airflow tasks list test_bytehouse

# prints the hierarchy of tasks in the "test_bytehouse" DAG
airflow tasks list test_bytehouse --tree

# command layout: command subcommand dag_id task_id date
# testing task ch_select 
airflow tasks test test_bytehouse ch_select 2021-09-27
# testing task ch_import
airflow tasks test test_bytehouse ch_import 2021-09-27
# testing whole dag
airflow dags test test_bytehouse 2021-09-26

After the run completes, you can confirm in the corresponding ByteHouse account that the query and the import succeeded.
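
The per-task test commands follow a fixed layout (airflow tasks test, then the DAG id, task id, and date), so they can be generated rather than typed by hand. The sketch below builds one argument list per task and, optionally, runs them in order. The helper names task_test_commands and run_all are hypothetical; the DAG id, task ids, and date are the ones used in this tutorial:

```python
import subprocess
from typing import List


def task_test_commands(
    dag_id: str, task_ids: List[str], execution_date: str
) -> List[List[str]]:
    """Build one `airflow tasks test` argument list per task, in order."""
    return [
        ["airflow", "tasks", "test", dag_id, task_id, execution_date]
        for task_id in task_ids
    ]


def run_all(commands: List[List[str]]) -> None:
    """Run each command in turn; stop at the first non-zero exit code."""
    for argv in commands:
        subprocess.run(argv, check=True)


if __name__ == "__main__":
    commands = task_test_commands(
        "test_bytehouse", ["ch_select", "ch_import"], "2021-09-27"
    )
    for argv in commands:
        print(" ".join(argv))
    # Uncomment to execute; requires the airflow CLI on PATH.
    # run_all(commands)
```

Listing ch_select before ch_import mirrors the dependency declared in the DAG, where the select task runs first.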
