简单的介绍下Redis中的Pub/Sub模型的用法,前几年还用过,自从Redis Stream出来以后,这个应该就很少用。

Redis Pub/Sub优缺点

先来看看优缺点吧,从以下就可以看出,这个模型可使用的场景实在是非常非常少了。

优点

  1. 支持发布 / 订阅,支持多组生产者、消费者处理消息

缺点

  1. 消费者下线,数据会丢失
  2. 不支持数据持久化,Redis 宕机,数据也会丢失
  3. 消息堆积,缓冲区溢出,消费者会被强制踢下线,数据也会丢失

实现方式

实现方式比较简单,只需要创建于给监听器,实现MessageListener接口即可。

创建消费者监听器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Log4j2
@Configuration
public class RedisListener implements MessageListener {

@Value("${spring.redis.channel-topic:default}")
private String topic;

@Autowired
private RestHighLevelClient restHighLevelClient;

public RedisListener(RestHighLevelClient restHighLevelClient) {
this.restHighLevelClient = restHighLevelClient;
}

@Override
public void onMessage(Message message, byte[] pattern) {
String topic = new String(pattern);
String context = new String(message.getBody());
log.info("topic:{},context:{}", topic, context);
}

@Bean
RedisMessageListenerContainer redisMessageListenerContainer(
RedisConnectionFactory redisConnectionFactory, RedisListener redisListener) {
RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
//订阅topic - subscribe
redisMessageListenerContainer.addMessageListener(redisListener, new ChannelTopic(topic));
return redisMessageListenerContainer;
}
}

生产者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Service
public class CollectorServiceImpl implements CollectorService {
private RedisTemplate<String, String> redisTemplate;
private ObjectMapper objectMapper;

@Value("${spring.redis.channel-topic:default}")
private String topic;

public CollectorServiceImpl(ObjectMapper objectMapper, RedisTemplate<String, String> redisTemplate) {
this.objectMapper = objectMapper;
this.redisTemplate = redisTemplate;
}

@Override
public void sendToMsg(BuriedPointDTO buriedPointDTO) {
try {
redisTemplate.convertAndSend(topic, objectMapper.writeValueAsString(buriedPointDTO));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
}

关于Redis Pub/Sub和Redis Stream,可参考这篇:把Redis当作队列来用,真的合适吗?-redis作为队列 (51cto.com)