V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
• 请不要在回答技术问题时复制粘贴 AI 生成的内容
huifer
V2EX  ›  程序员

求助: GoLang MQTT 客户端使用问题

  •  
  •   huifer · 116 天前 · 1223 次点击
    这是一个创建于 116 天前的主题,其中的信息可能已经有所发展或是发生改变。

    我遇到的问题:

    2024-08-29 07:13:10	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_5 , error = pingresp not received, disconnecting
    2024-08-29 07:13:14	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_88 , error = pingresp not received, disconnecting
    2024-08-29 07:13:15	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_43 , error = pingresp not received, disconnecting
    2024-08-29 07:13:15	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_72 , error = pingresp not received, disconnecting
    2024-08-29 07:13:15	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_1 , error = pingresp not received, disconnecting
    2024-08-29 07:13:17	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_37 , error = pingresp not received, disconnecting
    2024-08-29 07:13:18	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_10 , error = pingresp not received, disconnecting
    2024-08-29 07:14:13	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_52 , error = pingresp not received, disconnecting
    2024-08-29 07:14:18	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_59 , error = pingresp not received, disconnecting
    2024-08-29 07:14:19	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_84 , error = pingresp not received, disconnecting
    2024-08-29 07:14:19	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_54 , error = pingresp not received, disconnecting
    2024-08-29 07:14:21	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_22 , error = pingresp not received, disconnecting
    2024-08-29 07:14:22	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_12 , error = pingresp not received, disconnecting
    2024-08-29 07:14:23	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_25 , error = pingresp not received, disconnecting
    2024-08-29 07:14:24	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_97 , error = pingresp not received, disconnecting
    2024-08-29 07:14:26	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_36 , error = pingresp not received, disconnecting
    2024-08-29 07:15:08	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_63 , error = pingresp not received, disconnecting
    2024-08-29 07:15:16	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_23 , error = pingresp not received, disconnecting
    2024-08-29 07:15:19	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_96 , error = pingresp not received, disconnecting
    2024-08-29 07:15:20	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_50 , error = pingresp not received, disconnecting
    2024-08-29 07:15:25	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_75 , error = pingresp not received, disconnecting
    2024-08-29 07:15:30	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_78 , error = pingresp not received, disconnecting
    2024-08-29 07:15:36	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_7 , error = pingresp not received, disconnecting
    2024-08-29 07:15:39	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_97 , error = pingresp not received, disconnecting
    2024-08-29 07:16:17	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_79 , error = pingresp not received, disconnecting
    

    这是我正在使用的程序代码

    package main
    
    import (
    	"encoding/json"
    	"fmt"
    	mqtt "github.com/eclipse/paho.mqtt.golang"
    	"go.uber.org/zap"
    	"sync"
    	"time"
    )
    
    // MqttInterface bundles an MQTT client with the identity, configuration and
    // message buffer used around it. Despite its name it is a concrete struct,
    // not a Go interface type.
    type MqttInterface struct {
    	// client is the underlying MQTT client; Connect assigns it after a successful connection.
    	client mqtt.Client
    	// Id is used as the MQTT client ID and appears in log messages.
    	Id     string
    	// Chan carries JSON-encoded messages from the subscribe callback to HandlerMsg.
    	Chan   chan []byte
    	// Config is handed to StopMqttClient when the connection is lost.
    	Config MqttConfig
    	// wg is incremented and decremented around each subscribe callback invocation.
    	wg     sync.WaitGroup
    }
    
    // NewMqttClient builds an MqttInterface for the given client ID and
    // configuration. The message channel is created with a buffer of 1000
    // entries; no MQTT connection is opened until Connect is called.
    func NewMqttClient(id string, config MqttConfig) *MqttInterface {
    	const messageBuffer = 1000
    	m := new(MqttInterface)
    	m.Id = id
    	m.Config = config
    	m.Chan = make(chan []byte, messageBuffer)
    	return m
    }
    
    // Connect connects to the broker at tcp://host:port with the given
    // credentials, using this instance's Id as the MQTT client ID.
    //
    // Auto-reconnect is disabled, ordered message handling is switched off and
    // the keep-alive interval is 60 seconds. No default publish handler is
    // installed. When the connection is lost the error is logged and
    // StopMqttClient is called with this instance's Id and Config.
    //
    // On success the connected client is stored on m and nil is returned;
    // on failure the connect error is returned and m.client is not assigned.
    func (m *MqttInterface) Connect(host, username, password string, port int) error {
    	brokerURL := fmt.Sprintf("tcp://%s:%d", host, port)
    	onLost := func(client mqtt.Client, err error) {
    		zap.S().Errorf("mqtt connection lost id = %s , error = %+v", m.Id, err)
    		StopMqttClient(m.Id, m.Config)
    	}
    	opts := mqtt.NewClientOptions().
    		AddBroker(brokerURL).
    		SetUsername(username).
    		SetPassword(password).
    		SetClientID(m.Id).
    		SetAutoReconnect(false).
    		SetOrderMatters(false).
    		SetKeepAlive(60 * time.Second).
    		SetConnectionLostHandler(onLost)
    
    	// Create the client and wait for the connect attempt to finish.
    	conn := mqtt.NewClient(opts)
    	if token := conn.Connect(); token.Wait() {
    		if err := token.Error(); err != nil {
    			return err
    		}
    	}
    
    	m.client = conn
    	return nil
    }
    
    // messageHandler has the signature of an MQTT message callback but is a
    // placeholder: it ignores both arguments and does nothing.
    func (m *MqttInterface) messageHandler(client mqtt.Client, msg mqtt.Message) {
    	// Intentionally empty.
    }
    
    // Subscribe subscribes to topics at QoS 0 and waits for the subscribe token
    // to complete. Every received message is wrapped in an MQTTMessage tagged
    // with this client's Id, JSON-encoded and queued on m.Chan.
    //
    // It returns the subscription error, if any, after logging it. A message
    // that cannot be encoded is logged and dropped rather than queued.
    func (m *MqttInterface) Subscribe(topics string) error {
    	token := m.client.Subscribe(topics, 0, func(client mqtt.Client, msg mqtt.Message) {
    		m.wg.Add(1)
    		defer m.wg.Done()
    
    		jsonData, err := json.Marshal(MQTTMessage{
    			MQTTClientID: m.Id,
    			Message:      string(msg.Payload()),
    		})
    		if err != nil {
    			// Never enqueue a nil payload when encoding fails.
    			zap.S().Errorf("mqtt message marshal failed id = %s , topic = %s , error = %+v", m.Id, msg.Topic(), err)
    			return
    		}
    
    		// This send blocks while the channel buffer is full, until a receiver drains it.
    		m.Chan <- jsonData
    	})
    
    	if token.Wait() && token.Error() != nil {
    		// Pass the error as an argument: using its text as the format string
    		// would misrender any '%' it contains.
    		zap.S().Errorf("%v", token.Error())
    		return token.Error()
    	}
    	return nil
    }
    
    // Publish sends payload to topic with QoS 0 and the retained flag cleared,
    // then waits for the publish token to complete.
    //
    // The method has no error result, so a failed publish is reported through
    // the logger instead of being silently discarded.
    func (m *MqttInterface) Publish(topic string, payload interface{}) {
    	token := m.client.Publish(topic, 0, false, payload)
    	if token.Wait() && token.Error() != nil {
    		zap.S().Errorf("mqtt publish failed id = %s , topic = %s , error = %+v", m.Id, topic, token.Error())
    	}
    }
    
    // Disconnect closes the connection to the MQTT server, passing 250 as the
    // quiesce value (milliseconds per the paho documentation — confirm).
    //
    // It is a no-op when no client has been stored on m, which is the case
    // until Connect has succeeded; without this guard the call would panic.
    func (m *MqttInterface) Disconnect() {
    	if m.client == nil {
    		return
    	}
    	m.client.Disconnect(250)
    }
    
    // HandlerMsg drains m.Chan, forwarding each queued message to the
    // "pre_handler" queue through PushToQueue. It blocks while the channel is
    // empty and returns once the channel has been closed and fully drained, so
    // the goroutine running it can be stopped instead of spinning on zero values.
    func (m *MqttInterface) HandlerMsg() {
    	for c := range m.Chan {
    		PushToQueue("pre_handler", c)
    	}
    }
    

    创建 MQTT 客户端和开启订阅

    	// Build a client for clientId and connect it; on failure the error is
    	// logged and false is returned to the caller.
    	client := NewMqttClient(clientId,config)
    	err := client.Connect(broker, username, password, port)
    	if err != nil {
    		zap.S().Errorf("mqtt connect err = %v", err)
            return false
    	}
    	// Subscribe and the message pump each run in their own goroutine;
    	// the error returned by Subscribe is discarded at this call site.
    	go client.Subscribe(subTopic)
    	go client.HandlerMsg()
        
    

    请问这个问题应该如何解决。

    我的尝试

    1. 我提交了一个 issue,按我的理解,对方的建议是在收到消息后改为异步处理:https://github.com/eclipse/paho.mqtt.golang/issues/686

    2. 修改程序如下

    	// Variant of the subscribe callback: the message is encoded and queued
    	// from a newly started goroutine, so the callback itself returns at once.
    	var token = m.client.Subscribe(topics, 0, func(client mqtt.Client, msg mqtt.Message) {
    		go func() {
    			mqttMsg := MQTTMessage{
    				MQTTClientID: m.Id,
    				Message:      string(msg.Payload()),
    			}
    			// The Marshal error is ignored here.
    			jsonData, _ := json.Marshal(mqttMsg)
    
    			m.Chan <- jsonData
    		}()
    
    	})
    

    上述两种尝试都没能解决问题。请问应当如何解决这个问题。

    3 条回复
    ForrestWang
        1
    ForrestWang  
       116 天前
    不太理解你遇到了什么问题,可以描述一下吗
    brucemaclin
        2
    brucemaclin  
       116 天前
    对 1883 端口的连接抓包看看。日志显示没有收到 ping 的回包(PINGRESP),所以连接被断开了。
    huifer
        3
    huifer  
    OP
       116 天前
    就是正常来讲不会出现:mqtt connection lost id = TT_79 , error = pingresp not received, disconnecting 问题。 我正在尝试 1883 这个链接 。 我更新了一下 ISSUES

    https://github.com/eclipse/paho.mqtt.golang/issues/688

    谢谢大家的回复 @ForrestWang @brucemaclin
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2970 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 24ms · UTC 14:12 · PVG 22:12 · LAX 06:12 · JFK 09:12
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.