.Net Core/.net 6/.Net 8 实现Mqtt客户端
- 客户端代码
- 调用
直接上代码
(图片来源网络,侵删)nuget引用
MQTTnet
(图片来源网络,侵删)客户端代码
using MQTTnet; using MQTTnet.Client; using MQTTnet.Packets; using System.Text; namespace Code.Mqtt { /// /// Mqtt客户端 /// public class MqttClientBase { /// /// 客户端 /// public IMqttClient client; /// /// 订阅主题列表 /// public List Topics=new List(); public MqttClientOptions options; public MqttClientBaseOptions _opt; /// /// 主动断开连接 /// public bool off = false; public bool isconn = false; /// /// 创建mqtt客户端,并值接传入初始参数 /// /// public MqttClientBase(MqttClientBaseOptions opt) { this._opt = opt; //创建客户端 client = new MqttFactory().CreateMqttClient(); options =new MqttClientOptions() { ClientId=_opt.clientId, ChannelOptions=new MqttClientTcpOptions() { Server=_opt.server, Port=_opt.port, }, Credentials=new MqttClientCredentials(_opt.username,Encoding.UTF8.GetBytes(_opt.password)), //清理会话 CleanSession=false, //设置心跳 KeepAlivePeriod = TimeSpan.FromSeconds(30) }; } /// /// 创建mqtt客户端,不传参数, /// 必须在调用 Connect之前调用过SetOption方法 /// public MqttClientBase() { //创建客户端 client = new MqttFactory().CreateMqttClient(); } /// /// 设置参数 /// /// public void SetOption(MqttClientBaseOptions opt) { options = new MqttClientOptions() { ClientId = _opt.clientId, ChannelOptions = new MqttClientTcpOptions() { Server = _opt.server, Port = _opt.port, }, Credentials = new MqttClientCredentials(_opt.username, Encoding.UTF8.GetBytes(_opt.password)), //清理会话 CleanSession = false, //设置心跳 KeepAlivePeriod = TimeSpan.FromSeconds(30) }; } /// /// 连接服务器 /// /// 连接成功后执行 /// 连接成功事件 public void Connect(Action ConnectedAsync=null) { client.ConnectAsync(options); if(ConnectedAsync != null) { //连接成功事件 client.ConnectedAsync += (args) => { ConnectedAsync(args); return Task.CompletedTask; }; } } /// /// 重连服务器 /// 在连接断开事件中调用,即可实现无限轮询 /// /// 是否重复尝试重连 /// 尝试次数 public void ReConnect() { try { client.ConnectAsync(options).Wait(); } catch (Exception ex) { Console.WriteLine(ex.Message); } } public async Task AddTopic(string topic) { //更新订阅 client.SubscribeAsync(new MqttClientSubscribeOptions() { TopicFilters = new List() { new MqttTopicFilter { Topic = topic } } }); //将主题名称加入列表 Topics.Add(topic); } /// /// 取消订阅 /// /// /// public async Task DeleteTopic(string topic) { client.UnsubscribeAsync(new MqttClientUnsubscribeOptions() { TopicFilters = new List { topic } }); Topics.Remove(topic); } /// /// 发布消息 /// /// 主题 /// 内容 /// public async Task Publish(string topic, string content) { if(client.IsConnected) { client.PublishAsync(new MqttApplicationMessage() { Topic = topic, Payload = Encoding.UTF8.GetBytes(content) }); } } /// /// 主动断开连接 /// public void Disconnect() { off = true; client.DisconnectAsync(); } /// /// 断开连接事件 /// /// /// public async Task DisconnectedAsync(Action action) { client.DisconnectedAsync += (args) => { action(args); return Task.CompletedTask; }; } /// /// 接收消息事件 /// /// /// public async Task Message(Action action) { client.ApplicationMessageReceivedAsync += (args) => { var topic = args.ApplicationMessage.Topic; var msg = args.ApplicationMessage.Payload.BToString(); action(topic, msg); return Task.CompletedTask; }; } } }
调用
我这里是控制台项目
//初始化 var mqtt = new MqttClientBase(new MqttClientBaseOptions() { clientId="client-1", username="username", password="password", server="127.0.0.1", port=10883 }); //断开连接事件 mqtt.DisconnectedAsync((e) => { Console.WriteLine("连接断开"); //重连服务器 mqtt.ReConnect(); }); //连接服务器 mqtt.Connect((args) => { /* 连接成功事件 */ Console.WriteLine("连接成功"); // 添加主题订阅,建议写到 连接成功事件 里面,这样重连后可以重新订阅主题 mqtt.AddTopic("topic-1").Wait(); mqtt.AddTopic("topic-2").Wait(); mqtt.AddTopic("topic-3").Wait(); // 取消主题订阅 mqtt.DeleteTopic("topic-3").Wait(); // 向指定主题推送消息 mqtt.Publish("topic-1", "666666666").Wait(); }); // 收到来自服务器的消息 topic:主题 msg:消息内容 mqtt.Message((topic,msg) => { Console.WriteLine($"收到消息:{topic}:{msg}"); }); // 这里暂停三秒,看三秒后主动断开连接效果 // Task.Delay(3000).Wait(); // 主动断开连接 //mqtt.Disconnect(); while (true) { // 向指定主题推送消息 mqtt.Publish("topic-1", Console.ReadLine()); }