我们平时说到消息队列,一般都是指 RabbitMQ、RocketMQ、ActiveMQ 以及大数据里边的 Kafka,这些是我们比较常见的消息中间件,也是非常专业的消息中间件,作为专业的中间件,它里边提供了许多功能。
松哥之前也有两篇介绍的文章:
- 我是如何在微人事项目中提高RabbitMQ消息可靠性的?
- Spring Boot 整合 RabbitMQ,消息重复消费怎么办?
但是,当我们需要使用消息中间件的时候,并非每次都需要非常专业的消息中间件,假如我们只有一个消息队列,只有一个消费者,那就没有必要去使用上面这些专业的消息中间件,这种情况我们可以直接使用 Redis 来做消息队列。
Redis 的消息队列不是特别专业,他没有很多高级特性,适用简单的场景,如果对于消息可靠性有着极高的追求,那么不适合使用 Redis 做消息队列。
好了,我们一起来撸代码(本视频节选自松哥自制的 Spring Boot + Vue 系列视频教程):
视频地址
以下是视频笔记:
1.消息队列
Redis 做消息队列,使用它里边的 List 数据结构就可以实现,我们可以使用 lpush/rpush 操作来实现入队,然后使用 lpop/rpop 来实现出队。
回顾一下:
在客户端(例如 Java 端),我们会维护一个死循环来不停的从队列中读取消息,并处理,如果队列中有消息,则直接获取到,如果没有消息,就会陷入死循环,直到下一次有消息进入,这种死循环会造成大量的资源浪费,这个时候,我们可以使用之前讲的 blpop/brpop 。
2.延迟消息队列
延迟队列可以通过 zset 来实现,因为 zset 中有一个 score,我们可以把时间作为 score,将 value 存到 redis 中,然后通过轮询的方式,去不断的读取消息出来。
首先,如果消息是一个字符串,直接发送即可,如果是一个对象,则需要对对象进行序列化,这里我们使用 JSON 来实现序列化和反序列化。
所以,首先在项目中,添加 JSON 依赖:
1 2 3 4 5
| <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.10.3</version> </dependency>
|
接下来,构造一个消息对象:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public class JavaboyMessage { private String id; private Object data;
@Override public String toString() { return "JavaboyMessage{" + "id='" + id + '\'' + ", data=" + data + '}'; }
public String getId() { return id; }
public void setId(String id) { this.id = id; }
public Object getData() { return data; }
public void setData(Object data) { this.data = data; } }
|
接下来封装一个消息队列:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| public class DelayMsgQueue { private Jedis jedis; private String queue;
public DelayMsgQueue(Jedis jedis, String queue) { this.jedis = jedis; this.queue = queue; }
public void queue(Object data) { JavaboyMessage msg = new JavaboyMessage(); msg.setId(UUID.randomUUID().toString()); msg.setData(data); try { String s = new ObjectMapper().writeValueAsString(msg); System.out.println("msg publish:" + new Date()); jedis.zadd(queue, System.currentTimeMillis() + 5000, s); } catch (JsonProcessingException e) { e.printStackTrace(); } }
public void loop() { while (!Thread.interrupted()) { Set<String> zrange = jedis.zrangeByScore(queue, 0, System.currentTimeMillis(), 0, 1); if (zrange.isEmpty()) { try { Thread.sleep(500); } catch (InterruptedException e) { break; } continue; } String next = zrange.iterator().next(); if (jedis.zrem(queue, next) > 0) { try { JavaboyMessage msg = new ObjectMapper().readValue(next, JavaboyMessage.class); System.out.println("receive msg:" + msg); } catch (JsonProcessingException e) { e.printStackTrace(); } } } } }
|
测试:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| public class DelayMsgTest { public static void main(String[] args) { Redis redis = new Redis(); redis.execute(jedis -> { DelayMsgQueue queue = new DelayMsgQueue(jedis, "javaboy-delay-queue"); Thread producer = new Thread(){ @Override public void run() { for (int i = 0; i < 5; i++) { queue.queue("www.javaboy.org>>>>" + i); } } }; Thread consumer = new Thread(){ @Override public void run() { queue.loop(); } }; producer.start(); consumer.start(); try { Thread.sleep(7000); consumer.interrupt(); } catch (InterruptedException e) { e.printStackTrace(); } }); } }
|
就这样,我们利用 Java 代码结合 Redis 中的 zset 就非常方便的实现了延迟消息队列。
小伙伴们有没有 GET 到呢?如果感觉有收获,记得点一下右下角的在看哦