1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
@Slf4j @Component @SuppressWarnings("all") public class RedisDelayQueueUtil {
@Resource private RedisTemplate<String, String> redisTemplate;
public static final String REDIS_DELAY_QUEUE = "redisDelayQueue";
public void setDelayTask(Object msg, int delayTime) { long expireTime = System.currentTimeMillis() + delayTime * 1000L; Boolean addFlag = redisTemplate.opsForZSet().add(REDIS_DELAY_QUEUE, JSONUtil.toJsonStr(msg), expireTime); if (Boolean.TRUE.equals(addFlag)) { log.info("redis 延时消息创建成功: msg={}, expireTime={}", msg, DateUtil.date(expireTime)); } }
@PostConstruct public void consumeDelayTask1() { log.info("redis 延时队列扫描已启动....."); ThreadUtil.newSingleExecutor().execute(() -> { while (true) { Set<String> set = redisTemplate.opsForZSet().rangeByScore(REDIS_DELAY_QUEUE, 0, System.currentTimeMillis(), 0L, 1L); if (CollUtil.isEmpty(set)) { try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } continue; } String msgStr = set.iterator().next(); Long remove = redisTemplate.opsForZSet().remove(REDIS_DELAY_QUEUE, msgStr); if (remove > 0) { log.info("redis 延时消息已成功消费:{}", msgStr); } } }); }
@Scheduled(cron = "* * * * * ?") public void consumeDelayTask2() { Set<String> set = redisTemplate.opsForZSet().rangeByScore(REDIS_DELAY_QUEUE, 0, System.currentTimeMillis()); Iterator<String> iterator = set.iterator(); while (iterator.hasNext()) { String msg = iterator.next(); Double score = redisTemplate.opsForZSet().score(REDIS_DELAY_QUEUE, msg); if (System.currentTimeMillis() > score) { System.out.println("消费了:" + msg + "消费时间为:" + DateUtil.now()); redisTemplate.opsForZSet().remove(REDIS_DELAY_QUEUE, msg); } } }
}
|