Integrating Spring Boot with Redis Stream to Implement a Message Queue
I recently needed to implement some simple event tracking (buried points). Because the tracking data arrives in fairly dense bursts, writing every event to storage in real time is clearly unreasonable, so I wanted a message queue in between as a buffer to keep the write load from affecting the rest of the system. The knee-jerk choice for this kind of scenario is Kafka or RabbitMQ, but in our environment applying for the necessary network policy is a hassle, and pulling in a new middleware for such a small feature felt wasteful, so I decided to build it on Redis Stream instead.
Requirements
Redis Streams require Redis 5.0 or later; on the Spring side, Stream support arrived with Spring Data Redis 2.2 (Spring Boot 2.2+).
Implementation
Dependencies
Make sure the project already depends on spring-boot-starter-data-redis; if it does not, add the following:
```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
```
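One caveat: the lettuce.pool settings in the application.yml below are backed by Apache Commons Pool, which spring-boot-starter-data-redis does not bring in transitively, so with pooling enabled you will most likely also need the following dependency (its version is managed by the Spring Boot parent, so it can be omitted):

```xml
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>
```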
application.yml
```yaml
spring:
  redis:
    database: 0
    host: 127.0.0.1
    port: 6379
    password: passwd123
    lettuce:
      pool:
        max-idle: 8
        min-idle: 1
        max-active: 8
        max-wait: -1
    timeout: 1000
    channel-topic: buried_point:dev
```
Create a listener
Create a listener that implements the StreamListener interface; it is responsible for consuming the data written to the stream.
```java
@Log4j2
@Component
public class BuriedPointListener implements StreamListener<String, MapRecord<String, String, String>> {

    private final RedisTemplate<String, String> redisTemplate;

    private final ObjectMapper objectMapper;

    @Value("${spring.redis.channel-topic:default}")
    private String topic;

    public BuriedPointListener(RedisTemplate<String, String> redisTemplate, ObjectMapper objectMapper) {
        this.redisTemplate = redisTemplate;
        this.objectMapper = objectMapper;
    }

    @Override
    public void onMessage(MapRecord<String, String, String> message) {
        String stream = message.getStream();
        RecordId id = message.getId();
        Map<String, String> map = message.getValue();
        log.debug("[manual ack] group:[group-a] received message stream:[{}], id:[{}], value:[{}]", stream, id, map);
        // Acknowledge the entry for group-a, then remove it from the stream
        redisTemplate.opsForStream().acknowledge(Objects.requireNonNull(stream), "group-a", id.getValue());
        redisTemplate.opsForStream().delete(Objects.requireNonNull(stream), id.getValue());
    }
}
```
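Because the listener acknowledges and deletes every entry by hand, any entry whose processing fails before the acknowledge call stays in the group's pending entries list. A quick way to check for such leftovers (a sketch; the pending(key, group) overload assumes Spring Data Redis 2.3 or newer):

```java
// Sketch: summarise entries delivered to group-a that were never acknowledged
PendingMessagesSummary pending = redisTemplate.opsForStream().pending(topic, "group-a");
log.info("pending entries for group-a: {}", pending);
```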
Create the RedisStreamConfiguration class
```java
@Configuration
public class RedisStreamConfiguration {

    @Autowired
    private RedisConnectionFactory redisConnectionFactory;

    @Autowired
    private BuriedPointListener buriedPointListener;

    @Value("${spring.redis.channel-topic:default}")
    private String topic;

    @Bean(initMethod = "start", destroyMethod = "stop")
    public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer() {
        AtomicInteger index = new AtomicInteger(1);
        int processors = Runtime.getRuntime().availableProcessors();
        // Dedicated daemon threads for polling and consuming the stream
        ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 0, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(), r -> {
            Thread thread = new Thread(r);
            thread.setName("async-stream-consumer-" + index.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        });

        // batchSize: max entries fetched per poll; pollTimeout: how long each blocking read waits
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                        .builder()
                        .batchSize(3)
                        .executor(executor)
                        .pollTimeout(Duration.ofSeconds(3))
                        .errorHandler(new StreamErrorHandler())
                        .build();

        StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer =
                StreamMessageListenerContainer.create(redisConnectionFactory, options);

        // consumer-a in group-a reads from the topic, starting at the last consumed offset
        streamMessageListenerContainer.receive(Consumer.from("group-a", "consumer-a"),
                StreamOffset.create(topic, ReadOffset.lastConsumed()),
                buriedPointListener);

        return streamMessageListenerContainer;
    }
}
```
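The options above reference a StreamErrorHandler that the original snippet does not show. A minimal sketch, assuming it simply implements Spring's org.springframework.util.ErrorHandler and logs whatever the container throws while polling or dispatching:

```java
@Log4j2
public class StreamErrorHandler implements ErrorHandler {

    @Override
    public void handleError(Throwable t) {
        // Invoked by StreamMessageListenerContainer when polling or message handling fails
        log.error("redis stream consume error", t);
    }
}
```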
Create the consumer group
```
127.0.0.1:6379> xgroup create buried_point:dev group-a 0
OK
```
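Note that XGROUP CREATE fails if the stream key does not exist yet; appending MKSTREAM to the command creates an empty stream together with the group. Alternatively, the group can be created from the application at startup. A sketch (the class below is illustrative and not part of the original project):

```java
@Log4j2
@Component
public class StreamGroupInitializer {

    public StreamGroupInitializer(RedisTemplate<String, String> redisTemplate,
                                  @Value("${spring.redis.channel-topic:default}") String topic) {
        try {
            // Create group-a on the topic stream
            redisTemplate.opsForStream().createGroup(topic, "group-a");
        } catch (RedisSystemException e) {
            // Typically a BUSYGROUP reply on restart: the group already exists, which is fine
            log.info("consumer group not created: {}", e.getMessage());
        }
    }
}
```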
Create the producer
Producing is just a matter of calling redisTemplate.opsForStream().add(topic, data); to write data into the stream.
```java
@Log4j2
@Service
public class CollectorServiceImpl implements CollectorService {

    private RedisTemplate<String, String> redisTemplate;

    private ObjectMapper objectMapper;

    @Value("${spring.redis.channel-topic:default}")
    private String topic;

    public CollectorServiceImpl(ObjectMapper objectMapper, RedisTemplate<String, String> redisTemplate) {
        this.objectMapper = objectMapper;
        this.redisTemplate = redisTemplate;
    }

    @Override
    public void sendToMsg(BuriedPointDTO buriedPointDTO) {
        try {
            Map<String, Object> data = new HashMap<>();
            data.put("url", buriedPointDTO.getUrl());
            data.put("title", buriedPointDTO.getTitle());
            data.put("uid", buriedPointDTO.getUserId());
            data.put("userName", buriedPointDTO.getUserName());
            data.put("accessTime", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(buriedPointDTO.getAccessTime()));
            redisTemplate.opsForStream().add(topic, data);
            log.info(objectMapper.writeValueAsString(buriedPointDTO));
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }
}
```
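For completeness, a hypothetical caller (this controller is illustrative and not part of the original code) just injects CollectorService and hands it the DTO:

```java
@RestController
@RequestMapping("/buried-point")
public class BuriedPointController {

    private final CollectorService collectorService;

    public BuriedPointController(CollectorService collectorService) {
        this.collectorService = collectorService;
    }

    @PostMapping
    public ResponseEntity<Void> collect(@RequestBody BuriedPointDTO buriedPointDTO) {
        // The write to the stream is cheap; the listener consumes asynchronously
        collectorService.sendToMsg(buriedPointDTO);
        return ResponseEntity.ok().build();
    }
}
```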
At this point, start the project; as soon as data shows up in the Redis Stream, the listener starts consuming it.
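To verify the wiring without going through the producer, you can append a test entry straight from redis-cli (the field values and the returned entry ID below are only examples) and watch the listener log it:

```
127.0.0.1:6379> xadd buried_point:dev * url /index title demo uid 1 userName tester accessTime "2024-01-01 12:00:00"
"1704081600000-0"
```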