.Net Core/.net 6/.Net 8 实现Mqtt客户端

慈云数据 2024-03-12 技术支持 146 0

.Net Core/.net 6/.Net 8 实现Mqtt客户端

  • 客户端代码
  • 调用

    直接上代码

    .Net Core/.net 6/.Net 8 实现Mqtt客户端
    (图片来源网络,侵删)

    nuget引用

    MQTTnet

    .Net Core/.net 6/.Net 8 实现Mqtt客户端
    (图片来源网络,侵删)

    客户端代码

    using MQTTnet;
    using MQTTnet.Client;
    using MQTTnet.Packets;
    using System.Text;
    namespace Code.Mqtt
    {
        /// 
        /// Mqtt客户端
        /// 
        public class MqttClientBase
        {
            /// 
            /// 客户端
            /// 
            public IMqttClient client;
            /// 
            /// 订阅主题列表
            /// 
            public List Topics=new List();
            public MqttClientOptions options;
            public MqttClientBaseOptions _opt;
            /// 
            /// 主动断开连接
            /// 
            public bool off = false;
            public bool isconn = false;
            /// 
            /// 创建mqtt客户端,并值接传入初始参数
            /// 
            /// 
            public MqttClientBase(MqttClientBaseOptions opt)
            {
                this._opt = opt;
                //创建客户端
                client = new MqttFactory().CreateMqttClient();
                options =new MqttClientOptions() { 
                    ClientId=_opt.clientId,
                    ChannelOptions=new MqttClientTcpOptions()
                    {
                        Server=_opt.server,
                        Port=_opt.port,
                    },
                    Credentials=new MqttClientCredentials(_opt.username,Encoding.UTF8.GetBytes(_opt.password)),
                    //清理会话
                    CleanSession=false,
                    //设置心跳
                    KeepAlivePeriod = TimeSpan.FromSeconds(30)
                };
            }
            /// 
            /// 创建mqtt客户端,不传参数,
            /// 必须在调用 Connect之前调用过SetOption方法
            /// 
            public MqttClientBase()
            {
                //创建客户端
                client = new MqttFactory().CreateMqttClient();
            }
            /// 
            /// 设置参数
            /// 
            /// 
            public void SetOption(MqttClientBaseOptions opt)
            {
                options = new MqttClientOptions()
                {
                    ClientId = _opt.clientId,
                    ChannelOptions = new MqttClientTcpOptions()
                    {
                        Server = _opt.server,
                        Port = _opt.port,
                    },
                    Credentials = new MqttClientCredentials(_opt.username, Encoding.UTF8.GetBytes(_opt.password)),
                    //清理会话
                    CleanSession = false,
                    //设置心跳
                    KeepAlivePeriod = TimeSpan.FromSeconds(30)
                };
            }
            /// 
            /// 连接服务器
            /// 
            /// 连接成功后执行
            /// 连接成功事件
            public void Connect(Action ConnectedAsync=null)
            {
                client.ConnectAsync(options);
                if(ConnectedAsync != null)
                {
                    //连接成功事件
                    client.ConnectedAsync += (args) =>
                    {
                        ConnectedAsync(args);
                        return Task.CompletedTask;
                    };
                }
            }
            /// 
            /// 重连服务器
            /// 在连接断开事件中调用,即可实现无限轮询
            /// 
            /// 是否重复尝试重连
            /// 尝试次数
            public void ReConnect()
            {
                try
                {
                    client.ConnectAsync(options).Wait();
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                }
            }
            public async Task AddTopic(string topic)
            {
                //更新订阅
                client.SubscribeAsync(new MqttClientSubscribeOptions()
                {
                    TopicFilters = new List() {
                        new MqttTopicFilter { Topic = topic }
                    }
                });
                
                //将主题名称加入列表
                Topics.Add(topic);
            }
            /// 
            /// 取消订阅
            /// 
            /// 
            /// 
            public async Task DeleteTopic(string topic)
            {
                client.UnsubscribeAsync(new MqttClientUnsubscribeOptions()
                {
                    TopicFilters = new List { topic }
                });
                Topics.Remove(topic);
            }
            /// 
            /// 发布消息
            /// 
            /// 主题
            /// 内容
            /// 
            public async Task Publish(string topic, string content)
            {
                if(client.IsConnected)
                {
                    client.PublishAsync(new MqttApplicationMessage()
                    {
                        Topic = topic,
                        Payload = Encoding.UTF8.GetBytes(content)
                    });
                }
            }
            /// 
            /// 主动断开连接
            /// 
            public void Disconnect()
            {
                off = true;
                client.DisconnectAsync();
            }
            /// 
            /// 断开连接事件
            /// 
            /// 
            /// 
            public async Task DisconnectedAsync(Action action)
            {
                client.DisconnectedAsync += (args) => {
                    action(args);
                    return Task.CompletedTask;
                };
            }
            /// 
            /// 接收消息事件
            /// 
            /// 
            /// 
            public async Task Message(Action action) {
                client.ApplicationMessageReceivedAsync += (args) =>
                {
                    var topic = args.ApplicationMessage.Topic;
                    var msg = args.ApplicationMessage.Payload.BToString();
                    action(topic, msg);
                    return Task.CompletedTask;
                };
            }
        }
    }
    

    调用

    我这里是控制台项目

    //初始化
    var mqtt = new MqttClientBase(new MqttClientBaseOptions() { 
        clientId="client-1",
        username="username",
        password="password",
        server="127.0.0.1",
        port=10883
    });
    //断开连接事件
    mqtt.DisconnectedAsync((e) => {
        Console.WriteLine("连接断开");
        //重连服务器
        mqtt.ReConnect();
    });
    //连接服务器
    mqtt.Connect((args) => {
        /* 连接成功事件 */
        Console.WriteLine("连接成功");
        // 添加主题订阅,建议写到 连接成功事件 里面,这样重连后可以重新订阅主题
        mqtt.AddTopic("topic-1").Wait();
        mqtt.AddTopic("topic-2").Wait();
        mqtt.AddTopic("topic-3").Wait();
        // 取消主题订阅
        mqtt.DeleteTopic("topic-3").Wait();
        // 向指定主题推送消息
        mqtt.Publish("topic-1", "666666666").Wait();
    });
    // 收到来自服务器的消息 topic:主题  msg:消息内容
    mqtt.Message((topic,msg) => { 
        Console.WriteLine($"收到消息:{topic}:{msg}");
    });
    // 这里暂停三秒,看三秒后主动断开连接效果
    // Task.Delay(3000).Wait();
    // 主动断开连接
    //mqtt.Disconnect();
    while (true)
    {
        // 向指定主题推送消息
        mqtt.Publish("topic-1", Console.ReadLine());
    }
    
微信扫一扫加客服

微信扫一扫加客服

点击启动AI问答
Draggable Icon