前置条件

  • 当前环境已经安装 pip
  • 当前环境已经安装 ByteHouse CLI,mac 环境可直接使用 homebrew 安装。其他环境安装方式参考文档
brew install bytehouse-cli

安装 Airflow

在本地环境或虚拟机中安装 Airflow,本教程使用 pip 进行安装。其他安装方式也可以参考 Apache Airflow 官网教程

# airflow needs a home, ~/airflow is the default,
# but you can lay foundation somewhere else if you prefer
# (optional)
export AIRFLOW_HOME=~/airflow

AIRFLOW_VERSION=2.1.3
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
# For example: 3.6
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

注:上述命令如安装不成功,可以通过 pip3 install 进行安装

安装完毕后,可以通过运行 airflow info 命令,查看已安装完毕的环境信息。

初始化 Airflow

参考如下配置,对 Airflow 的 webserver 和 scheduler 进行初始化

# initialize the database
airflow db init

airflow users create \
    --username admin \
    --firstname admin \
    --lastname admin \
    --role Admin \
    --email admin 
# start the web server, default port is 8080
# or modify airflow.cfg set web_server_port 
airflow webserver --port 8080

启动完毕后,访问 http://localhost:8080/,输入创建时填写的用户名和密码,即可登录到 Airflow 控制台。

2756

打开一个新的 terminal 终端,启动 airflow scheduler ,之后刷新 http://localhost:8080/ 页面

# start the scheduler
# open a new terminal or else run webserver with ``-D`` option to run it as a daemon
airflow scheduler
# visit localhost:8080 in the browser and use the admin account you just
# created to login. Enable the example_bash_operator dag in the home page

配置 YAML

通过 cd ~/airflow 进入到 airflow 目录,打开 airflow.cfg 进行 yaml 配置,按需要调整配置。

# 默认 sqlite, 可以改成mysql
sql_alchemy_conn = mysql+pymysql://airflow:[email protected]:8080/airflow

# 是否开启webserver认证
# authenticate = False

# airflow scheduler -D 启动scheduler失败,关闭alchemy poool
# https://github.com/apache/airflow/issues/10055
sql_alchemy_pool_enabled = False


# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository. This path must be absolute.
dags_folder = /home/admin/airflow/dags

实现 DAG 管理任务

在 airflow 路径下新建 dags 目录,并创建 test_bytehouse.py 文件

~/airflow
mkdir dags
cd dags
nano test_bytehouse.py

编写 test_bytehouse.py,通过 BashOperator 调用 ByteHouse 进行数据导入导出。

from datetime import timedelta
from textwrap import dedent

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
with DAG(
    'test_bytehouse',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(1),
    tags=['example'],
) as dag:
    
    tImport  = BashOperator(
        task_id='ch_import',
        depends_on_past=False,
        bash_command='$Bytehouse_HOME/bytehouse-cli -cf /root/bytehouse-cli/conf.toml "INSERT INTO korver.cell_towers_1 FORMAT csv INFILE \'/opt/bytehousecli/data.csv\' "',
    )

    tSelect  = BashOperator(
        task_id='ch_select',
        depends_on_past=False,
        bash_command='$Bytehouse_HOME/bytehouse-cli -cf /root/bytehouse-cli/conf.toml -q "select * from korver.cell_towers_1 limit 10 into outfile \'/opt/bytehousecli/dataout.csv\' format csv "'
    )
    
    tSelect >> tImport

在当前目录下,运行下 python test_bytehouse.py 没有出现异常说明script没有编译上的错误。

刷新页面,在 web UI 中可以看到名为 test_bytehouse 新的 DAG

1280

运行 DAG

在 airflow 命令行中,查看 DAG 列表并依次测试 airflow 中执行查询和导入的任务。

# prints the list of tasks in the "test_bytehouse" DAG

[[email protected] dags]# airflow tasks list test_bytehouse
ch_import
ch_select

# prints the hierarchy of tasks in the "test_bytehouse" DAG
[[email protected] dags]# airflow tasks list test_bytehouse --tree
<Task(BashOperator): ch_select>
 <Task(BashOperator): ch_import>

运行完毕后,在对应的 ByteHouse 账户中,可以看到数据查询,导入成功。

1280