Azkaban的使用

介绍

Azkaban是LinkedIn公司提出的一个任务依赖管理、任务提交和监控的框架,它包含三个主要的组件:

  • Relational Database(Mysql)
  • AzkabanWebServer
  • AzkabanExecutorServer

Azkaban使用Mysql来存储它的状态,WebServer和ExecutorServer都需要访问Mysql。

AzkabanWebServer使用DB的理由:

  • 工程管理:权限控制和上传文件
  • 执行流的状态:跟踪执行流的状态
  • 历史工作流和任务:需要搜索历史运行过的工作流和任务
  • 调度器:跟踪调度器的状态
  • SLA: 记住所有的SLA的规则

AzkabanExecutorServer使用DB的理由:

  • 访问项目:从DB中提取项目文件
  • 执行Flow/job:为执行的Flow提取和更新数据
  • Logs:存储输出的日志
  • Interflow 依赖:它会从DB中拿到状态。

1.创建项目

在Azkaban中,一个project可以包含多个workflow,一个workflow可以包含多个job,workflow之间可以相互依赖,job之间也可以相互依赖。

这里,我们通过*.job文件来定义任何一个job,一个项目中可以包含jar包,bash shell 脚本,.job文件(必须的)。我们在本地写好,将工程目录打包成.zip文件,然后通过Azkaban的web client上传到Azkaban的executor执行目录中,然后就可以配置和执行了。

2.几个例子

工程的主要构建就是创建.job文件。

2.1 简单shell脚本的执行

假设在这个工程中只有一个job,对应一个workflow,我们创建一个test.job文件。

  • 常见shell脚本

      type=command
      command=echo 'hello,Azkaban'
      command.1=echo 'the next step'
    

可以通过数字1,2...来指定子命令,先执行command,再执行command.1 command.2 ....

  • 执行python脚本

      type=command
      command=python hello.py
    

2.2一个工作流的Job之间的依赖

我们定义一个工作流,在数据导入hive前需要进行数据清洗,数据清洗前需要上传,上传之前需要从ftp获取日志。

我们定义5个job:

  • load_hive.job:数据导入hive
  • clean_data.job: 清洗hdfs数据
  • upload_hdfs.job: 将文件上传至hdfs
  • getfileftp1.job: 从ftp1获取日志
  • getfileftp2.job: 从ftp2获取日志

依赖关系:4,5相互独立,3依赖4和5,2依赖3,1依赖2

  • 1.load_hive.job:

      type=command
      command=sh hive.sh #如果hive.sh和load_hive放在一个目录下,直接指定文件名
      dependencies=clean_data
    
  • 2.clean_data.job

      type=command
      command=sh clean_data.sh
      dependencies=upload_hdfs
    
  • 3.upload_hdfs.job

      type=command
      command=hadoop fs -put /data/*  /use/lanyizheng/data/*
      dependencies=get_file_ftp1,get_file_ftp2
    
  • 4.getfileftp1.job

      type=command
      command=wget "ftp://file1" -O /data/file1
    
  • 5.getfileftp2.job

      type=command
      command=wget "ftp:file2" -O /data/file2
    

可以运行Unix命令,也可以运行python脚本。

3.工作流与工作流之间的依赖

工作流可以嵌套到另一个工作流中作为一个节点,这样的workflow称作embedded flow。下面是创建的一个embedded flow的例子,我们仅仅创建一个.job文件就可以了:

baz.job :定义一个flow
type=flow
flow.name=bar  #必须要有bar的job

bar.job: flow实际的指向
type=command
command=echo "Hello"

top.job:此任务依赖baz
type=command
command=echo "top task"
dependencies=baz

4.参数之间的层级关系

任何.properties文件将在工作流中job之间共享变量。

下面有个工程文件目录:

system.properties
baz.job
myflow/
   myflow.properties
   myflow2.properties
   foo.job
   bar.job

bar任务仅仅从system.properties中继承,myflow.properties和myflow2.properties从system.properties中继承,foo和bar任务又可以从myflow.properties和myflow2.properties中继承。

下面是一个例子:

# shared.properties
replaceparameter=bar

# myjob.job
param1=mytest
foo=${replaceparameter}
param2=${param1}

5.高级功能

执行定时任务:在任务执行前,配置Scheduling选项 邮件通知:在执行任务前,在Notification选项中配置成功、失败的通知邮件 并发执行选项:可以配置任务的并发执行的情况。 SLA: 添加任务规则配置选项。 HDFS:可以查看HDFS的目录,并控制权限。 具体可参考:Azkaban的docs