• 请不要在回答技术问题时复制粘贴 AI 生成的内容
yueyoum
V2EX  ›  程序员

大量定时任务如何快速可靠的实现?

  •  
  •   yueyoum ·
    yueyoum · May 20, 2014 · 8476 views
    This topic created in 4399 days ago, the information mentioned may be changed or developed.
    场景类似于 EVE中的技能训练,
    玩家将要训练的技能放入 训练队列中,

    每个任务训练玩后,都会主动通知玩家,自动开始下个技能

    现在我的系统中也有类似的东西。

    玩家会开启的一个带总时间限制的功能,开启后可以随意停止。或者下线,直到达到时间限制,自动停止。

    我想的就是 每个玩家的这个操作都是一个定时任务,这个任务执行时间就是 到达时间限制的时候,功能就是停止这个玩家的这项功能。

    考察了 celery (borker用的 redis),
    但经过我的测试 它的 task.apply_async(... countdown=xxx)
    在丝毫无负责的情况下,并不精确。
    并且文档也说明了,只是保证在 countdown 之后执行, 负载 网络 等原因会导致其不保证一定准时执行。


    其实 celery 也是可用的,只是精度差了点。

    有没有高精度的实现?
    22 replies    2014-05-22 12:01:53 +08:00
    Actrace
        1
    Actrace  
       May 20, 2014
    Event Agent.
    9hills
        2
    9hills  
       May 20, 2014
    你这个与其用定时任务,不如用redis的expire key

    key过期不存在了,就是达到了时间限制。另外2.8之后Key Expire可以触发trigger,参见
    http://redis.io/topics/notifications
    yueyoum
        3
    yueyoum  
    OP
       May 20, 2014
    @Actrace

    哦? nodejs 的东西 ? 不了解啊。
    MasterYoda
        4
    MasterYoda  
       May 20, 2014
    @9hills
    赞redis的思路。
    只是当存在大量expire key的时候,redis的那个pubsub通知也不精准。
    MasterYoda
        5
    MasterYoda  
       May 20, 2014
    @9hills
    不过看他的应用应该是自己去pull状态的,不是push。所以不用那个notification也行。
    yueyoum
        6
    yueyoum  
    OP
       May 20, 2014
    @9hills

    恩,有了trigger 确实是个方法。
    没有trigger的时候 expire key 也几乎不可用。 总不能不停的去检测这些key吧

    不过 trigger 没接触过,我代会去看看……
    akira
        7
    akira  
       May 20, 2014
    自己实现吧,做个定时器,然后遍历所有的countdown,也不需要多少耗时
    MasterYoda
        8
    MasterYoda  
       May 20, 2014
    @yueyoum
    如果用户查看还有多少时间时去 ttl key 一次。 如果不去就依靠那个trigger,不精准也就无所谓了吧。
    yueyoum
        9
    yueyoum  
    OP
       May 20, 2014
    @MasterYoda

    如果是用户查看还有多少时间, 根本就不用redis,
    直接把开始时间 保存起来, 然后查看的时候 用 当时时间,起始时间,总时间 就可以算出来。

    是 PUSH, 也就是 没有任何操作, 定时器也要工作。



    @akira

    想过用 erlang 自己实现, 思路和实现都及其简单,简单到连test都不用的地步。

    erlang 提供一个 register 接口, 系统把需要定时的任务丢给erlang
    erlang spawn一个 process,然后就 sleep住, 醒来后 就带着相应的参数 去回调系统就可以。


    只是感觉大量定时任务 在服务端应该是一个极其常见的应用场景。
    所以就来问问是否有更好的实现。
    MasterYoda
        10
    MasterYoda  
       May 20, 2014
    @yueyoum
    好吧,纯Push,那么用户不去的话还有trigger啊。不过那个trigger也是不精准的。
    yueyoum
        11
    yueyoum  
    OP
       May 20, 2014
    @9hills

    @MasterYoda

    redis 我也大量运用,也算熟悉,expire key
    这个在上个项目中 做 功能 CD 是这样做的。
    这样做没问题,因为是要去 做这个功能的时候去 查一下 有没有那个key
    有就不能做。


    但现在的应用场景变了, 是key消失了 需要通知系统, 做一些动作。


    所以如果 直接撸redis,也只能考虑用它的通知机制

    redis 是单进程单线程模式, 不用测试就知道,量大以后,也会不精确。
    yueyoum
        12
    yueyoum  
    OP
       May 20, 2014
    @MasterYoda

    恩, 只是我没用过 redis trigger。 暂时还是考虑其他方式
    Actrace
        13
    Actrace  
       May 20, 2014
    @yueyoum 我的意思是说用事件驱动的方式去做这件事情会比较轻松.
    9hills
        14
    9hills  
       May 20, 2014
    @yueyoum redis trigger key expire的时候其实不会触发,只有被delete的时候才会触发,这个时间差是ms级别

    主要看用户量吧,我没任何数据瞎猜,几万Key应该用redis没问题。 再多你还是测试下吧
    yueyoum
        15
    yueyoum  
    OP
       May 20, 2014
    @9hills

    哦, 那不是 依靠 key 自己 expire 就没法用 trigger 呢?
    codingpp
        16
    codingpp  
       May 20, 2014
    import time
    import threading
    from heapq import heappush, heappop, heapify

    class TaskCall(object):
    def __init__(self, runtime, func, args):
    self.runtime = runtime
    self.func = func
    self.args = args

    def __lt__(self, other):
    return self.runtime < other.runtime

    def __le__(self, other):
    return self.runtime <= other.runtime

    def __gt__(self, other):
    return self.runtime > other.runtime

    def __ge__(self, other):
    return self.runtime >= other.runtime

    class Timer(object):
    def __init__(self):
    self.squeue = []
    self.newtasks = []

    def add(self, second, func, args = ()):
    runtime = time.time() + second
    task = TaskCall(runtime, func, args)
    self.newtasks.append(task)

    def loop(self):
    while(True):
    while len(self.newtasks) != 0:
    heappush(self.squeue, self.newtasks.pop())
    while self.squeue and (self.squeue[0].runtime <= time.time()):
    task = heappop(self.squeue)
    apply(task.func, task.args)
    time.sleep(0.1)

    timer = Timer()

    if __name__ == '__main__':
    def printsome(i):
    print i
    def exitprocess(i):
    exit()

    timer.add(2, printsome, (2,))
    timer.add(5, printsome, (5,))
    timer.add(5, printsome, (5,))
    timer.add(5, printsome, (5,))
    timer.add(5, printsome, (5,))
    timer.add(1, printsome, (1,))
    timer.add(7, exitprocess, (1,))

    timer.loop()


    之前写过的一段代码,应该很适合这种场景
    堆排序
    Livid
        17
    Livid  
    MOD
    PRO
       May 20, 2014
    如果是这样做呢?

    - 存开始时间和预计的结束时间,用预计的结束时间做索引
    - 每分钟检查有哪些任务已经 expire(当前时间 > 预计的结束时间),如果 expire 就 trigger
    MasterYoda
        18
    MasterYoda  
       May 20, 2014
    @yueyoum
    想用的话,删除时 expire key 0。
    yueyoum
        19
    yueyoum  
    OP
       May 21, 2014
    @codingpp
    @Livid

    感谢提供思路, 确实可以。
    但只是感觉不停的loop检测,不是个好办法啊
    codingpp
        20
    codingpp  
       May 21, 2014
    @yueyoum
    其实在loop里加一个time.sleep(0.0001),空循环运行一段时间机器的负载几乎是0的
    mx1700
        21
    mx1700  
       May 21, 2014
    @yueyoum 不用不停的loop,获取最近的一条任务,计算距离当前的时间差,启动一个计时器到时间再触发任务执行,然后再启动计时器。
    如果有新任务插入,就取消当前计时器,重新计时。
    yueyoum
        22
    yueyoum  
    OP
       May 22, 2014
    @mx1700 是的 我的思路也是这样, 并且用celery实现了
    About   ·   Help   ·   Advertise   ·   Blog   ·   API   ·   FAQ   ·   Solana   ·   1124 Online   Highest 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 53ms · UTC 23:43 · PVG 07:43 · LAX 16:43 · JFK 19:43
    ♥ Do have faith in what you're doing.