V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
kaolalicai
V2EX  ›  Node.js

消息队列 Kafka Nodejs 的使用

  •  
  •   kaolalicai · 2019-08-01 16:18:45 +08:00 · 7024 次点击
    这是一个创建于 1992 天前的主题,其中的信息可能已经有所发展或是发生改变。

    一. 消息队列

    (一) 使用场景:

    这边就先不介绍消息队列的优劣,主要列了一下它的三种核心的场景。

    1 . 解耦

    解耦.jpg

    2 . 异步

    异步.jpg

    3 . 削峰

    削峰.jpg

    (二) 消费方式:

    1 . 点对点:Work Queue

    点对点 1.jpg

    点对点 2.jpg

    2 . 发布-订阅:Publish/Subscribe

    发布订阅.jpg

    目前我们项目应用到的场景:

    目前我们使用 RabbitMq,主要使用点对点的消费模式。

    削峰,异步:

    削峰异步.jpg

    我们这些场景如果用 Kafka 该如何实现?

    二. Kafka

    (一) 简介

    官网的描述是这几句:

    Apache Kafka® is a distributed streaming platform**. What exactly does that mean?**
    A streaming platform has three key capabilities:

    • Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
    • Store streams of records in a fault-tolerant durable way.
    • Process streams of records as they occur.

    Kafka 是一个流处理平台

    一个流处理平台有三个关键的特点:

    1. 发布&订阅流式数据,类似于消息队列或企业消息传递系统;
    2. 在高容错方式下保存流式数据;
    3. 当数据流产生时实时进行处理。

    Kafka is generally used for two broad classes of applications:

    • Building real-time streaming data pipelines that reliably get data between systems or applications
    • Building real-time streaming applications that transform or react to the streams of data

    Kafka 主要应用在两个类应用中:

    1. 构建可在系统或应用程序之前构建可靠获取数据的实时数据流管道;
    2. 构建一个转换或响应数据流的实时数据流应用程序。

    (二) kafka 架构图

    架构图.jpg

    (三)名词

    Producer: 生产者,发送信息的服务端
    Consumer:消费者,订阅消息的客户端
    Broker:消息中间件处理节点,一个 Kafka 节点就是一个 broker,一个或者多个 Broker 可以组成一个 Kafka 集群
    Topic: 主题,可以理解成队列
    ConsumerGroup:消费者组,一个 ConsumerGoup 里面包括多个 Consumer,每个 ConsumerGoup 里面只有一个 Consumer 可以消费一个 Topic。基于这个特性,每个 ConsumerGoup 里面只存一个 Consumer 可以实现广播;所有 Consumer 都存在于同一个 ConsumerGoup 内则可以实现单播。
    Partition:基于 Kafka 的拓展性,有可能一个很大的 Topic 会存在于不同的 Broker 里面。这时一个 Topic 里面就会存在多个 Partition,Partition 是一个有序的队列,Partition 上每个消息会有一个顺序的 id —— Offset。但是,值得注意的是,Kafka 会保证 Partition 的顺序性,而没有保证 Topic 的顺序性。
    Offset:Kafka 的存储文件都是 offset 顺序存储的,以 offset.kafka 来命名。例如第一个就是 0000.kafka, 第 n 个文件就是 n-1.kafka。
    Zookeerper:管理多个 Kafka 节点,具有管理集群配置的功能。

    三. Kafka Nodejs 实现

    (一)消费方式:点对点

    1.单个消费者的实现,应用场景是只有一个消费者节点 需要消费该消息。
    图例: Producer:

    // Producer.ts
    
    import * as kafka from 'kafka-node'
    const client = new kafka.KafkaClient({kafkaHost: 'localhost:9092'})
    const producer = new kafka.HighLevelProducer(client)
    producer.on('ready', function () {
      console.log('Kafka Producer is connected and ready.')
    })
    // For this demo we just log producer errors to the console.
    producer.on('error', function (error) {
      console.error(error)
    })
    const sendRecord = (objData, cb) => {
      const buffer = Buffer.from(JSON.stringify(objData))
      // Create a new payload
      const record = [
        {
          topic: 'webevents.dev',
          messages: buffer,
          attributes: 1 /* Use GZip compression for the payload */
        }
      ]
      // Send record to Kafka and log result/error
      producer.send(record, cb)
    }
    let times = 0
    setInterval(() => {
      sendRecord({
        msg: `this is message ${++times}!`
      }, (err, data) => {
        if (err) {
          console.log(`err: ${err}`)
        }
        console.log(`data: ${JSON.stringify(data)}`)
      })
    }, 1000)
    
    

    ​ Consumer 代码:

    // Consumer.ts
    import * as kafka from 'kafka-node'
    const client = new kafka.KafkaClient({kafkaHost: 'localhost:9092'})
    const topics = [
      {
        topic: 'webevents.dev'
      }
    ]
    const options = {
      autoCommit: true,
      fetchMaxWaitMs: 1000,
      fetchMaxBytes: 1024 * 1024
      // encoding: 'buffer'
    }
    // { autoCommit: false, fetchMaxWaitMs: 1000, fetchMaxBytes: 1024 * 1024 };
    const consumer = new kafka.Consumer(client, topics, options)
    consumer.on('message', function (message) {
      // Read string into a buffer.
      console.info(`[message]:==:>${JSON.stringify(message)}`)
      const buf = new Buffer(String(message.value), 'binary')
      const decodedMessage = JSON.parse(buf.toString())
      console.log('decodedMessage: ', decodedMessage)
    })
    consumer.on('error', function (err) {
      console.log('error', err)
    })
    process.on('SIGINT', function () {
      consumer.close(true, function () {
        process.exit()
      })
    })     
    

    2.当我的服务是多节点,如何保证同一个消息只被其中一个节点消费呢。 这个时候就需要把每个节点当做同一个 ConsumerGroup 里的不同 Consumer。 图例: Producer 同上 Consumer:

    // Consumer1
    import * as kafka from 'kafka-node'
    const client = new kafka.KafkaClient({kafkaHost: 'localhost:9092'})
    const offset = new kafka.Offset(client)
    import * as bluebird from 'bluebird'
    const consumerGoupOptions = {
      kafkaHost: 'localhost:9092',
      groupId: 'ExampleTestGroup',
      sessionTimeout: 15000,
      protocol: ['roundrobin'],
      fromOffset: 'earliest' // equivalent of auto.offset.reset valid values are 'none', 'latest', 'earliest'
    } as any
    const consumer = new kafka.ConsumerGroup(Object.assign({id: 'consumer1'}, consumerGoupOptions),  ['test'])
    export default consumer
    
    // 处理消息
    consumer.on('message', async function (message) {
    	console.info('i am consumer1!')
      // Read string into a buffer.
      console.info(`[message]:==:>${JSON.stringify(message)}`)
      // const buf = new Buffer(String(message.value), 'binary')
      const decodedMessage = message // JSON.parse(buf.toString())
    
      await bluebird.delay(1000)
      console.log('decodedMessage: ', decodedMessage)
    })
    
    // 消息处理错误
    consumer.on('error', function (err) {
      console.log('error', err)
    })
    
    consumer.on('offsetOutOfRange', function (topic) {
      console.info(`[offsetOutOfRange]:==:>${topic}`)
      topic.maxNum = 2
      offset.fetch([topic], function (err, offsets) {
        if (err) {
          return console.error(err)
        }
        let min = Math.min.apply(null, offsets[topic.topic][topic.partition])
        consumer.setOffset(topic.topic, topic.partition, min)
      })
    })
    
    process.on('SIGINT', function () {
      consumer.close(true, function () {
        console.log('consumer colse!')
        process.exit()
      })
    })
    
    // Consumer2
    import * as kafka from 'kafka-node'
    
    const client = new kafka.KafkaClient({kafkaHost: 'localhost:9092'})
    const offset = new kafka.Offset(client)
    import * as bluebird from 'bluebird'
    
    const consumerGoupOptions = {
      kafkaHost: 'localhost:9092',
      groupId: 'ExampleTestGroup',
      sessionTimeout: 15000,
      protocol: ['roundrobin'],
      fromOffset: 'earliest' // equivalent of auto.offset.reset valid values are 'none', 'latest', 'earliest'
    } as any
    const consumer = new kafka.ConsumerGroup(Object.assign({id: 'consumer2'}, consumerGoupOptions),  ['test'])
    export default consumer
    
    // 处理消息
    consumer.on('message', async function (message) {
    	console.info('i am consumer2!')
      // Read string into a buffer.
      console.info(`[message]:==:>${JSON.stringify(message)}`)
      // const buf = new Buffer(String(message.value), 'binary')
      const decodedMessage = message // JSON.parse(buf.toString())
    
      await bluebird.delay(1000)
      console.log('decodedMessage: ', decodedMessage)
    })
    
    // 消息处理错误
    consumer.on('error', function (err) {
      console.log('error', err)
    })
    
    consumer.on('offsetOutOfRange', function (topic) {
      console.info(`[offsetOutOfRange]:==:>${topic}`)
      topic.maxNum = 2
      offset.fetch([topic], function (err, offsets) {
        if (err) {
          return console.error(err)
        }
        let min = Math.min.apply(null, offsets[topic.topic][topic.partition])
        consumer.setOffset(topic.topic, topic.partition, min)
      })
    })
    
    process.on('SIGINT', function () {
      consumer.close(true, function () {
        console.log('consumer colse!')
        process.exit()
      })
    })
    

    执行之后,发现了一个问题:同一个 ConsumerGroup 的不同 Consumer 没有均匀消费数据, 会出现一段时间,只有一个 Consumer 消费, 而另一个 Conumser 不消费的情况。

    ​ 为什么呢?
    ​ 这里就需要知道消费端的均衡算法
    [Kafka 消费端均衡算法]: https://blog.csdn.net/wobuaizhi/article/details/80950387

    算法如下:
    1.A=(partition 数量 /同分组消费者总个数) 2.M=对上面所得到的 A 值小数点第一位向上取整 3.计算出该消费者拉取数据的 patition 合集:Ci = [P(M*i ),P((i + 1) * M -1)] ​ Partition 数量为 1, 因为只有一个 broker

    同分组消费者总个数:2

    A = 1 / 2

    M = roundUp (A) = 1

    C0 = [P(0), P ( 0]`

    C1 = [P(1), P(1)]

    所以,如果不是 C0 消费者不可用,C1 一直都不会去消费 Partition0 里面的消息
    结论是,如果非多 Kafka 节点的话, 单纯增加同一消费组里的消费者, 并不能做到均衡消费数据的情况。
    有其他方法可以实现吗?
    有的, 我们可以从 Producer 里面入手,分发消息时固定 Topic 对应 固定的消费者节点。
    Producer:

    // Producer
    // ...
    
    const sendRecord = (objData, cb) => {
      const partition = Date.now() % 2 === 0 ? 0 : 1
      const buffer = Buffer.from(JSON.stringify(objData) + '_' + partition)
    
      // Create a new payload
      const record = [
        {
          topic: `test${partition}`,       // 这里用了随机方法分配 topic
          messages: buffer,
          attributes: 1, /* Use GZip compression for the payload */
          key: `key_${partition}`
        }
      ]
    
      // Send record to Kafka and log result/error
      console.info(`[record]:==:>${JSON.stringify(record)}`)
      producer.send(record, cb)
    }
    
    // ...
    

    Consumer:

    // Consumer1
    // ...
    
    const consumer = new kafka.ConsumerGroup(Object.assign({id: 'consumer1'}, consumerGoupOptions),  ['test0', 'test1'])		// 这里需要优先输入 需要消费的 topic, 次要消费的 topic 也要写上,以防另一节点重启时, 消息没及时消费
    
    // ...
    
    // Consumer2
    // ...
    
    const consumer = new kafka.ConsumerGroup(Object.assign({id: 'consumer2'}, consumerGoupOptions),  ['test1', 'test2'])		// 这里需要优先输入 需要消费的 topic, 次要消费的 topic 也要写上,以防另一节点重启时, 消息没及时消费
    
    // ...
    

    四. 总结:

    Kafka

    设计上:队列消息不删除,不同 ConsumerGroup 都可以 publish-subscribe,同一 ConsumerGroup 里面只有一个 Consumer 能消费同一个 Topic
    延迟消费:不支持:Consumer 开启后, 会自动获取 Producer 生产对应 Topic 的消息, 若想 Consumer 暂时不消费消息, 需要中断 Consumer 的服务
    负载均衡:从集群上看, 即使其中一个 Broker 挂了,其他 Broker 上的 partition 都会存在副本集,kafka 仍然可以正常运行。从 ConsumerGroup 上看,即使其中的 Consumer 挂了, 同一 ConsumerGroup 的其他 Consumer 仍然可以消费其 Topic 的消息,而不需要担心服务中断。
    实际上:Kafka 做点对点队列,有点浪费。只用一个 ConsumerGroup,并没有发挥 Kafka 的优势。但是 Kafka 这种很方便就能拓展成发布-订阅模式,消费端建立另外一个 ConsumerGroup,就可以为另一个服务启用。

    End

    参考资料

    代码:
    https://github.com/yuchenzhen/node-kafka-demo

    https://blog.csdn.net/tototuzuoquan/article/details/73441373

    https://zhuanlan.zhihu.com/p/58836260

    https://juejin.im/post/5b59c6055188257bcc16738c

    https://lotabout.me/2018/kafka-introduction/

    2 条回复    2019-08-01 18:58:57 +08:00
    julyclyde
        1
    julyclyde  
       2019-08-01 18:27:08 +08:00
    so 这么长到底写了什么?
    好像没有什么信息量啊
    PDX
        2
    PDX  
       2019-08-01 18:58:57 +08:00
    闹眼睛
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1005 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 25ms · UTC 20:53 · PVG 04:53 · LAX 12:53 · JFK 15:53
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.