亚洲精品久久久中文字幕-亚洲精品久久片久久-亚洲精品久久青草-亚洲精品久久婷婷爱久久婷婷-亚洲精品久久午夜香蕉

您的位置:首頁技術(shù)文章
文章詳情頁

.NETCore基于RabbitMQ實現(xiàn)延時隊列的兩方法

瀏覽:190日期:2022-06-08 17:02:45
目錄
  • 前言
  • 實現(xiàn)延時隊列的兩種方式
    • 利用rabbitmq死信隊列x-dead-letter-exchange和x-dead-letter-routing-key
    • .NETCore實現(xiàn)方式
    • rabbitmq通過安裝插件的形式實現(xiàn)(推薦)
    • .NET Core 實現(xiàn)
  • 第一種方式的缺陷以及解決方案

    前言

    此文章用來記錄自己學(xué)習(xí)延時隊列過程的文章,并用.NET這兩種方式實現(xiàn)了簡單的Demo。

    延時隊列的應(yīng)用場景 應(yīng)用下單后,30分鐘沒有支付的話,則自動取消訂單活動開始前30分鐘,提醒參賽者參加活動。活動結(jié)束后,30分鐘后提醒未進行評價的參賽人員進行評價…

    上述的場景都可以使用延時隊列進行對應(yīng)的處理。

    上面的場景雖說可以通過定時器也可以處理,但有點浪費資源, 而上述的場景時間是不定的,例如有兩個活動需要提醒參賽者參加,一個是7點開始 ,另一個是8點開始,那么觸發(fā)處理的一個是6點半,一個是7點半。

    實現(xiàn)延時隊列的兩種方式

    使用Rabbitmq實現(xiàn)延時隊列可以讓消息持久化,也支持分布式

    缺點第一種第一種方式的缺陷以及解決方案第二種這個插件的當(dāng)前設(shè)計并不真正適合具有大量延遲消息(例如成百上千或數(shù)百萬)的場景。詳情信息

    利用rabbitmq死信隊列x-dead-letter-exchange和x-dead-letter-routing-key

    實現(xiàn)需要創(chuàng)建兩對交換機和隊列,其中需要對其中一對的隊列進行設(shè)置x-dead-letter-exchange和x-dead-letter-routing-key屬性,屬性指定轉(zhuǎn)發(fā)到另一對的交換機,

    隨后實現(xiàn)流程圖如下:

    .NETCore實現(xiàn)方式

    項目:.NET Core 控制臺項目

    install-package RabbitMQ.Client

    生產(chǎn)者代碼:

        ConnectionFactory connectionFactory = new ConnectionFactory    {UserName = "guest",Password = "guest",HostName = "127.0.0.1"    };    //創(chuàng)建連接    var connection = connectionFactory.CreateConnection();    //創(chuàng)建通道    var channl = connection.CreateModel();   //指定隊列的x-dead-letter-exchange和x-dead-letter-routing-key    Dictionary<string, object> queueArgs = new Dictionary<string, object>()    {{ "x-dead-letter-exchange","exchange.business.test" },{"x-dead-letter-routing-key","businessRoutingkey" }    };    //延時的交換機和隊列綁定    channl.ExchangeDeclare("exchange.business.dlx", "direct", true, false, null);    channl.QueueDeclare("queue.business.dlx", true, false, false, queueArgs);    channl.QueueBind("queue.business.dlx", "exchange.business.dlx", "");    //業(yè)務(wù)的交換機和隊列綁定    channl.ExchangeDeclare("exchange.business.test", "direct", true, false, null);    channl.QueueDeclare("queue.business.test", true, false, false, null);    channl.QueueBind("queue.business.test", "exchange.business.test", "businessRoutingkey", null);    Console.WriteLine("生產(chǎn)者開始發(fā)送消息");    while (true)    {string message = Console.ReadLine();var body = Encoding.UTF8.GetBytes(message);var properties = channl.CreateBasicProperties();properties.Persistent = true;properties.Expiration = "5000";//發(fā)送一條延時5秒的消息channl.BasicPublish("exchange.business.dlx", "", properties, body);    }

    消費者

        ConnectionFactory connectionFactory = new ConnectionFactory    {UserName = "guest",Password = "guest",HostName = "127.0.0.1"    };    //創(chuàng)建連接    var connection = connectionFactory.CreateConnection();    var channel = connection.CreateModel();    EventingBasicConsumer consumer = new EventingBasicConsumer(channel);    //給消費時添加一個委托    consumer.Received += (obj, ea) =>    {var message = Encoding.UTF8.GetString(ea.Body.ToArray());//打印消費的消息Console.WriteLine(message);channel.BasicAck(ea.DeliveryTag, false);    };    //消費queue.business.test隊列的消息    channel.BasicConsume("queue.business.test", false, consumer);    Console.ReadKey();    channel.Dispose();    connection.Close();

    實現(xiàn)效果:

    rabbitmq通過安裝插件的形式實現(xiàn)(推薦)

    使用rabbitmq_delayed_message_exchange 插件提供的x-delayed-message類型的交換機

    下載插件的地址:https://www.rabbitmq.com/community-plugins.html
    選中rabbitmq_delayed_message_exchange插件

    該插件使用只需要聲明交換機的時候,指定x-delayed-message類型,然后添加x-delayed-type參數(shù)即可

    .NET Core 實現(xiàn)

    生產(chǎn)者

        ConnectionFactory connectionFactory = new ConnectionFactory()    {UserName = "guest",Password = "guest",HostName = "127.0.0.1"    };    var connection = connectionFactory.CreateConnection();    var channel = connection.CreateModel();    Dictionary<string, object> exchangeArgs = new Dictionary<string, object>()    {{"x-delayed-type","direct" }    };    //指定x-delayed-message 類型的交換機,并且添加x-delayed-type屬性    channel.ExchangeDeclare("plug.delay.exchange", "x-delayed-message", true, false, exchangeArgs);    channel.QueueDeclare("plug.delay.queue", true, false, false, null);    channel.QueueBind("plug.delay.queue", "plug.delay.exchange", "plugdelay");    var properties = channel.CreateBasicProperties();    Console.WriteLine("生產(chǎn)者開始發(fā)送消息");    Dictionary<string, object> headers = new Dictionary<string, object>()    {{"x-delay","5000" }    };    properties.Persistent = true;    properties.Headers = headers;    while (true)    {string message = Console.ReadLine();var body = Encoding.UTF8.GetBytes(message);channel.BasicPublish("plug.delay.exchange", "plugdelay", properties, body);    }

    消費者:

        ConnectionFactory connectionFactory = new ConnectionFactory    {UserName = "guest",Password = "guest",HostName = "127.0.0.1"    };    //創(chuàng)建連接    var connection = connectionFactory.CreateConnection();    var channel = connection.CreateModel();    EventingBasicConsumer consumer = new EventingBasicConsumer(channel);    consumer.Received += (obj, ea) =>    {var message = Encoding.UTF8.GetString(ea.Body.ToArray());Console.WriteLine(message);channel.BasicAck(ea.DeliveryTag, false);    };    channel.BasicConsume("plug.delay.queue", false, consumer);    Console.ReadKey();    channel.Dispose();    connection.Close();

    實現(xiàn)效果:

    第一種方式的缺陷以及解決方案

    如果存在A、B消息進入了隊列中,A在前,B在后,如果B消息的過期時間比A的過期時間要早,消費的時候,并不會先消費B,再消費A,而是B會等A先消費,即使A要晚過期

    舉例

    生產(chǎn)者代碼修改成如下:

        ConnectionFactory connectionFactory = new ConnectionFactory    {UserName = "guest",Password = "guest",HostName = "127.0.0.1"    };    //創(chuàng)建連接    var connection = connectionFactory.CreateConnection();    //創(chuàng)建通道    var channl = connection.CreateModel();    Dictionary<string, object> queueArgs = new Dictionary<string, object>()    {{ "x-dead-letter-exchange","exchange.business.test" },{"x-dead-letter-routing-key","businessRoutingkey" }    };    //延時的交換機和隊列綁定    channl.ExchangeDeclare("exchange.business.dlx", "direct", true, false, null);    channl.QueueDeclare("queue.business.dlx", true, false, false, queueArgs);    channl.QueueBind("queue.business.dlx", "exchange.business.dlx", "");    //業(yè)務(wù)的交換機和隊列綁定    channl.ExchangeDeclare("exchange.business.test", "direct", true, false, null);    channl.QueueDeclare("queue.business.test", true, false, false, null);    channl.QueueBind("queue.business.test", "exchange.business.test", "businessRoutingkey", null);    string message1 = "Hello Word!1";    string message2 = "Hello Word!2";    var body1 = Encoding.UTF8.GetBytes(message1);    var body2 = Encoding.UTF8.GetBytes(message2);    var properties = channl.CreateBasicProperties();    properties.Persistent = true;    //先發(fā)送過期時間5秒的消息    properties.Expiration = "5000";    channl.BasicPublish("exchange.business.dlx", "", properties, body2);    //再發(fā)送過期時間3秒的消息    properties.Expiration = "3000";    channl.BasicPublish("exchange.business.dlx", "", properties, body1);

    結(jié)果:

    這里先發(fā)了延時20秒的A消息,然后又發(fā)了延時10秒的B消息,但是最終結(jié)果并不是先消費了B消息,而是等A消息過期后,立刻再去消費B。

    這個會影響什么業(yè)務(wù)呢?好比兩個C、D活動,C活動開始時間是7點,D活動開始時間是5點,那么D活動提醒需要等到C活動提醒后,才會立刻提醒,這明顯不符合我們的業(yè)務(wù)需求。

    解決方案 每個活動都是單獨的創(chuàng)建自己的交換機和隊列使用第二種實現(xiàn)方式,即使用插件的形式。

    第一種不太現(xiàn)實,因為如果活動多的話,則會創(chuàng)建很多的隊列,而且只會使用一次。

    業(yè)務(wù)上還是推薦使用插件的實現(xiàn)方式。

    第二種方式的效果

    github地址:

    https://github.com/MDZZ3/RabbitmqDelay

    到此這篇關(guān)于.NETCore基于RabbitMQ實現(xiàn)延時隊列的兩方法的文章就介紹到這了,更多相關(guān).NETCore RabbitMQ 內(nèi)容請搜索以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持!

    標簽: ASP.NET
    相關(guān)文章:
    主站蜘蛛池模板: 精品国产第一国产综合精品gif | 一区二区国产精品 | 国产在线观看免费 | 麻豆精品视频网站在线观看 | 欧美三级视频在线播放 | 国产欧美日韩亚洲精品区2345 | 美女批日起爽在线观看 | 日韩精品观看 | 亚洲欧洲一区二区三区 | 五月婷婷综合激情网 | 182tv精品视频在线播放 | 久久精品国产亚洲麻豆小说 | 欧美日韩视频一区二区三区 | 欧美日韩高清观看一区二区 | 免费在线观看成人 | 久久久久久国产a免费观看黄色大片 | 青草青草伊人精品视频 | 日本特黄在线观看免费 | 亚洲精品小说一区二区三区 | 亚洲精品一区亚洲精品 | 欧美日产国产亚洲综合图区一 | 欧美 亚洲 国产 精品有声 | 女性特黄一级毛片 | 久久精品国产精品青草 | 国产在线成人一区二区 | 草比视频在线观看 | 日韩一级欧美一级毛片在线 | 永久免费观看黄网站 | 国产日韩欧美在线观看 | 三级在线免费看 | 国产三级精品三级在线观看 | 操日本人 | 操出水视频 | 国产xvideoscom| 九九精品视频在线观看九九 | www.a级片| 欧美国产日本 | 一级成人生活片免费看 | 国产黄色影片 | 久久午夜综合久久 | 亚洲欧美日韩国产精品 |