当前位置:首页 > 技术文章 > 基于redis实现的轻量级延迟队列

基于redis实现的轻量级延迟队列

go1238个月前 (05-31)技术文章540

什么是延迟队列 延迟队列 (Delay Queue)是一种存储消息并在特定延迟时间后将其投递到消费者的队列机制。在传统消息队列中,消息会立即被推送给消费者进行处理,但在某些场景下,我们希望消息在一段时间后再被消费者处理。 延迟队列的工作原理通常是将消息先存储在队列中,消息的投递时间会被延迟,直到延迟时间到达时才将消息投递给消费者进行处理。这种机制保证了消息能够在指定的延迟时间后才会被处理,实现了消息的延迟投递。

什么是延迟队列

延迟队列 (Delay Queue)是一种存储消息并在特定延迟时间后将其投递到消费者的队列机制。在传统消息队列中,消息会立即被推送给消费者进行处理,但在某些场景下,我们希望消息在一段时间后再被消费者处理。

延迟队列的工作原理

延迟队列的工作原理通常是将消息先存储在队列中,消息的投递时间会被延迟,直到延迟时间到达时才将消息投递给消费者进行处理。这种机制保证了消息能够在指定的延迟时间后才会被处理,实现了消息的延迟投递。

延迟队列的使用场景

  1. 电商平台,超过 30 分钟未支付的订单,将会被取消

  2. 商品签收后,3天未确认,自动确认

  3. 在平台注册但 30 天内未登录的用户,发短信提醒

  4. 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

  5. 商品抢购,预定后,开售前30分钟提醒

场景很多,就不一一列举,这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务。这些就可以使用延迟队列来解决

延迟队列常见的实现方式

  1. 通过程序的方式实现,例如 [JDK] 自带的延迟队列 DelayQueue

  2. 通过 [MQ] 框架来实现,例如 [RabbitMQ] 可以通过 rabbitmq-delayed-message-exchange 插件来实现延迟队列

  3. 通过 Redis 的方式来实现延迟队列

JDK 自带的延迟队列 DelayQueue

优点

  1. 开发比较方便,可以直接在代码中使用

  2. 代码实现比较简单

缺点

  1. 不支持持久化保存

  2. 不支持分布式系统

MQ 实现方式

[RabbitMQ]本身并不支持延迟队列,但可以通过添加插件 rabbitmq-delayed-message-exchange 来实现延迟队列。

优点

  1. 支持分布式

  2. 支持持久化

缺点

框架比较重,需要搭建和配置 MQ。

Redis 实现方式

[Redis] 是通过有序集合(ZSet)的方式来实现延迟消息队列的,ZSet 有一个 Score 属性可以用来存储延迟执行的时间。

优点

  1. 灵活方便,[Redis]是互联网公司的标配,无序额外搭建相关环境;

  2. 可进行消息持久化,大大提高了延迟队列的可靠性;

  3. 分布式支持,不像 JDK 自身的 DelayQueue;

  4. 高可用性,利用 Redis 本身高可用方案,增加了系统健壮性。

缺点

需要使用无限循环的方式来执行任务检查,会消耗少量的系统资源。

综合以上,我们决定使用redis去实现延迟队列,刚好有个开源项目lmstfy基于redis实现的,下面介绍下这个项目的使用

lmstfy

美图开源的 lmstfy(https://github.com/bitleak/lmstfy)(let me schedule task for you),基于 redis 存储,使用 golang 开发,资源占用少,轻量级,并且经受美图线上环境大流量验证多年,较适合用来做延迟队列。

lmstfy特点

  1. 支持基本的消息队列特性:发布消息,消费消息,删除消息;

  2. 支持以下额外特性

  3. 支持消息有效期,过期自动删除;

  4. 支持消息延迟消费;

  5. 支持消息自动重试

  6. 支持死信队列

  7. 支持命名空间,队列级别的[prometheus] 监控,提供grafana dashboard;

  8. 支持发布,消费流量控制。

lmstfy工作原理

如上图所示:

  1. 消息经过生产者发布到lmstfy,如果delay时间大于0,则消息被扔到[redis]的ZSet中(score为延迟绝对时间),等延迟时间到了,才会挪到就绪队列;如果delay为0,则消息直接到就绪队列。

  2. 消费者每次都是从就绪队列消费消息;

  3. 服务器内部会有Timer,每隔一秒检查ZSet中是否有到期的消息,有则将消息移到就绪队列供消费者消费;

  4. 每一个消息发布的时候,可以设置最多被消费的次数,如果达到最大次数都还未正确处理(delete或者ack),则消息会被挪到死信队列;

  5. 业务可以选择复活死信队列中的消息,也可以选择直接删除死信队列中的消息;被复活的死信消息会再次挪到就绪队列中,可以被消费者正常消息。

lmstfy服务端安装部署

前提

lmstfy是依赖[redis]实现的延迟队列,所以首先要安装redis,redis的安装教程可以谷歌搜一搜, 只需要注意一点,redis的持久化,使用AOF即可,淘汰策略使用noeviction 不淘汰任何数据,当内存不足时,新增操作会报错

reids的配置:

# 持久化设置为aof
appendonly yes
# 内存淘汰策略设置为 noeviction
maxmemory-policy noeviction

编译二进制文件

#下载代码 
git clone https://github.com/bitleak/lmstfy.git
# 进入项目目录
cd lmstfy
# 使用make编译项目,当前目录会生成一个二进制文件,文件所在目录./_build
make```  
### lmstfy的配置文件  
```Host = "127.0.0.1"
Port = 7776
AdminHost = "127.0.0.1" # optional, default to localhost
AdminPort = 7778
#LogDir = "/var/log/lmstfy"
LogLevel = "info"
#LogFormat = "text" # Use LogFormat="json" if wants to print the log with json format
EnableAccessLog = true

# default params
#TTLSecond = 24 * 60 * 60 // 1 day
#DelaySecond = 0
#TriesNum = 1
#TTRSecond = 2 * 60 // 2 minutes
#TimeoutSecond = 0  // means non-blocking

# basic auth accounts for the admin api
[Accounts]
test_user = "change.me"

[AdminRedis]  # redis used to store admin data, eg. tokens
Addr = "localhost:6379"
# Password = foobared

[Pool]
[Pool.default]
Addr = "localhost:6379"
# Password = foobared
# DB = 0
#MigrateTo = "migrate" # When migration is enabled, all PUBLISH will go to `migrate` pool. and `default` will be drained
#[Pool.migrate]
#Addr = "localhost:6389"

#[Pool.mysentinel]
# Addr = "localhost:16379,localhost:6380,localhost:6381"
# MasterName = "mymaster"
# Password = foobared

启动lmsfty

_build/lmstfy-server -c config/demo-conf.toml

以上步骤lmsfty服务器端就启动完成了,那客户端怎么使用呢?

lmstfy客户端

申请token

imstfy使用namespace做业务隔离,每个业务一个[token] ,需要向服务端申请,如果服务端开启了basic auth accounts验证,调用接口的时候 需要传入配置文件的账号,例如上面的配置,调用的时候需要传入”change.me“

例子:namespace为kb-test,申请[token]

curl --location 'http://127.0.0.1:7778/token/kb-test' \
--header 'Content-Type: application/x-www-form-urlencoded' \
--header 'Authorization: Basic dGVzdF91c2VyOmNoYW5nZS5tZQ==' \
--data-urlencode 'description=test'

postman调用

结果 :

{
   "token": "01HQJ7EQBDZT88JH79BDE4FAYC"
}

官方目前支持的语言有 go、[Java] 、php、Rust,我这里使用 go 简单演示下如果使用。

生产者

引入github.com/bitleak/lmstfy/client这个包,代码如下:

package main

import (
"fmt"
"github.com/bitleak/lmstfy/client"
)

func main() {
c := client.NewLmstfyClient("127.0.0.1", 7776, "kb-test", "01HQJ7EQBDZT88JH79BDE4FAYC")

c.ConfigRetry(3, 50) // optional, config the client to retry when some errors happened. retry 3 times with 50ms interval
//Publish a job with ttl==forever, tries==3, delay==5s ,注意ttl设置的时间一定要大于delay
jobId, err := c.Publish("test", []byte("test"), 100, 3, 30)
if err == nil {
 fmt.Println("消息发送成功", jobId)
}
}

消费者

package main

import (
"fmt"
"github.com/bitleak/lmstfy/client"
)

func main() {
c := client.NewLmstfyClient("127.0.0.1", 7776, "kb-test", "01HQJ7EQBDZT88JH79BDE4FAYC")

c.ConfigRetry(3, 50) // optional, config the client to retry when some errors happened. retry 3 times with 50ms interval

for {
 job, err := c.Consume("test", 6, 3)
 if err != nil {
  panic(err)
 }
 if job != nil {
  fmt.Println(string(job.Data))
  //接收到消息,成功处理了, 对消息进行ack去人
  err1 := c.Ack("test", job.ID)
  if err1 == nil {
   fmt.Println("消息出来成功并且确认")
  }
 }

}
}

以上 就是 lmstfy 的介绍以及使用,简单比较轻量,其他语言,可以看 github 上的示例。

声明:本站所有内容均为自动采集而来,如有侵权,请联系删除

相关文章

Redis连环五十二问!看谁顶得住?

Redis连环五十二问!看谁顶得住?

基本 1.说说什么是Redis? Redis是一种基于键...

用 PHP 处理 10 亿行数据!

用 PHP 处理 10 亿行数据!

今天,我将带大家一起走进“挑衅十亿行“数据的世界。当然,这个事情是依据GitHub上的一个“十亿行挑衅”(1brc)运动而来,现在正在进行,如果你没有听说过,可查看Gunnar Morlings 的 1brc 存储库。https://github.com/gunnarmorling/1brc我之所以...

2024 年的最佳 PHP 框架

2024 年的最佳 PHP 框架

在本文中,我们将预测在 2024 年持续风行的最佳 PHP 框架。我们首先将看看PHP框架是什么,什么时候该斟酌应用PHP框架,以及应用PHP框架的重要长处都是什么。我还会介绍最合适初学者的 PHP 框架以及用于 Web 开发的最佳框架。什么是PHP框架?     &...

一文读懂多家厂商的大模型训练、推理、部署策略

一文读懂多家厂商的大模型训练、推理、部署策略

4 月 20 日,第 102 期源创会在武汉胜利举行。本期邀请来自武汉人工智能研讨院、华为、MindSpore、京东云、Gitee AI 的人工智能专家,环绕【大模型竞技与性能优化】主题发表演讲。接下来就一起看看本期运动的出色瞬间吧!大合影 get ✅披萨和礼物不能少!接下来进入主题演讲回想环节。可...

请立刻停止编写 Dockerfiles 并使用 docker init

请立刻停止编写 Dockerfiles 并使用 docker init

您是那种认为编写 Dockerfile 和 docker-compose.yml 文件很苦楚的人之一吗?我承认,我就是其中之一。我总是想知道我是否遵守了 Dockerfile、 docker-compose 文件的最佳编写实践,我畏惧在不知不觉中引入了安全破绽。但是现在,我不必再担忧这个问题了,感激...

服务器为什么大多用 Linux 而不是 Windows ?

服务器为什么大多用 Linux 而不是 Windows ?

前几天在知乎看到一个话题很有意思,且很有讨论意义。“服务器为什么大多用 Linux”,除了开源、好用等原因,回答也代表了各种不同人需求和看法,摘取一些分享给大家,也欢迎留言讨论。来自知乎好友“熊大你又骗俺”的回答首先在20年前,windows server+iis+asp+access 的方案,还是...