FLOWABLE的真正的并行服务任务执行(上)

FLOWABLE的真正的并行服务任务执行(上)

 

介绍

Flowable从一开始就一直支持并行执行任务,大多数时候都是使用BPMN并行网关。在CMMN中,并行执行的任务是任务执行的默认方式(如果您不熟悉CMMN,请查看我们的CMMN博客系列简介)。

您可能想知道标题中的“真正的并行执行”是什么意思。为了回答这个问题,让我们用一些非常简单的HTTP任务在BPMN中做一个小练习。在每个任务名称中,我们都编写了完成单个HTTP任务所需的时间。通过查看下面的过程,您认为接下来的过程要花多长时间?

处理并行HTTP任务

如果您不熟悉Flowable或其他类似的工作流程引擎,则可能会说大约需要800毫秒才能完成。但是,实际上,此流程实例实际上在2.6s内执行。即使我们有一个描述逻辑并行业务执行的并行网关,该技术实现实际上也会按顺序完成这项工作,一个HTTP任务一个接一个。这样做的主要原因是事务性:Flowable保证从一个等待状态原子地移动到另一个等待状态。这样做的代价是所有任务的执行都是单线程的。

与异步标志的区别

你们中的一些人会立即说:“ Flowable已经有了使任务异步化的解决方案,为什么我们需要阅读有关并行执行的新博客?”

以上面的示例为例,将Task1,Task2,Task3和Task4标记为异步将确实使所有任务的执行都是异步的(有关更多信息,请参见Demystifying the Asynchronous Flag)。但是,这样做有一些重要的警告。

现在,我们将有四个不同的线程/事务对同一个流程实例进行修改。当所有分支完成时加入时,这可能导致乐观的锁定异常(这是完全可以的,因为如果发生这种情况,Flowable将重试异步作业)。也有一个解决方案:将这些任务排除在外。但是,这意味着每个流程实例一次只能执行一个分支,以避免乐观锁异常。实际上,这将使执行并行化(甚至有可能将负载分散到不同的Flowable节点上),但是当发生冲突和发挥排他性时,它可能会使完整的端到端运行变慢。因此,异步/排他主要是在不同的Flowable节点之间分散负载的一种方法,而不是加快执行速度。当然,使用异步标志也具有其他效果,例如更快地将控制权返回给用户。但这不是本文的重点。

FUTUREJAVA代表

在某些用例中,主要焦点是原始吞吐量,通常是在服务编排中,而不是分散负载。如果我们看一下上面的示例过程模型,自然的倾向是说这应该在〜800ms内完成,因为那是执行时间最长的路径(第一和第三路径)所需的时间。其他分支速度更快,不应影响总执行时间。

现在,对于即将到来的Flowable 6.6开源版本所做的工作,现在确实可以做到这一点(如果您想通过从源代码构建引擎进行试验,代码库中已经存在该代码)。借助此功能,Flowable将允许您编写自己的业务逻辑,以便可以完全并行地执行它。为此,我们使用Java 8 CompletableFutures,并将工作的执行委托给来自不同线程池的新线程,从而允许当前线程继续执行和调度其他并行流。

为了使您能够编写真正的并行业务逻辑,我们添加了一些可以使用的新接口。主要接口是FutureJavaDelegate(用于BPMN)和PlanItemFutureJavaDelegate(用于CMMN)。我们将展示BPMN接口,但是CMMN具有相同的概念。通过实现这些接口,您的业务逻辑将自动符合真正的并行执行条件。

public interface FutureJavaDelegate
{ /** * Perform the execution of the delegate, potentially on another thread. * The result of the future is passed in the {@link #afterExecution(DelegateExecution, Object)} in order to store * the data on the execution on the same thread as the caller of this method. * * IMPORTANT: the execution should only be used to read data before creating the future. * The execution should not be used in the task that will be executed on a new thread. *

* The {@link AsyncTaskInvoker} is in order to schedule an execution on a different thread. * However, it is also possible to use a different scheduler, or return a future not created by the given {@code taskInvoker}. *

* * @param execution the execution that can be used to extract data * @param taskInvoker the task invoker that can be used to execute expensive operation on another thread * @return the output data of the execution */ CompletableFuture

execute(DelegateExecution execution, AsyncTaskInvoker taskInvoker); /** * Method invoked with the result from {@link #execute(DelegateExecution, AsyncTaskInvoker)}. * This should be used to set data on the {@link DelegateExecution}. * This is on the same thread as {@link #execute(DelegateExecution, AsyncTaskInvoker)} and participates in the process transaction. * * @param execution the execution to which data can be set * @param executionData the execution data */ void afterExecution(DelegateExecution execution, Output executionData); }

相关教程