SpringBoot与Redis Pub/Sub模型整合实现订阅发布
|Word count:403|Reading time:1min|Post View:
简单的介绍下Redis中的Pub/Sub模型的用法,前几年还用过,自从Redis Stream出来以后,这个应该就很少用。
Redis Pub/Sub优缺点
先来看看优缺点吧,从以下就可以看出,这个模型可使用的场景实在是非常非常少了。
优点
- 支持发布 / 订阅,支持多组生产者、消费者处理消息
缺点
- 消费者下线,数据会丢失
- 不支持数据持久化,Redis 宕机,数据也会丢失
- 消息堆积,缓冲区溢出,消费者会被强制踢下线,数据也会丢失
实现方式
实现方式比较简单,只需要创建于给监听器,实现MessageListener
接口即可。
创建消费者监听器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| @Log4j2 @Configuration public class RedisListener implements MessageListener {
@Value("${spring.redis.channel-topic:default}") private String topic;
@Autowired private RestHighLevelClient restHighLevelClient;
public RedisListener(RestHighLevelClient restHighLevelClient) { this.restHighLevelClient = restHighLevelClient; }
@Override public void onMessage(Message message, byte[] pattern) { String topic = new String(pattern); String context = new String(message.getBody()); log.info("topic:{},context:{}", topic, context); }
@Bean RedisMessageListenerContainer redisMessageListenerContainer( RedisConnectionFactory redisConnectionFactory, RedisListener redisListener) { RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer(); redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory); redisMessageListenerContainer.addMessageListener(redisListener, new ChannelTopic(topic)); return redisMessageListenerContainer; } }
|
生产者代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @Service public class CollectorServiceImpl implements CollectorService { private RedisTemplate<String, String> redisTemplate; private ObjectMapper objectMapper;
@Value("${spring.redis.channel-topic:default}") private String topic;
public CollectorServiceImpl(ObjectMapper objectMapper, RedisTemplate<String, String> redisTemplate) { this.objectMapper = objectMapper; this.redisTemplate = redisTemplate; }
@Override public void sendToMsg(BuriedPointDTO buriedPointDTO) { try { redisTemplate.convertAndSend(topic, objectMapper.writeValueAsString(buriedPointDTO)); } catch (JsonProcessingException e) { throw new RuntimeException(e); } } }
|
关于Redis Pub/Sub和Redis Stream,可参考这篇:把Redis当作队列来用,真的合适吗?-redis作为队列 (51cto.com)