搜索

一个可靠的原创定时任务解决方案

gecimao 发表于 2019-06-11 19:07 | 查看: | 回复:

  个人介绍:王铭鑫,2017 年 7 月入职 Qunar,现任国际机票基础搜索部门 Java 开发工程师,熟悉并发编程和多态编程,对架构设计、性能调优略有涉猎。工作之余,参与修订 C++ 国际标准,发表过多篇 ISO C++ 提案,涉及并发编程和多态编程领域。其中一篇关于多态编程的提案被 C++ 创始人 Bjarne Stroustrup 评价为“has the potential for significantly change the way we write code”,目前正在持续推进其标准化进程。

  在复杂度较高的大规模集群中,定时任务是一种常见的需求;在 Java 语言中,我们通常使用内置的 ScheduledThreadPoolExecutor 类来实现定时任务需求,也可以使用 Quartz 等第三方辅助工具。然而,我们发现在复杂的并发环境中灵活控制定时任务并不容易。

  举个例子,假设我们有一个配置中心,提供一个读取配置的 RPC 接口如下:

  该接口返回的数据中可能同时存在某些很重要的数据和一些不那么重要的数据。根据需求,客户端程序需要调用该接口以使用远程的配置数据;但由于客户端集群庞大,使用该配置的频率远超过该接口可承受 QPS。所幸,配置中心中的重要数据更新不那么频繁,对于不那么重要的部分我们也不需要远程的实时数据,所以可以使用周期性定时任务的方式内拉取最新配置,例如:

  对于某些重要的数据更新,我们可能更希望尽快同步到每一个客户端,这就需要依赖于分布式消息机制。当然,我们可以将配置作为消息体发送给各个客户端;但当配置数据量较大时,为了降低分布式消息维护开销,我们通常更倾向于让客户端重新拉取。那么,客户端接收到“立即更新”的消息后,应执行怎样的逻辑呢?在上面代码的基础上,可能写出这样的代码:

  乍看上去,这段代码相对还算优雅,如果发布上线也基本可以正常工作;但在某些情况下,这段代码是不可靠的。比如,当接收到消息时,一次更新操作正在进行,此时启动了另一个定时任务就可能出现并发问题——假如 remoteConfigLoader 中保存了一些可变状态(例如一个调用次数计数器或其他数据),或者正在进行的更新操作在第一次新任务完成之后才结束,都可能导致程序运行异常,如下图所示:

  另外,假设有多个消息同时到达(可能由于在短时间内更新了多次重要配置),任务的取消和创建也可能会有并发问题,甚至导致线程泄漏和内存泄漏,如下图所示:

  那么上述这些问题应该如何避免呢?如果仅使用 ScheduledThreadPoolExecutor 似乎是很难做到的,我们通常需要增加一些并发控制逻辑,例如在上述两个可能产生并发问题的地方分别加锁:

  可以看出,加入两个互斥量之后,代码逻辑比之前复杂很多,而且如果对于互斥量使用不当还可能导致死锁等问题,这样既不利于测试和维护,甚至可能产生性能瓶颈。注意,如果这里将两个互斥量合并为一个也可能引入并发问题——当定时任务正在执行时,任何触发操作将都被阻塞,从而降低系统吞吐率;假如有两个定时任务同时相互触发,则必然会导致死锁。如果我们借助第三方工具,例如 Quartz,是可以避免这类问题的,但仍需要我们仔细分析并发程序行为来做相应的配置,这里不展开叙述。

  进一步,假如我们要新上线一个包含定时任务的功能,为了尽可能不影响线上之前的业务,我们希望为这个定时任务加入一个开关,在开关关闭时不运行定时任务,打开时才运行;如果开关开启后新的代码出现了问题,关闭开关后应可以及时停止定时任务;另外,我们在开关开启后希望根据实际的运行效果来调整定时任务的时间间隔。如果我们基于上面的代码继续开发,则逻辑将会更加复杂:

  再举一个简单的例子,假如我们需要开发一个模拟用户操作的自动化测试工具,由于用户点击周期通常不固定,而是在某个区间内类似正态分布,这就要求任意两次定时任务的间隔需要通过更复杂的逻辑计算出来。

  后文将为大家介绍一个原创的定时任务辅助工具,叫做 Circulation Trigger。在正式介绍其具体设计与实现之前,我们首先看一下如何用这个工具解决上面的那些问题。

  当客户端接收到“立即更新”的消息时,只需要调用一次 trigger.fire() 即可(如果需要延迟触发可在 fire 方法中传入一个代表时长的 Duration 对象),然后 Circulation Trigger 就会帮助你自动取消之前的定时计划并尽快执行一次定时任务而不引入任何并发问题。

  Circulation Trigger 保证多次触发的定时任务一定不会并发执行:假如在消息到来时正在执行一次定时任务,那么 Circulation Trigger 将会在该次定时任务结束后立即再执行一次,这样也保证加载的数据一定是最新的。假如有多个消息同时到达,并发调用 fire 方法时,只会有一次生效,而不会引起任何内存泄漏或线程泄漏的问题。

  进一步,假如我们要新上线一个包含定时任务的功能,我们可以随时调用 trigger.fire() 和 trigger.suspend() 触发或终止定时任务;同样地,Circulation Trigger 保证不引入任何并发问题。

  另外,即使定时任务触发的周期不固定,Circulation Trigger 也提供了足够的扩展性——任何一次触发的周期都由用户指定,例如所述以类似正态分布的方式,周期性模拟用户的点击过程:

  设计这个工具之前我阅读过一些业务中实现定时任务的程序,但其中大多数都或多或少存在潜在的并发问题——不仅这些 bug 难以复现,而且相关并发控制代码可复用性不高。所以,这个工具设计的首要目标就是彻底杜绝并发问题,除此之外我还有一些其他的考量。

  这个目标说起来容易,但在实现的时候需要考虑很多边界情况。举个例子,假如我们顺序地做两次触发,第一次在 5 分钟后执行定时任务,第二次在 1 分钟后执行定时任务,那么通常情况下我们更希望以第二次为准;进一步,在一次定时任务运行时,并发地进行多次不同延时的触发操作,下一次任务应该在什么时候运行呢?根据需求调研,我们规定,当某次触发的任务尚未运行时,之后的触发信号应覆盖该信号,这样既可以避免不必要的触发,又保证了最新的有效触发一定成功。

  细心的同学应该会发现,构造 Circulation Trigger 时传入的 lambda 表达式的返回值类型是 Optional 。当其有值时,其语义是下一次运行定时任务的时间间隔;反之,当其没有值时则不进行下一次定时任务。这样设计 API 是为了让用户可以优雅地主动终止定时任务。

  ScheduledThreadPoolExecutor 的 API 支持两种提交周期性任务方式,一种是按固定频率调度(对应 scheduleAtFixedRate 方法),另一种是按固定间隔时间调度(对应 scheduleWithFixedDelay 方法);当然,我们还可以选择按精确时刻调度,这样似乎更符合“定时任务”的定义。虽然这些表示方法都可以相互无损转化,但是经过调研,我们发现周期性定时任务通常更关注时间间隔,所以我们选择按照间隔时间调度的方式。

  Java 8 之后已经有了标准的时间库,例如我们可以使用 Instant 表示一个精确时刻,使用 Duration 和 Period 表示一个时间段。由于 Duration 提供了更精确的时间表示方法,为了此工具更加通用,我们在设计 API 是选择了 Duration。

  为了让此工具更易使用,我们选取 Java 自带的 ScheduledExecutorService 接口作为本工具的底层组件。事实上,由于 ScheduledExecutorService 本身不支持定时任务的自提交以减少临界区的竞争,对于此工具而言仍存在性能可提升空间;但由于该开销不会产生性能瓶颈,为了降低学习成本,我们没有加入其它提升扩展性的中间层。但是为了充分测试此工具,我们也提供了一个相应的 C++ 实现,其线程池被设计为支持定时任务的自提交的运行模式;代码已经过充分测试,欢迎大家使用。

  在 API 设计上,我们考虑过使用继承的方式,即提供一个抽象类,包含一个承载定时任务逻辑的抽象方法。经分析,我们认为使用 Java 实现此机制时,两种方法的学习和使用成本相当;但在其他语言中,例如 C++,使用组合要明显优于继承的方式。为了增加可移植性,我们最终选择了组合而非继承的方式。

  异常是 Java 的核心语言机制,理论上任何方法的调用都可能抛出异常。然而,由于定时任务在线程池中异步运行,按照串行的方式捕获异常是不可行的。我们规定,当一次定时任务的运行抛出异常时,该异常应湮没以不影响其他运行在线程池中的定时任务运行;所以用户应尽量避免在执行定时任务时抛出异常。如果用户希望终止后续的定时任务,应显示返回有明确语义的 Optional.empty()。

  在实现一个并发机制之前,我们通常都会先提取其在不同上下文中的状态。对于这个工具来说,显然至少需要三个维度的状态:

  对于任务标识,我们可以直接使用 Java 的引用,但这样就不可避免使用互斥量或者 Immutable 模式,开销较大;如果我们采用版本号的方式,就可以使用无锁的方式实现这个算法。由于触发定时任务的操作通常不会特别频繁,我们认为使用一个 32 位整数(后文称之为“状态”)来存储这三个维度的信息就足够了;具体地,1 和 2 可以分别使用一个比特位,剩余 30 位存储版本号,空间为 0x00000000~0x3FFFFFFF,可循环使用。为了便于叙述,我们后文将上述三个维度分别称为:预约位、运行位和最新版本号,依次排列,如下图所示:

  此过程较为简单,如果存在预约则清除预约位,然后将版本号循环加一即可。注意,版本号的最大值为 0x3FFFFFFF,加一之后应该变为 0x00000000。伪代码如下:

  此过程有两个独立步骤,首先检查版本号是否是最新的,然后设置运行位。令该过程输出“该次定时任务是否执行”,伪代码如下:

  此过程需检查是否存在新版本定时任务预约。令此过程输出“{是否继续运行, 新版本号}”,伪代码如下:

  以上代码可以使用 Java 8 及以上版本编译。事实上,在 Java 9 之后,Java 对原子操作的 API 做了一些升级,允许用户指定原子操作的内存顺序(Memory Order)。上述代码中所使用的 weakCompareAndSet 方法已被弃用,但为了兼容 Java 8 的程序,我给出的实现中并没有使用 Java 9 之后的的 API。理论上,上述程序中只需要指定两个操作的内存顺序即可,不需要保持所有操作的串行一致性,感兴趣的同学可以参照我们给出的此工具 C++ 实现(),该实现中已经使用了最优的内存同步配置。

本文链接:http://kingstonflowers.net/dingshirenwu/592.html
随机为您推荐歌词

联系我们 | 关于我们 | 网友投稿 | 版权声明 | 广告服务 | 站点统计 | 网站地图

版权声明:本站资源均来自互联网,如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

Copyright @ 2012-2013 织梦猫 版权所有  Powered by Dedecms 5.7
渝ICP备10013703号  

回顶部