V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
V2EX 提问指南
wanchenyi
V2EX  ›  问与答

kafka 单节点问题请教

  •  
  •   wanchenyi · 88 天前 · 982 次点击
    这是一个创建于 88 天前的主题,其中的信息可能已经有所发展或是发生改变。
    佬们,请教个问题,我需要搭建一个单节点的 kafka 服务器,,根据官网的教程,我已经搭建好了,现在需要局域网的节点的能连 kafka 服务器,为了 kafka 服务器能局域网能访问,需要修改配置文件 ,修改内容如下

    listeners=PLAINTEXT://0.0.0.0:9092
    advertised.listeners=PLAINTEXT://192.168.5.155:9092

    这样操作后,在本机上,创建话题,再使用生产者,和消费者没有问题。但是我在另一台服务器上使用 Python3 脚本模拟生产者,消费者就有问题。
    下面是生产者脚本
    from kafka3 import KafkaProducer
    import json

    创建 Kafka Producer 实例
    producer = KafkaProducer(
    bootstrap_servers=‘192.168.5.155:9092’, # Kafka 服务器的地址和端口
    value_serializer=lambda v: json.dumps(v).encode(‘utf-8’) # 将消息序列化为 JSON 格式
    )

    发送消息到 Kafka 的主题
    producer.send(‘test-topic’, {‘key’: ‘value’}) # ‘my-topic’ 替换为你的实际主题名

    确保所有消息都被发送
    producer.flush()

    print(“消息发送成功”)
    下面是消费者的脚本
    from kafka3 import KafkaConsumer
    import json

    创建 Kafka Consumer 实例
    consumer = KafkaConsumer(
    ‘test-topic’, # 替换为你的实际主题名
    bootstrap_servers=‘192.168.5.155:9092’, # Kafka 服务器的地址和端口
    auto_offset_reset=‘earliest’, # 从最早的消息开始读取
    enable_auto_commit=True, # 自动提交偏移量
    group_id=‘my-group’, # 消费者组 ID
    value_deserializer=lambda x: json.loads(x.decode(‘utf-8’)) # 将消息反序列化为 JSON 格式
    )

    消费消息
    for message in consumer:
    print(f"接收到消息: {message.value}")
    运行脚本报错了,
    Traceback (most recent call last):
    File “/root/python_code/writing.py”, line 5, in
    producer = KafkaProducer(
    File “/usr/local/lib/python3.10/dist-packages/kafka3/producer/kafka.py”, line 383, in init
    client = self.config[‘kafka_client’](
    File “/usr/local/lib/python3.10/dist-packages/kafka3/client_async.py”, line 244, in init
    self.config[‘api_version’] = self.check_version(timeout=check_timeout)
    File “/usr/local/lib/python3.10/dist-packages/kafka3/client_async.py”, line 900, in check_version
    raise Errors.NoBrokersAvailable()
    kafka3.errors.NoBrokersAvailable: NoBrokersAvailable

    想问哈各位哪里不对,脚本是用 chatgpt4 生成的,我在 192.168.5.111 上 ping 192.168.5.155 是通的,我试了 telnet 192.168.5.155 9092 不通,但我已经 关了防火墙,我查看了又有端口监听在 9092.
    感谢各位大佬给我指点。
    1 条回复    2024-08-25 14:49:43 +08:00
    woodslee
        1
    woodslee  
       88 天前
    既然端口不通,首先看一下 kafka server 主机,监听的端口是 0.0.0.0:9092 还是 127.0.0.1:9092
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   3339 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 20ms · UTC 11:39 · PVG 19:39 · LAX 03:39 · JFK 06:39
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.