V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
yueyoum
V2EX  ›  程序员

大量定时任务如何快速可靠的实现?

  •  
  •   yueyoum ·
    yueyoum · 2014-05-20 14:37:31 +08:00 · 7738 次点击
    这是一个创建于 3629 天前的主题,其中的信息可能已经有所发展或是发生改变。
    场景类似于 EVE中的技能训练,
    玩家将要训练的技能放入 训练队列中,

    每个任务训练玩后,都会主动通知玩家,自动开始下个技能

    现在我的系统中也有类似的东西。

    玩家会开启的一个带总时间限制的功能,开启后可以随意停止。或者下线,直到达到时间限制,自动停止。

    我想的就是 每个玩家的这个操作都是一个定时任务,这个任务执行时间就是 到达时间限制的时候,功能就是停止这个玩家的这项功能。

    考察了 celery (borker用的 redis),
    但经过我的测试 它的 task.apply_async(... countdown=xxx)
    在丝毫无负责的情况下,并不精确。
    并且文档也说明了,只是保证在 countdown 之后执行, 负载 网络 等原因会导致其不保证一定准时执行。


    其实 celery 也是可用的,只是精度差了点。

    有没有高精度的实现?
    22 条回复    2014-05-22 12:01:53 +08:00
    Actrace
        1
    Actrace  
       2014-05-20 14:44:32 +08:00
    Event Agent.
    9hills
        2
    9hills  
       2014-05-20 15:10:33 +08:00
    你这个与其用定时任务,不如用redis的expire key

    key过期不存在了,就是达到了时间限制。另外2.8之后Key Expire可以触发trigger,参见
    http://redis.io/topics/notifications
    yueyoum
        3
    yueyoum  
    OP
       2014-05-20 15:57:34 +08:00
    @Actrace

    哦? nodejs 的东西 ? 不了解啊。
    MasterYoda
        4
    MasterYoda  
       2014-05-20 15:57:37 +08:00
    @9hills
    赞redis的思路。
    只是当存在大量expire key的时候,redis的那个pubsub通知也不精准。
    MasterYoda
        5
    MasterYoda  
       2014-05-20 15:58:35 +08:00
    @9hills
    不过看他的应用应该是自己去pull状态的,不是push。所以不用那个notification也行。
    yueyoum
        6
    yueyoum  
    OP
       2014-05-20 15:58:38 +08:00
    @9hills

    恩,有了trigger 确实是个方法。
    没有trigger的时候 expire key 也几乎不可用。 总不能不停的去检测这些key吧

    不过 trigger 没接触过,我代会去看看……
    akira
        7
    akira  
       2014-05-20 16:05:53 +08:00
    自己实现吧,做个定时器,然后遍历所有的countdown,也不需要多少耗时
    MasterYoda
        8
    MasterYoda  
       2014-05-20 16:07:11 +08:00
    @yueyoum
    如果用户查看还有多少时间时去 ttl key 一次。 如果不去就依靠那个trigger,不精准也就无所谓了吧。
    yueyoum
        9
    yueyoum  
    OP
       2014-05-20 16:18:27 +08:00
    @MasterYoda

    如果是用户查看还有多少时间, 根本就不用redis,
    直接把开始时间 保存起来, 然后查看的时候 用 当时时间,起始时间,总时间 就可以算出来。

    是 PUSH, 也就是 没有任何操作, 定时器也要工作。



    @akira

    想过用 erlang 自己实现, 思路和实现都及其简单,简单到连test都不用的地步。

    erlang 提供一个 register 接口, 系统把需要定时的任务丢给erlang
    erlang spawn一个 process,然后就 sleep住, 醒来后 就带着相应的参数 去回调系统就可以。


    只是感觉大量定时任务 在服务端应该是一个极其常见的应用场景。
    所以就来问问是否有更好的实现。
    MasterYoda
        10
    MasterYoda  
       2014-05-20 16:21:02 +08:00
    @yueyoum
    好吧,纯Push,那么用户不去的话还有trigger啊。不过那个trigger也是不精准的。
    yueyoum
        11
    yueyoum  
    OP
       2014-05-20 16:22:32 +08:00
    @9hills

    @MasterYoda

    redis 我也大量运用,也算熟悉,expire key
    这个在上个项目中 做 功能 CD 是这样做的。
    这样做没问题,因为是要去 做这个功能的时候去 查一下 有没有那个key
    有就不能做。


    但现在的应用场景变了, 是key消失了 需要通知系统, 做一些动作。


    所以如果 直接撸redis,也只能考虑用它的通知机制

    redis 是单进程单线程模式, 不用测试就知道,量大以后,也会不精确。
    yueyoum
        12
    yueyoum  
    OP
       2014-05-20 16:23:22 +08:00
    @MasterYoda

    恩, 只是我没用过 redis trigger。 暂时还是考虑其他方式
    Actrace
        13
    Actrace  
       2014-05-20 16:28:02 +08:00
    @yueyoum 我的意思是说用事件驱动的方式去做这件事情会比较轻松.
    9hills
        14
    9hills  
       2014-05-20 18:09:36 +08:00
    @yueyoum redis trigger key expire的时候其实不会触发,只有被delete的时候才会触发,这个时间差是ms级别

    主要看用户量吧,我没任何数据瞎猜,几万Key应该用redis没问题。 再多你还是测试下吧
    yueyoum
        15
    yueyoum  
    OP
       2014-05-20 18:24:05 +08:00
    @9hills

    哦, 那不是 依靠 key 自己 expire 就没法用 trigger 呢?
    codingpp
        16
    codingpp  
       2014-05-20 18:54:15 +08:00
    import time
    import threading
    from heapq import heappush, heappop, heapify

    class TaskCall(object):
    def __init__(self, runtime, func, args):
    self.runtime = runtime
    self.func = func
    self.args = args

    def __lt__(self, other):
    return self.runtime < other.runtime

    def __le__(self, other):
    return self.runtime <= other.runtime

    def __gt__(self, other):
    return self.runtime > other.runtime

    def __ge__(self, other):
    return self.runtime >= other.runtime

    class Timer(object):
    def __init__(self):
    self.squeue = []
    self.newtasks = []

    def add(self, second, func, args = ()):
    runtime = time.time() + second
    task = TaskCall(runtime, func, args)
    self.newtasks.append(task)

    def loop(self):
    while(True):
    while len(self.newtasks) != 0:
    heappush(self.squeue, self.newtasks.pop())
    while self.squeue and (self.squeue[0].runtime <= time.time()):
    task = heappop(self.squeue)
    apply(task.func, task.args)
    time.sleep(0.1)

    timer = Timer()

    if __name__ == '__main__':
    def printsome(i):
    print i
    def exitprocess(i):
    exit()

    timer.add(2, printsome, (2,))
    timer.add(5, printsome, (5,))
    timer.add(5, printsome, (5,))
    timer.add(5, printsome, (5,))
    timer.add(5, printsome, (5,))
    timer.add(1, printsome, (1,))
    timer.add(7, exitprocess, (1,))

    timer.loop()


    之前写过的一段代码,应该很适合这种场景
    堆排序
    Livid
        17
    Livid  
    MOD
       2014-05-20 19:01:01 +08:00
    如果是这样做呢?

    - 存开始时间和预计的结束时间,用预计的结束时间做索引
    - 每分钟检查有哪些任务已经 expire(当前时间 > 预计的结束时间),如果 expire 就 trigger
    MasterYoda
        18
    MasterYoda  
       2014-05-20 19:02:29 +08:00
    @yueyoum
    想用的话,删除时 expire key 0。
    yueyoum
        19
    yueyoum  
    OP
       2014-05-21 10:02:17 +08:00
    @codingpp
    @Livid

    感谢提供思路, 确实可以。
    但只是感觉不停的loop检测,不是个好办法啊
    codingpp
        20
    codingpp  
       2014-05-21 19:03:22 +08:00
    @yueyoum
    其实在loop里加一个time.sleep(0.0001),空循环运行一段时间机器的负载几乎是0的
    mx1700
        21
    mx1700  
       2014-05-21 23:37:30 +08:00
    @yueyoum 不用不停的loop,获取最近的一条任务,计算距离当前的时间差,启动一个计时器到时间再触发任务执行,然后再启动计时器。
    如果有新任务插入,就取消当前计时器,重新计时。
    yueyoum
        22
    yueyoum  
    OP
       2014-05-22 12:01:53 +08:00
    @mx1700 是的 我的思路也是这样, 并且用celery实现了
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   我们的愿景   ·   实用小工具   ·   2519 人在线   最高记录 6543   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 27ms · UTC 15:53 · PVG 23:53 · LAX 08:53 · JFK 11:53
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.