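Preface
This article is based on the Activiti 7.0.0.GA source code and examines how Activiti starts a process instance.
Approval Flow Diagram
The flow diagram for this example contains two UserTask nodes: the first is the manager approval and the second is the product manager approval. An exclusive gateway sits between them and evaluates the manager's decision. If the manager approves, the flow moves on to the product manager approval node; if the manager rejects, the flow goes to the end node.
The manager approval node is assigned dynamically through the UEL expression ${assignManager}, and the product manager approval node through ${assignProductLineManager}. The gateway decides which branch to take through the UEL expression ${isPass}.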
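To make the role of the three expressions concrete, here is a minimal sketch of the variables map a caller might supply and of how a "${name}" placeholder maps onto it. The class StartVariablesSketch, its methods, and the sample values are hypothetical. The resolver is a toy lookup and not the UEL engine Activiti actually uses. isPass is included only for illustration and could equally be supplied later, when the manager task is completed.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: Activiti resolves ${...} with a real UEL engine, not this lookup.
class StartVariablesSketch {

    // Resolves a bare "${name}" placeholder against the variables map.
    static Object resolve(String expression, Map<String, Object> variables) {
        if (!expression.startsWith("${") || !expression.endsWith("}")) {
            return expression; // literal value, nothing to resolve
        }
        String name = expression.substring(2, expression.length() - 1);
        if (!variables.containsKey(name)) {
            throw new IllegalArgumentException("Unknown variable: " + name);
        }
        return variables.get(name);
    }

    // Variables for the approval flow; the values are made up for this sketch.
    static Map<String, Object> sampleVariables() {
        Map<String, Object> variables = new HashMap<>();
        variables.put("assignManager", "alice");
        variables.put("assignProductLineManager", "bob");
        variables.put("isPass", true);
        return variables;
    }

    public static void main(String[] args) {
        Map<String, Object> variables = sampleVariables();
        System.out.println(resolve("${assignManager}", variables));
        System.out.println(resolve("${isPass}", variables));
    }
}
```

A map of this shape is what would be passed as the second argument of startProcessInstanceById.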
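Starting a Process Instance
We use startProcessInstanceById(String, Map) of the RuntimeService interface as the entry point into the source code to see how the Activiti framework starts a process instance.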
ProcessInstance startProcessInstanceById(String processDefinitionId, Map<String, Object> variables)
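RuntimeServiceImpl, the implementation of the RuntimeService interface, is shown below. It wraps the incoming processDefinitionId and variables in a StartProcessInstanceCmd and hands that command to the execute method of the commandExecutor, which is responsible for running it.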
public ProcessInstance startProcessInstanceById(String processDefinitionId, Map<String, Object> variables) {
    return commandExecutor.execute(new StartProcessInstanceCmd<ProcessInstance>(null, processDefinitionId, null, variables));
}
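CommandExecutor holds a reference to the LogInterceptor command interceptor. LogInterceptor is the first interceptor in the CommandInterceptor chain, and StartProcessInstanceCmd passes through every interceptor in the chain before it is executed.
The interceptors in the chain run in a fixed order. CommandInvoker sits at the very end of the chain and exists solely to invoke the various command classes that implement the Command interface.
StartProcessInstanceCmd implements the Command interface, which declares a single execute method. The CommandInvoker interceptor calls this method, which runs the execute implementation in StartProcessInstanceCmd and starts a process instance.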
@Internal
public interface Command<T> {
    T execute(CommandContext commandContext);
}
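The chain described above is a chain of responsibility in which only the last link runs the command. The following is a minimal sketch of that idea, not Activiti's implementation: the class InterceptorChainSketch, its nested types, and the interceptor names "log" and "context" are all hypothetical, and a plain list of strings stands in for CommandContext.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of a command interceptor chain; all names here are hypothetical.
class InterceptorChainSketch {

    // Stand-in for a command; the trace list replaces the real command context.
    interface Command<T> {
        T execute(List<String> trace);
    }

    // Each interceptor does its own work, then hands the command to the next one.
    static abstract class Interceptor {
        Interceptor next;

        abstract <T> T execute(Command<T> command, List<String> trace);
    }

    static class NamedInterceptor extends Interceptor {
        private final String name;

        NamedInterceptor(String name) {
            this.name = name;
        }

        @Override
        <T> T execute(Command<T> command, List<String> trace) {
            trace.add(name + ":before");
            try {
                return next.execute(command, trace);
            } finally {
                trace.add(name + ":after");
            }
        }
    }

    // Last link of the chain: the only one that actually runs the command.
    static class Invoker extends Interceptor {
        @Override
        <T> T execute(Command<T> command, List<String> trace) {
            return command.execute(trace);
        }
    }

    // Links the interceptors in order and returns the head of the chain.
    static Interceptor chain(Interceptor... interceptors) {
        for (int i = 0; i < interceptors.length - 1; i++) {
            interceptors[i].next = interceptors[i + 1];
        }
        return interceptors[0];
    }

    // Runs one command through a three-link chain and records the call order.
    static List<String> run() {
        List<String> trace = new ArrayList<>();
        Interceptor first = chain(new NamedInterceptor("log"), new NamedInterceptor("context"), new Invoker());
        String result = first.execute(t -> {
            t.add("command");
            return "started";
        }, trace);
        trace.add("result=" + result);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because each interceptor wraps the call to the next one, cross-cutting work such as logging brackets the command on both sides without the command knowing about it.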
StartProcessInstanceCmd implements the execute method as follows:
package org.activiti.engine.impl.cmd;

import java.io.Serializable;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.activiti.bpmn.model.ValuedDataObject;
import org.activiti.engine.ActivitiIllegalArgumentException;
import org.activiti.engine.ActivitiObjectNotFoundException;
import org.activiti.engine.ProcessEngineConfiguration;
import org.activiti.engine.impl.interceptor.Command;
import org.activiti.engine.impl.interceptor.CommandContext;
import org.activiti.engine.impl.persistence.deploy.DeploymentManager;
import org.activiti.engine.impl.runtime.ProcessInstanceBuilderImpl;
import org.activiti.engine.impl.util.ProcessInstanceHelper;
import org.activiti.engine.repository.ProcessDefinition;
import org.activiti.engine.runtime.ProcessInstance;

/**
 * Command class that starts a process instance.
 */
public class StartProcessInstanceCmd<T> implements Command<ProcessInstance>, Serializable {

    private static final long serialVersionUID = 1L;
    protected String processDefinitionKey;
    protected String processDefinitionId;
    protected Map<String, Object> variables;
    protected Map<String, Object> transientVariables;
    protected String businessKey;
    protected String tenantId;
    protected String processInstanceName;
    protected ProcessInstanceHelper processInstanceHelper;

    public StartProcessInstanceCmd(String processDefinitionKey, String processDefinitionId, String businessKey, Map<String, Object> variables) {
        this.processDefinitionKey = processDefinitionKey;
        this.processDefinitionId = processDefinitionId;
        this.businessKey = businessKey;
        this.variables = variables;
    }

    public StartProcessInstanceCmd(String processDefinitionKey, String processDefinitionId, String businessKey, Map<String, Object> variables, String tenantId) {
        this(processDefinitionKey, processDefinitionId, businessKey, variables);
        this.tenantId = tenantId;
    }

    public StartProcessInstanceCmd(ProcessInstanceBuilderImpl processInstanceBuilder) {
        this(processInstanceBuilder.getProcessDefinitionKey(), processInstanceBuilder.getProcessDefinitionId(), processInstanceBuilder.getBusinessKey(), processInstanceBuilder.getVariables(), processInstanceBuilder.getTenantId());
        this.processInstanceName = processInstanceBuilder.getProcessInstanceName();
        this.transientVariables = processInstanceBuilder.getTransientVariables();
    }

    /**
     * Called from CommandInvoker.
     */
    public ProcessInstance execute(CommandContext commandContext) {
        // Fetch the manager that holds the already-deployed process information
        DeploymentManager deploymentCache = commandContext.getProcessEngineConfiguration().getDeploymentManager();

        // Find the process definition
        ProcessDefinition processDefinition = null;
        // processDefinitionId was passed in, so this branch is taken
        if (processDefinitionId != null) {
            // Look up the process definition in the cache by processDefinitionId
            processDefinition = deploymentCache.findDeployedProcessDefinitionById(processDefinitionId);
            if (processDefinition == null) {
                throw new ActivitiObjectNotFoundException("No process definition found for id = '" + processDefinitionId + "'", ProcessDefinition.class);
            }
        } else if (processDefinitionKey != null && (tenantId == null || ProcessEngineConfiguration.NO_TENANT_ID.equals(tenantId))) {
            processDefinition = deploymentCache.findDeployedLatestProcessDefinitionByKey(processDefinitionKey);
            if (processDefinition == null) {
                throw new ActivitiObjectNotFoundException("No process definition found for key '" + processDefinitionKey + "'", ProcessDefinition.class);
            }
        } else if (processDefinitionKey != null && tenantId != null && !ProcessEngineConfiguration.NO_TENANT_ID.equals(tenantId)) {
            processDefinition = deploymentCache.findDeployedLatestProcessDefinitionByKeyAndTenantId(processDefinitionKey, tenantId);
            if (processDefinition == null) {
                throw new ActivitiObjectNotFoundException("No process definition found for key '" + processDefinitionKey + "' for tenant identifier " + tenantId, ProcessDefinition.class);
            }
        } else {
            throw new ActivitiIllegalArgumentException("processDefinitionKey and processDefinitionId are null");
        }

        // Obtain a ProcessInstanceHelper, which is used to start the process
        processInstanceHelper = commandContext.getProcessEngineConfiguration().getProcessInstanceHelper();
        ProcessInstance processInstance = createAndStartProcessInstance(processDefinition, businessKey, processInstanceName, variables, transientVariables);
        return processInstance;
    }

    protected ProcessInstance createAndStartProcessInstance(ProcessDefinition processDefinition, String businessKey, String processInstanceName, Map<String, Object> variables, Map<String, Object> transientVariables) {
        return processInstanceHelper.createAndStartProcessInstance(processDefinition, businessKey, processInstanceName, variables, transientVariables);
    }

    protected Map<String, Object> processDataObjects(Collection<ValuedDataObject> dataObjects) {
        Map<String, Object> variablesMap = new HashMap<String, Object>();
        // convert data objects to process variables
        if (dataObjects != null) {
            for (ValuedDataObject dataObject : dataObjects) {
                variablesMap.put(dataObject.getName(), dataObject.getValue());
            }
        }
        return variablesMap;
    }
}
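The lookup in execute follows a strict precedence: an explicit id wins, then the key without a tenant, then the key with a tenant, and anything else is an error. The sketch below isolates only that branch order so it can be read at a glance. The class DefinitionLookupSketch, the Lookup enum, and the choose method are hypothetical, and NO_TENANT_ID is assumed here to be the empty string.

```java
// Mirrors only the branch order of the lookup; the enum and method are hypothetical.
class DefinitionLookupSketch {

    enum Lookup { BY_ID, LATEST_BY_KEY, LATEST_BY_KEY_AND_TENANT }

    // Stand-in for ProcessEngineConfiguration.NO_TENANT_ID (assumed to be the empty string).
    static final String NO_TENANT_ID = "";

    // Returns which lookup the command would use for the given arguments.
    static Lookup choose(String definitionId, String definitionKey, String tenantId) {
        boolean noTenant = tenantId == null || NO_TENANT_ID.equals(tenantId);
        if (definitionId != null) {
            return Lookup.BY_ID; // an explicit id always takes precedence
        } else if (definitionKey != null && noTenant) {
            return Lookup.LATEST_BY_KEY; // latest version for the key, no tenant
        } else if (definitionKey != null) {
            return Lookup.LATEST_BY_KEY_AND_TENANT; // latest version within the tenant
        }
        throw new IllegalArgumentException("processDefinitionKey and processDefinitionId are null");
    }

    public static void main(String[] args) {
        System.out.println(choose("some-id", null, null));
        System.out.println(choose(null, "approval", ""));
        System.out.println(choose(null, "approval", "tenantA"));
    }
}
```

Note that when an id is supplied, the key and tenant arguments are ignored entirely, which is why startProcessInstanceById can pass null for them.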
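In createAndStartProcessInstance of ProcessInstanceHelper, the id of the processDefinition is used to look up the Process model. The command above calls a five-argument overload; the version shown here takes an additional startProcessInstance flag, which that overload presumably passes as true.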
protected ProcessInstance createAndStartProcessInstance(ProcessDefinition processDefinition,
        String businessKey, String processInstanceName,
        Map<String, Object> variables, Map<String, Object> transientVariables, boolean startProcessInstance) {

    CommandContext commandContext = Context.getCommandContext(); // Todo: ideally, context should be passed here

    // Do not start a process instance if the process definition is suspended
    if (ProcessDefinitionUtil.isProcessDefinitionSuspended(processDefinition.getId())) {
        throw new ActivitiException("Cannot start process instance. Process definition " + processDefinition.getName() + " (id = " + processDefinition.getId() + ") is suspended");
    }

    // Get model from cache:
    // look up the Process model by the id of the processDefinition
    Process process = ProcessDefinitionUtil.getProcess(processDefinition.getId());
    if (process == null) {
        throw new ActivitiException("Cannot start process instance. Process model " + processDefinition.getName() + " (id = " + processDefinition.getId() + ") could not be found");
    }

    // Get the first node (the initial flow element) of the process definition
    FlowElement initialFlowElement = process.getInitialFlowElement();
    if (initialFlowElement == null) {
        throw new ActivitiException("No start element found for process definition " + processDefinition.getId());
    }

    return createAndStartProcessInstanceWithInitialFlowElement(processDefinition, businessKey,
            processInstanceName, initialFlowElement, process, variables, transientVariables, startProcessInstance);
}
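The createAndStartProcessInstanceWithInitialFlowElement method of ProcessInstanceHelper creates the ExecutionEntity that represents the process instance. This entity lives for the whole lifetime of the process instance, from the StartEvent to the EndEvent. If the Process model is a man-made canal, the ExecutionEntity is a boat travelling along it.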
public ProcessInstance createAndStartProcessInstanceWithInitialFlowElement(ProcessDefinition processDefinition,
        String businessKey, String processInstanceName, FlowElement initialFlowElement,
        Process process, Map<String, Object> variables, Map<String, Object> transientVariables, boolean startProcessInstance) {

    CommandContext commandContext = Context.getCommandContext();

    // Create the process instance
    String initiatorVariableName = null;
    // If the initial node is a StartEvent, read its initiator
    if (initialFlowElement instanceof StartEvent) {
        initiatorVariableName = ((StartEvent) initialFlowElement).getInitiator();
    }

    // Create the ExecutionEntity; it is used in many places later on
    ExecutionEntity processInstance = commandContext.getExecutionEntityManager()
            .createProcessInstanceExecution(processDefinition, businessKey, processDefinition.getTenantId(), initiatorVariableName);

    // Creation is done; record that the process instance has started
    commandContext.getHistoryManager().recordProcessInstanceStart(processInstance, initialFlowElement);

    processInstance.setVariables(processDataObjects(process.getDataObjects()));

    // Set the variables passed into the start command
    if (variables != null) {
        for (String varName : variables.keySet()) {
            // Bind the incoming process variables to the ProcessInstance
            processInstance.setVariable(varName, variables.get(varName));
        }
    }
    if (transientVariables != null) {
        for (String varName : transientVariables.keySet()) {
            processInstance.setTransientVariable(varName, transientVariables.get(varName));
        }
    }

    // Set processInstance name
    if (processInstanceName != null) {
        processInstance.setName(processInstanceName);
        commandContext.getHistoryManager().recordProcessInstanceNameChange(processInstance.getId(), processInstanceName);
    }

    // Fire events
    if (Context.getProcessEngineConfiguration().getEventDispatcher().isEnabled()) {
        Context.getProcessEngineConfiguration().getEventDispatcher()
                .dispatchEvent(ActivitiEventBuilder.createEntityWithVariablesEvent(ActivitiEventType.ENTITY_INITIALIZED, processInstance, variables, false));
    }

    // Create the first execution that will visit all the process definition elements.
    // A child ExecutionEntity is created from the parent ExecutionEntity (processInstance);
    // the parent later reaches it through getExecutions().get(0).
    ExecutionEntity execution = commandContext.getExecutionEntityManager().createChildExecution(processInstance);
    // setCurrentFlowElement is very important: it records which FlowNode or SequenceFlow the execution is currently active on.
    // Every time the execution reaches a new node, this method is called to update the active node. Here initialFlowElement is the StartEvent.
    execution.setCurrentFlowElement(initialFlowElement);

    if (startProcessInstance) {
        // The new process instance has been created; from here the flow moves on to the next node
        startProcessInstance(processInstance, commandContext, variables);
    }

    return processInstance;
}
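The relationship between the process instance execution and its child execution can be pictured as a small tree in which only the child carries a current position. The following sketch models just that shape. ExecutionTreeSketch and its Execution class are hypothetical stand-ins for ExecutionEntity, and a plain string id replaces FlowElement.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the parent/child execution tree; not Activiti's ExecutionEntity.
class ExecutionTreeSketch {

    static class Execution {
        final Execution parent;
        final List<Execution> children = new ArrayList<>();
        String currentFlowElement; // id of the node this execution is currently on

        Execution(Execution parent) {
            this.parent = parent;
            if (parent != null) {
                parent.children.add(this); // register with the parent on creation
            }
        }

        // The root of the tree plays the role of the process instance.
        boolean isProcessInstance() {
            return parent == null;
        }
    }

    // Creates the root execution plus one child positioned on the initial element.
    static Execution startAt(String initialElementId) {
        Execution processInstance = new Execution(null);
        Execution child = new Execution(processInstance);
        child.currentFlowElement = initialElementId;
        return processInstance;
    }

    public static void main(String[] args) {
        Execution processInstance = startAt("startEvent");
        Execution child = processInstance.children.get(0);
        System.out.println(processInstance.isProcessInstance()); // the root is the process instance
        System.out.println(child.currentFlowElement);            // the child holds the position
    }
}
```

In this model the root never moves; it is the child that travels through the nodes, which matches the source's later use of getExecutions().get(0) to fetch the execution that continues the flow.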
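The startProcessInstance method of ProcessInstanceHelper also contains logic for event sub-processes in the Process model (the loop that registers message event subscriptions). That part is not examined here to keep the walkthrough focused; it is enough to know that it exists. In this method the process instance heads for the next node through commandContext.getAgenda().planContinueProcessOperation(execution);.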
public void startProcessInstance(ExecutionEntity processInstance, CommandContext commandContext, Map<String, Object> variables) {

    // Fetch the Process model by processDefinitionId
    Process process = ProcessDefinitionUtil.getProcess(processInstance.getProcessDefinitionId());

    // Event sub process handling
    List<MessageEventSubscriptionEntity> messageEventSubscriptions = new LinkedList<>();
    for (FlowElement flowElement : process.getFlowElements()) {
        // Handle event sub-processes contained in the process
        if (flowElement instanceof EventSubProcess) {
            EventSubProcess eventSubProcess = (EventSubProcess) flowElement;
            for (FlowElement subElement : eventSubProcess.getFlowElements()) {
                if (subElement instanceof StartEvent) {
                    StartEvent startEvent = (StartEvent) subElement;
                    if (CollectionUtil.isNotEmpty(startEvent.getEventDefinitions())) {
                        EventDefinition eventDefinition = startEvent.getEventDefinitions().get(0);
                        if (eventDefinition instanceof MessageEventDefinition) {
                            MessageEventDefinition messageEventDefinition = (MessageEventDefinition) eventDefinition;
                            BpmnModel bpmnModel = ProcessDefinitionUtil.getBpmnModel(processInstance.getProcessDefinitionId());
                            if (bpmnModel.containsMessageId(messageEventDefinition.getMessageRef())) {
                                messageEventDefinition.setMessageRef(bpmnModel.getMessage(messageEventDefinition.getMessageRef()).getName());
                            }
                            ExecutionEntity messageExecution = commandContext.getExecutionEntityManager().createChildExecution(processInstance);
                            messageExecution.setCurrentFlowElement(startEvent);
                            messageExecution.setEventScope(true);
                            messageEventSubscriptions.add(commandContext.getEventSubscriptionEntityManager()
                                    .insertMessageEvent(messageEventDefinition.getMessageRef(), messageExecution));
                        }
                    }
                }
            }
        }
    }

    // Fetch the child execution of the parent ExecutionEntity (processInstance)
    ExecutionEntity execution = processInstance.getExecutions().get(0); // There will always be one child execution created
    // planContinueProcessOperation moves the flow on to the next node in the process
    commandContext.getAgenda().planContinueProcessOperation(execution);

    // Dispatch events
    if (Context.getProcessEngineConfiguration().getEventDispatcher().isEnabled()) {
        ActivitiEventDispatcher eventDispatcher = Context.getProcessEngineConfiguration().getEventDispatcher();
        // Create and dispatch the process-started event
        eventDispatcher.dispatchEvent(ActivitiEventBuilder.createProcessStartedEvent(execution, variables, false));

        for (MessageEventSubscriptionEntity messageEventSubscription : messageEventSubscriptions) {
            commandContext.getProcessEngineConfiguration().getEventDispatcher()
                    .dispatchEvent(ActivitiEventBuilder.createMessageEvent(ActivitiEventType.ACTIVITY_MESSAGE_WAITING, messageEventSubscription.getActivityId(),
                            messageEventSubscription.getEventName(), null, messageEventSubscription.getExecution().getId(),
                            messageEventSubscription.getProcessInstanceId(), messageEventSubscription.getProcessDefinitionId()));
        }
    }
}
At this point the process instance has been started.
From here, commandContext.getAgenda().planContinueProcessOperation(execution) moves the flow on to the next node.
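The word "plan" in planContinueProcessOperation suggests that the step is queued rather than executed on the spot. The sketch below illustrates that idea with a queue of Runnable operations drained by a loop, where each operation may plan the next one. It is a simplified model under that assumption, not the engine's agenda: AgendaSketch, Agenda, visit, and the node ids are hypothetical, and where the real drain loop lives is not shown in the code above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Simplified model of an agenda of planned operations; all names are hypothetical.
class AgendaSketch {

    static class Agenda {
        private final Deque<Runnable> operations = new ArrayDeque<>();

        void plan(Runnable operation) {
            operations.addLast(operation); // queue the operation, do not run it yet
        }

        boolean isEmpty() {
            return operations.isEmpty();
        }

        Runnable next() {
            return operations.pollFirst();
        }
    }

    // Builds an operation that visits path[index] and plans the visit of the following element.
    static Runnable visit(Agenda agenda, List<String> path, int index, List<String> visited) {
        return () -> {
            visited.add(path.get(index));
            if (index + 1 < path.size()) {
                agenda.plan(visit(agenda, path, index + 1, visited));
            }
            // At the last element nothing is planned, so the loop below ends (a wait state).
        };
    }

    // Plans the first operation, then drains the agenda until nothing is left.
    static List<String> run() {
        List<String> path = Arrays.asList("startEvent", "flow1", "managerApproval");
        List<String> visited = new ArrayList<>();
        Agenda agenda = new Agenda();
        agenda.plan(visit(agenda, path, 0, visited));
        while (!agenda.isEmpty()) {
            agenda.next().run();
        }
        return visited;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Queuing operations instead of calling them recursively keeps the call stack flat no matter how many nodes the flow passes through before it reaches a wait state such as a user task.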
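Summary
Starting a process mainly involves the command executor (CommandExecutor), the command interceptors (CommandInterceptor), the start-process command (StartProcessInstanceCmd), the process definition (ProcessDefinition), the process model (Process), and the process instance (ProcessInstance, that is, ExecutionEntity). Together these elements are what is required to start a process instance.
In the source code for starting a process instance, the part I personally find most elegant is the interceptor design. Thanks to the interceptors, Activiti factors the cross-cutting code out of the business logic (for example the built-in logging, CommandContext, Transaction, and CommandInvoker), so the boundary between the main logic and the supporting logic is clearer. The startup sequence advances step by step in a clear order, which makes the logic easier to follow for anyone studying the source.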