FFlow 流程编排指引
- 作者
配置约定和规范
缩写约定
缩写 | 含义描述 |
---|---|
w.i | workflow.input,流程输入的引用 |
w.v | workflow.variables,流程全局变量的引用 |
t1.po | t1.poll_output,t1节点的轮询出参 |
t1.o | t1.output,t1节点的输出引用 |
this.o | 当前节点的输出引用 |
节点级别支持的数据引用
引用缩写 | 含义描述 |
---|---|
this.nodeInstID 或 this.node_inst_id | 获取当前节点的实例ID |
this.nodeRefName 或 node_ref_name | 获取当前节点的引用名称 |
this.owner | 获取当前节点的 owner |
流程定义
流程字段描述
一级字段名 | 二级字段名 | 类型 | 描述 |
---|---|---|---|
name | string | 流程名称(必填) | |
desc | string | 流程描述(必填) | |
timeout | duration | string | 流程超时时间,格式为time.ParseDuration支持的格式如:"5s"(5秒), "5m"(5分钟), "5h"(5小时), "5d"(5天), "5w"(5周)等, 配置的最大时间间隔不超过2个月,默认超时时间为4w(4周) |
policy | string | 流程超时策略,TIME_OUT_WF表示超时则终止流程,ALERT_ONLY表示仅发送服务号告警不终止继续执行。默认TIME_OUT_WF。 | |
biz | object | 业务配置,用户可以自定义 | |
variables | object | 全局变量,包括内置的和流程相关的全局变量,instID/inst_id(流程实例ID)/instName/inst_name(流程实例名称)/defID/def_id(流程定义ID)/defVersion/def_version(流程定义版本号) | |
owner | string | 流程的拥有者的企业微信号, 多用户使用分号";"进行分隔,功能:消息通知出口 (当wechat和chatGroup未配置时,消息出口默认置为流程触发者,当无触发者时则为流程创建者) | |
chatGroup | string | 流程相关的群聊,多群聊使用分号";"进行分隔,功能:消息通知出口 | |
nodes | array | 流程的节点列表(必填) | |
input | array | 流程输入字段 | |
triggers | object | 流程的触发器,触发器的定义请参考后面的"触发器定义"章节 | |
webhooks | array | 流程事件webhook地址列表 | |
subworkflows | array | 子流程定义 |
流程输入字段描述
input 字段类型为object,为key|value形式,而其中的每个输入key的描述字段必须包含以下内容
字段名 | 类型 | 描述 |
---|---|---|
default | string | 参数默认值 |
required | bool | 手动填写的时候是否为必须填写,默认为必填 |
options | array | 可选值,为string类型的数组 |
示例
name: 流程定义示例
desc: 这是一个流程定义示例
timeout:
duration: 10m
policy: TIME_OUT_WF
input:
- operator:
options:
- workflow
- conductor
default: workflow
required: true
owner:
wechat: replaceme
chatGroup: replaceme
variables:
instName: 测试项目
nodes:
- t0:
type: ASSIGN
assign:
- biz:
projectName: 流程定义示例
- owner:
wechat: replaceme
- variables:
test: hh
next: end
return:
code: 0
触发器定义
触发器字段描述
字段名 | 类型 | 描述 |
---|---|---|
type | string | timer/event,timer表示定时触发器,event表示事件触发器(必填) |
event | string | 事件名称(必填) |
expr | string | 基于timer的定时启动表达式语言,为cron表达式。(目前不支持秒级别的配置,最细粒度为分钟级,配置限制为6位,暂不支持年份级别的定时任务) |
actions | array | 触发器执行的操作集(执行动作没有先后之分,action之间没有相关性,由action组成的数组)(必填) |
触发器操作描述
字段名 | 类型 | 描述 |
---|---|---|
allowDays | string | 默认为所有时间段都可以执行,ANY(任意时间)/WORKDAY(工作日)/WEEKEND(周六周日) |
condition | string | 触发限制条件,当配置后则必须满足此条件才能触发(必填) |
action | string | START_WORKFLOW(启动流程)/CANCEL_WORKFLOW(取消执行流程)/RERUN_NODE(重新执行节点)/RESUME_NODE(恢复节点执行)(必填) |
args | object | 传入参数(根据不同的action有不同的限制) |
触发器类型描述
START_WORKFLOW
对应args的object格式限制:
字段名 | 类型 | 描述 |
---|---|---|
name | string | 流程实例名称(必填) |
input | object | 流程实例的入参 |
示例
type: timer
expr: 0 15 11 ? * MON,TUE,WED,THU,FRI
actions:
sw:
action: START_WORKFLOW
allowDays: WORKDAY
args:
operator: timer
name: test
input:
k1: v1
定时表达式示例
表达式含义 | 表达式(注意表达式不要多出空格) |
---|---|
每小时的10分30秒触发任务 | 30 10 * * * ? |
每天1点10分30秒触发任务 | 30 10 1 * * ? |
每月20号1点10分30秒触发任务 | 30 10 1 20 * ? |
每个月最后一天的10点15分0秒触发任务 | 0 15 10 L * ? |
星期一到星期五的10点15分0秒触发任务 | 0 15 10 ? * MON-FRI |
每周二、四、六下午五点 | 0 0 17 ? * TUE,THU,SAT |
每天5-15点整点触发 | 0 0 5-15 * * ? |
每天上午10点,下午2点,4点 | 0 0 10,14,16 * * ? |
RERUN_NODE/RESUME_NODE/RUN_NODE(暂不支持)/COMPLETE_NODE
在action为RUN_NODE的时候,支持配置非流程相关的功能节点在触发器的args里面。RERUN_NODE前提为: 节点至少跑过一次,不考虑其他前置条件。 在action为RERUN_NODE和RESUME_NODE的时候,对应args的object格式限制:
字段名 | 类型 | 描述 |
---|---|---|
node | string | 重跑或者恢复节点的引用名称(必填) |
示例
type: event
event: FiberUpdateUserEvent
actions:
- a1:
action: RERUN_NODE
condition: ${event.type == w.v.type}
args:
node: t2
operator: ${event.operator}
- a4:
action: COMPLETE_NODE
condition: ${event.operator == w.v.operator}
args:
operator: ${event.operator}
node: ${event.node}
input:
k1: v1
output:
k1: v2
status: succeed
节点定义
节点字段描述
一级字段名 | 二级字段名 | 类型 | 描述 |
---|---|---|---|
type | string | 节点类型, 包括:service,switch,fork,join,exclusive_join,transform,assign,ref,sub_workflow(必填) | |
name | string | 节点名称 | |
condition | string | 节点执行条件,条件不满足节点不会执行 | |
webhooks | string | 节点事件webhook地址列表 | |
retry | count | int | 如果执行失败(失败包括通过服务节点successCondition判断的失败以及节点执行过程中返回err)的重试次数,默认为0(默认不重试) |
duration | string | 重试初始延迟时间,格式为time.ParseDuration支持的格式如:"5s"(5秒), "5m"(5分钟), "5h"(5小时)等, 配置的最大时间间隔不超过1天,最小时间间隔为1秒 | |
policy | string | 重试策略,默认为EXPONENTIAL_BACKOFF,FIXED(固定间隔retry.duration)/EXPONENTIAL_BACKOFF(指数退避retry.duration* attempts)(attempts=retry.count) | |
timeout | nearTimeoutDuration | string | 接近超时事件的时间设置,这里的超时时间包括节点等待时间,格式为time.ParseDuration支持的格式如:"5s"(5秒), "5m"(5分钟), "5h"(5小时), "5d"(5天), "4w"(4周)等, 配置的最大时间间隔不超过1个月,最小时间间隔为30秒,不存在默认时间配置 |
nearTimeoutExpr | string | 接近超时事件的时间设置,这里的超时时间包括节点等待时间,格式为cron表达式,比如"0 0 12 ? * WED * "表示节点执行后第一个周三的12点(能感知,类似会议倒计时通知) | |
nearTimeoutPolicy | string | 接近超时策略,ALERT_ONLY表示发送服务号告警 | |
duration | string | 超时事件的时间设置,这里的超时时间包括节点等待时间,格式为time.ParseDuration支持的格式如:"5s"(5秒), "5m"(5分钟), "5h"(5小时), "5d"(5天), "4w"(4周)等, 配置的最大时间间隔不超过1个月,最小时间间隔为30秒,不存在默认时间配置 | |
expr | string | 超时事件的时间设置,优先使用duration的配置,这里的超时时间包括节点等待时间,格式为cron表达式,比如"0 0 12 ? * WED * "表示节点执行后第一个周三的12点(和timeout.duration不互斥,谁先触发,cron只触发一次), cron触发的时间与节点开始执行的时间的最大时间间隔不超过1个月,最小时间间隔为30秒 | |
policy | string | 超时策略,TIME_OUT_WF表示超时则终止节点执行(只有异步的接口才存在终止节点执行的接口,同步接口不存在实际上的终止),ALERT_ONLY表示发送服务号告警,默认为ALERT_ONLY | |
wait | duration | string | (在重跑节点的时候等待不会生效)节点执行前的等待时间设置,格式为time.ParseDuration支持的格式如:"5s"(5秒), "5m"(5分钟), "5h"(5小时), "5d"(5天), "4w"(4周), "max"(一直等待到流程超时)等, 配置的最大时间间隔不超过1个月(除max外),最小时间间隔为1秒 |
expr | string | 节点执行前的等待时间设置,格式为cron表达式,比如"0 0 12 ? * WED * "表示节点执行后第一个周三的12点 (同上timeout.expr) | |
allowDays | string | 默认为所有时间段都可以执行,ANY(任意时间)/WORKDAY(工作日)/HOLIDAY(假日)/WEEK(工作日+周末)/WEEKEND(周六周日) | |
schedule | failedPolicy | string | 失败策略,默认为TERMINAL,IGNORE(忽略失败)/TERMINAL(中止流程) |
schedulePolicy | string | 调度策略,默认为SCHEDULE_NEXT_UNTIL_COMPLETE,SCHEDULE_NEXT_IF_NOT_COMPLETE(没有完成就可以调度下一个节点)/SCHEDULE_NEXT_UNTIL_COMPLETE(调度下一个直到节点执行完成)/IGNORE_FIRST_SCHEDULE(忽略第一次被调度,针对当前实例) | |
executeTimesPolicy | string | 执行次数策略,默认为ANY, AT_LEAST_ONCE(最少执行一次)/EXACTLY_ONCE(有且执行一次)/ANY(可以执行任意次) | |
asyncComplete | bool | 异步结束,异步有两种情况,一种需要用户自行配置触发器,由触发器负责标记节点完成;另一种由轮询实现异步,轮询完成后标记节点完成; | |
biz | object | 业务配置 "input","output" 字段为系统保留字段,用户不能自己设置。 | |
asyncComplete | bool | 异步结束,异步有两种情况,一种需要用户自行配置触发器,由触发器负责标记节点完成;另一种由轮询实现异步,轮询完成后标记节点完成; | |
owner | string | 节点的拥有者的企业微信号, 多用户使用分号";"进行分隔,功能:消息通知出口 (当onwer和ownerchatGroup未配置时,消息出口默认置为流程触发者,当无触发者时则为流程创建者) | |
chatGroup | string | 节点相关的群聊,多群聊使用分号";"进行分隔,功能:消息通知出口 | |
next | string | 下一个执行节点,默认以流程中该节点的下一个节点为 next 节点,配置为其他节点时需填入节点 key 值,配置为 end 标记流程结束 | |
msg | wait | string | 节点等待开始的消息 |
start | string | 节点执行开始的消息 | |
asynWait | string | 节点异步等待的消息 | |
success | string | 节点执行成功的消息 | |
fail | string | 节点执行失败的消息 | |
cancel | string | 节点执行取消的消息 | |
timeout | string | 节点执行超时的消息 | |
nearTimeout | string | 节点执行接近超时的消息 |
节点类型定义
SERVICE节点
服务功能节点
字段名 | 类型 | 描述 |
---|---|---|
args | object | 请求入参(必填) |
pollArgs | object | 轮询的入参 |
cancelArgs | object | 取消节点执行的入参,比如说启动蓝盾流水线后希望停止该流水线执行 |
Args通用字段描述
字段名 | 类型 | 描述 |
---|---|---|
protocol | string | 服务接口协议类型(service节点),TRPC(TRPC功能节点)/HTTP(HTTP功能节点)/FAAS(函数功能节点)/TRPC_HTTP(基于TRPC的单 protocol 是 HTTP 的节点)(必填) |
service | string | TRPC的服务名称 |
method | string | TRPC的方法名 |
body | object | 接口的入参 |
stringify | array | 将服务节点入参中的特定key从map转换成json string,比如["pipelineConfig"]会将服务节点入参的map["pipelineConfig"]从一个map转换成一个json string,{"pipelineConfig":{"k1":"v1"}} -> {"pipelineConfig":"{\"k1\":\"v1\"}"} |
appendVariables | array | 将流程所有的全局变量追加到服务节点入参中的特定key中,比如["k2.v2"]会将流程所有的全局变量追加到服务节点入参的map["k2"]["v2"]中 |
timeoutDuration | string | 轮询超时时间,默认为7天,格式为time.ParseDuration支持的格式如:"5s"(5秒), "5m"(5分钟), "5h"(5小时), "5d"(5天), "4w"(4周)等, 配置的最大超时时间为30天,最小时间间隔为1分钟,不存在默认时间配置 |
pollCondition | string | 执行轮询的条件,当 pollingcondition 不满足时则取消轮训(引擎层面,没有默认值,轮询节点没有配置pollCondition流程会失败) |
cancelCondition | string | 节点执行被取消条件,只在轮询节点中生效 (内容为条件表达式,当满足条件时则节点被取消。取消即为中止) |
successCondition | string | 节点执行成功条件,只有配置了该条件才能判断节点是否执行成功,配在pollArgs里面的时候表示轮询结束后节点是否执行成功,配在args里面表示同步节点是否执行成功,同步节点执行失败就不会再继续执行后面的轮询 |
initialDuration | string | 初始的轮询的时间间隔,格式为time.ParseDuration支持的格式如:"5s"(5秒), "5m"(5分钟), "5h"(5小时)等, 配置的最大时间间隔为1天,最小时间间隔为1秒,默认为10秒 |
maxDuration | string | 最大的轮询的时间间隔,格式为time.ParseDuration支持的格式如:"5s"(5秒), "5m"(5分钟), "5h"(5小时)等, 配置的最大时间间隔为1天,最小时间间隔为1秒,默认为60秒 |
policy | string | 轮询策略,默认为EXPONENTIAL_BACKOFF,FIXED(固定间隔retry.delayDuration)/EXPONENTIAL_BACKOFF(指数退避retry.delayDuration*attempts) |
TRPC args字段描述
字段名 | 类型 | 描述 |
---|---|---|
protocol | string | TRPC(必填) |
service | string | TRPC的服务名称(必填) |
func | string | TRPC的方法别名,有TRPC的方法名的情况下可以不填 |
method | string | TRPC的方法名(和 func 不能同时为空) |
body | object | 接口的入参 |
target | string | 填空时通过北极星寻址服务 |
TRPC 协议示例
name: 调用蓝盾流水线
schedule: {}
args:
service: trpc.dws.devopsserver.BkHub
method: StartBuild
protocol: TRPC
body:
projectID: tdcos-templates
pipelineID: p-4ca6197041984cd98dba3e2b23a4a6c8
pipelineConfig:
config:
project: 蓝盾测试
pollArgs:
service: trpc.dws.devopsserver.BkHub
method: GetBuildResult
protocol: TRPC
timeoutDuration: 1h
pollCondition: ${t2.po.data.completed == true}
successCondition: ${t2.po.data.status == 'SUCCEED'}
cancelCondition: ${t2.po.data.status == 'CANCELED'}
body:
buildID: ${t2.o.data.id}
outputVariables:
- publish_date
cancelArgs:
service: trpc.dws.devopsserver.BkHub
method: CancelBuild
protocol: TRPC
body:
buildID: ${t2.o.data.id}
type: SERVICE
TRPC_HTTP 协议示例
args:
body:
creator: petehuang
description: testCreateIteration
enddate: 2022-03-11
name: test
startdate: 2022-03-11
workspace_id: "69991118"
func: /devops/tapd/createiteration
protocol: TRPC_HTTP
service: trpc.dws.devopsserver.TapdHub
name: 创建迭代
type: SERVICE
HTTP args字段描述
字段名 | 类型 | 描述 |
---|---|---|
protocol | string | HTTP(必填) |
method | string | HTTP的方法类型,比如POST/GET(必填) |
url | string | 请求的url地址(必填) |
headers | object | HTTP的请求头 |
parameter | object | HTTP的请求参数 |
body | object | HTTP的请求体 |
示例
name: HTTP调用示例
args:
protocol: HTTP
method: POST
url: http://www.fflow.link
body:
params:
width: 100
height: 100
headers:
Accept: application/json
Content-Type: application/json
type: SERVICE
TIP:HTTP 节点的输出是一个 HTTP 接口返回包,包括了 body ,header,status,因此要获取输出值时需要使用
node.o.body.somedata
来获取
FAAS args字段描述
这里主要指的是老版基于nodejs 的 FAAS 服务,新版 FAAS 建议直接使用 HTTP 方式调用
字段名 | 类型 | 描述 |
---|---|---|
protocol | string | FAAS(必填) |
namespace | string | 函数的namespace(必填) |
func | string | 函数的名字(必填) |
body | object | 函数的入参 |
示例
name: FAAS示例
args:
protocol: FAAS
namespace: default
func: HttpRequestExample
body:
user: replaceme
type: service
SWITCH节点
条件节点
字段描述
字段名 | 类型 | 描述 |
---|---|---|
switch | object | 多个case分支节点配置 |
condition 和 next 必填
示例
name: 条件节点
type: switch
switch:
- condition: "${w.i.operator == 'replaceme'}"
next: case1
- condition: "${w.i.operator == 'replaceme1'}"
next: case2
next: defaultCase
EXCLUSIVE_JOIN节点
在条件节点后可以选择配置互斥JOIN节点,节点的输出即为执行到的分支路径的输出
示例
name: ExclusiveJoin示例
type: exclusive_join
TRANSFORM节点
转换节点,提供参数转换的能力
output 必填
示例
type: transform
output:
lala: "${w.i.operator}"
next: e2
FORK节点
并行节点,一个Fork必须匹配一个Join节点
字段描述
字段名 | 类型 | 描述 |
---|---|---|
fork | array | 并行的节点列表。array为节点结构组成的数组。(必填) |
示例
fork:
- fj1
- fj2
JOIN节点
在并行分支的结尾可以选择配置JOIN节点,这样只有当分支都执行完成后,JOIN节点才算执行完成,JOIN节点没有输入输出
示例
type: join
SUB_WORKFLOW节点
子流程节点
字段描述
字段名 | 类型 | 描述 |
---|---|---|
subworkflow | string | (引用内部子流程)子流程节点引用名称(和 id 一起必填一个) |
id | number | (引用外部流程作为子流程)流程的id |
version | number | 当配置了 id 字段后可以填写 version,默认为最新版本 |
args | object | 启动流程的相关参数,详情见 args 字段描述 |
args 字段描述
字段名 | 类型 | 描述 |
---|---|---|
name | string | 子流程名称 |
operator | string | 启动者 |
input | object | 流程的入参 |
示例
name: 子流程示例
nodes:
- s1:
subworkflow: sw1
args:
operator: timer
name: test
input:
k1: v1
type: sub_workflow
subworkflows:
- sw1:
name: 子流程实例
desc: 子流程描述示例
nodes:
- t1:
name: 创建群聊
args:
protocol: trpc
service: trpc.dws.chatops.ChatOps
method: /helper/group/creation
body:
name: 子流程群聊示例
userlist:
- "${w.i.operator}"
type: SERVICE
ASSIGN节点
设置全局变量和业务信息的节点
字段描述
字段名 | 类型 | 描述 |
---|---|---|
assign | array | 要设置的variables/biz/owner的key和value |
示例
type: assign
name: 设置全局变量
assign:
- biz:
projectName: 流程定义示例
- owner:
wechat: replaceme
- variables:
test: hh
REF节点
引用节点,节点的定义和被引用的节点保持一致,如果引用节点配置了属性则用引用节点的配置 (如果配置冲突 取 REF 节点的配置); SERVICE节点支持引用, 非SERVICE节点不支持引用。该节点 next 为必填字段。
字段描述
字段名 | 类型 | 描述 |
---|---|---|
ref | string | 被引用的节点 |
WAIT节点
等待节点,不做任何业务功能;和节点执行前等待的区别:WAIT节点属于流程级别的等待,表达的意思是流程单纯的需要等待一段时间或者等某个事件完成后才向后调度;节点执行前等待属于节点级别的等待,表达的意思是前置的所有内容都准备好了,该节点自身需要等待某个时间或者事件才能执行。
示例
type: wait
wait:
duration: 10s
附录
表达式编写指南
请参考: https://github.com/PaesslerAG/gval
注意如果字段不存在,
${n == 0}
这个表达式返回的是true,因为go里面数字默认的值就是0;
一、内置表达式方法
不支持多个方法嵌套使用
1.1 sprintf
类比 golang 中的 fmt.sprintf() 方法,负责字符串的格式化,支持使用 /n 换行,支持基本的 markdown 语法
${sprintf("%s.%s-%s发布", w.v.app, w.v.server, w.v.cur_time)}
如果要同时使用一些特殊字符,比如 #/:
等,可以用单引号将整个表达式括起来 ${sprintf("##%s.%s-%s发布##", w.v.app, w.v.server, w.v.cur_time)}
1.2 curtimeformat
当前时间的格式化
${curtimeformat("0102")}
,输出当天的日期,比如现在是9月27日,则输出0927
分享内容