首页   注册   登录
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
V2EX  ›  PHP

redis 队列推送消息的疑问🤔️

  •  
  •   uoddsa · 164 天前 · 3152 次点击
    这是一个创建于 164 天前的主题,其中的信息可能已经有所发展或是发生改变。

    有个需求,需要对系统的用户进行全局推送,包含定时推送。 因为怕重复推送了所以打算是待推送的任务发在 redis list 里面。定时任务去队列取未推送的任务。可是每次取多少,怎么取。 还是我要用发布订阅。

    第 1 条附言  ·  164 天前
    问题 ,怎么取队列里面的消息 while(true) ?还是有其他好的方法。
    29 回复  |  直到 2019-05-10 10:52:47 +08:00
        1
    coffeSlider   164 天前
    取多少,怎么取不看你自己的业务吗?如果 job 是单线程,取数据就直接用 rpoplpush。
        2
    rpdict   164 天前
    用的 redis 的 zset,按时间排序的有序集合,取的时候就取 0~当前时间戳对应的值就行了
        3
    yxjn   164 天前
    redis 队列的缺点是需要自己写很多的异常补偿机制。毕竟 redis 本身不是作为一个完整的队列功能而存在的。
        4
    julyclyde   164 天前
    @rpdict zset 很容易慢的
        5
    iyaozhen   164 天前 via Android
    redis 的 pub/sub 严格说不是个队列,是广播,无法并发消费

    用 list 就行,左进右出,一条条处理呗,可以多进程。还可以再建个重发队列,失败的丢进去。
        6
    reus   164 天前
    如果 redis 崩了呢?你怎么知道那些发过哪些没发过?
        7
    rpdict   164 天前
    @julyclyde 有序集合的好处就是读取的时候读第一个,如果时间没到就不用接着读了,节约的时间在这里
        8
    julyclyde   164 天前
    @rpdict 我是指 zset 的排序速度慢。尤其是没按顺序插入的时候
        9
    uoddsa   164 天前
    @iyaozhen 问题就在这里 怎么取队列里面的消息 while(true) ?还是有其他好的方法。
        10
    yxjn   164 天前
    @uoddsa blpop 看看能不能满足你的需求
        11
    lestat   164 天前
    @rpdict laravel 里面的延时队列好像就是这么设计的
        12
    rpdict   164 天前
    @uoddsa 是一个取舍,就好像数据库加了索引插入数据就会慢一些,但是读取会快很多,看需求是要更快的插入速度还是更准确的发送时间吧,有序集合的好处就是不用遍历所有,无序的好处就是插入快
        13
    rpdict   164 天前
    @lestat 我是参考了有赞的延时队列,感觉大家做法都差不多?感谢回复,我去看看 laravel 怎么实现的
        14
    Evilk   164 天前
    @yxjn 同意,redis 还是适合缓存,需要队列的话,还是选择专业的吧,比如 rabbitMQ
        15
    strive   164 天前
    可以用个存储过程把要推送的用户和消息放到任务表里面,再把任务表里面数据放到 redis 的 list 里面拿出来处理就可以了
        16
    brickyang   164 天前 via iPhone
        17
    lovedebug   164 天前 via Android
    这种需求用 azure service bus 更好吧。redis 是有丢消息风险的。用 kafka 都好很多。
        18
    ericliu001   164 天前
    lpoprpush 看下这个命令
        19
    runnerlee   164 天前   ♥ 2
    laravel 的做法是同时维护三个队列: 主队列 (list), 备份队列 (reserved, zset) , 延时队列 (delayed, zset).

    消息从 list 里 lpop 出来之后会根据超时时间再次存放到备份队列里去, 这个操作用 lua 实现:

    https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/LuaScripts.php#L54
    ```
    -- Pop the first job off of the queue...
    local job = redis.call('lpop', KEYS[1])
    local reserved = false
    if(job ~= false) then
    -- Increment the attempt count and place job on the reserved queue...
    reserved = cjson.decode(job)
    reserved['attempts'] = reserved['attempts'] + 1
    reserved = cjson.encode(reserved)
    redis.call('zadd', KEYS[2], ARGV[1], reserved)
    redis.call('lpop', KEYS[3])
    end
    return {job, reserved}
    ```

    而在从主队列 pop 之前, 会根据当前时间从备份队列和延时队列两个 zset 中取出消息 rpush 到主队列中.

    https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/RedisQueue.php#L167

    同样也是使用 lua 进行操作
    https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/LuaScripts.php#L105
    ```
    -- Get all of the jobs with an expired "score"...
    local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1])

    -- If we have values in the array, we will remove them from the first queue
    -- and add them onto the destination queue in chunks of 100, which moves
    -- all of the appropriate jobs onto the destination queue very safely.
    if(next(val) ~= nil) then
    redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)

    for i = 1, #val, 100 do
    redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val)))
    -- Push a notification for every job that was migrated...
    for j = i, math.min(i+99, #val) do
    redis.call('rpush', KEYS[3], 1)
    end
    end
    end

    return val
    ```

    同时为了避免重复消费, 在消息消费成功后, 会手动从备份队列删除备份消息.

    https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/Jobs/RedisJob.php#L84

    在每次 pop 出消息并进行消费之前, 会注册一个 timeoutHandler, 通过计时器来实现中断超时任务

    https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/Worker.php#L111

    所以, 当消费过程中发生异常退出或是超时中断后, 会根据重试时间, 从备份队列里面取出备份消息重新消费.
        20
    iyaozhen   164 天前 via Android
    @uoddsa 死循环就行了,做好异常处理。后台运行呗
        21
    runnerlee   164 天前   ♥ 1
    忘了补充一个细节,

    在用 lua 调用 lpop 之后, 会将消息 json decode 出来然后自增 attempts 字段, 再放到备份队列.

    https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/LuaScripts.php#L62

    这样是为了实现最大重试次数, 当失败到配置的最大次数之后, 会把消息保存到 mysql 后从 redis 里丢弃掉.
        22
    fishioon   164 天前
    可以考虑下 redis 5.0 中的 stream 结构
        23
    ihipop   163 天前 via Android
    NSQ
        24
    scnace   163 天前 via Android
    di.....disque ?
        25
    zk123   163 天前 via iPhone
    发布订阅模式的数据可靠性不保证,它的数据不保存到快照或者 aof 中,发布即焚,也没有 ack 机制,不适用这样消息队列场景。

    List 队列模式比较适合,不过自己要补偿做许多 ack 以及失败机制。建议还是考虑 MQ 或者其他 push。
        26
    zchlwj   163 天前
    redis 消息队列不存盘的哦,这种场景还是考虑 mq 把
        27
    polebug   163 天前 via Android
    我上次写业务 也是用的 zset 慢点无所谓 反正并发推送
        28
    fuxinya   163 天前 via Android
    如果是 spring 项目可以去看看 redisson
        29
    ducklyl   163 天前
    别用 redis 做队列,用 mq 或 rq
    关于   ·   FAQ   ·   API   ·   我们的愿景   ·   广告投放   ·   感谢   ·   实用小工具   ·   2332 人在线   最高记录 5043   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.3 · 26ms · UTC 14:47 · PVG 22:47 · LAX 07:47 · JFK 10:47
    ♥ Do have faith in what you're doing.