前段时间我不得不做一些非常相似的事情,所以这是一个改编。
(在线查看实际应用)
这实际上是完全相同的基本需求,但在我的情况下,键是一个字符串,更重要的是,键集不会无限增长,所以在这里我必须添加一个“清理调度程序”。除此之外,它基本上是相同的代码,所以我希望我在改编过程中没有丢失任何严重的东西。我测试了它,看起来它的工作原理。但是,它比其他解决方案更长,也许更复杂...
基本理念:
-
MessageTask
将消息包装到 中,并在完成时通知队列Runnable
-
ConvoQueue
:阻止消息队列,用于对话。充当前排队,保证所需的顺序。特别看到这个三重奏:→ → → ...ConvoQueue.runNextIfPossible()
MessageTask.run()
ConvoQueue.complete()
-
MessageProcessor
有一个 、 和一个Map<Long, ConvoQueue>
ExecutorService
- 消息由执行器中的任何线程处理,s为每个convo提供和保证消息顺序,但不是全局的(因此,与其他一些解决方案不同,“困难”消息不会阻止其他会话的处理,并且该属性在我们的例子中至关重要 - 如果它对您来说不是那么重要,也许更简单的解决方案更好)
ConvoQueue
ExecutorService
- 清理方式(需要 1 个线程)
ScheduledExecutorService
视觉:
ConvoQueues ExecutorService's internal queue
(shared, but has at most 1 MessageTask per convo)
Convo 1 ########
Convo 2 #####
Convo 3 ####### Thread 1
Convo 4 } → #### → {
Convo 5 ### Thread 2
Convo 6 #########
Convo 7 #####
(Convo 4 is about to be deleted)
下面所有类(可以直接执行):MessageProcessorTest
// MessageProcessor.java
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import static java.util.concurrent.TimeUnit.SECONDS;
public class MessageProcessor {
private static final long CLEANUP_PERIOD_S = 10;
private final Map<Long, ConvoQueue> queuesByConvo = new HashMap<>();
private final ExecutorService executorService;
public MessageProcessor(int nbThreads) {
executorService = Executors.newFixedThreadPool(nbThreads);
ScheduledExecutorService cleanupScheduler = Executors.newScheduledThreadPool(1);
cleanupScheduler.scheduleAtFixedRate(this::removeEmptyQueues, CLEANUP_PERIOD_S, CLEANUP_PERIOD_S, SECONDS);
}
public void addMessageToProcess(Message message) {
ConvoQueue queue = getQueue(message.getConversationId());
queue.addMessage(message);
}
private ConvoQueue getQueue(Long convoId) {
synchronized (queuesByConvo) {
return queuesByConvo.computeIfAbsent(convoId, p -> new ConvoQueue(executorService));
}
}
private void removeEmptyQueues() {
synchronized (queuesByConvo) {
queuesByConvo.entrySet().removeIf(entry -> entry.getValue().isEmpty());
}
}
}
// ConvoQueue.java
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
class ConvoQueue {
private Queue<MessageTask> queue;
private MessageTask activeTask;
private ExecutorService executorService;
ConvoQueue(ExecutorService executorService) {
this.executorService = executorService;
this.queue = new LinkedBlockingQueue<>();
}
private void runNextIfPossible() {
synchronized(this) {
if (activeTask == null) {
activeTask = queue.poll();
if (activeTask != null) {
executorService.submit(activeTask);
}
}
}
}
void complete(MessageTask task) {
synchronized(this) {
if (task == activeTask) {
activeTask = null;
runNextIfPossible();
}
else {
throw new IllegalStateException("Attempt to complete task that is not supposed to be active: "+task);
}
}
}
boolean isEmpty() {
return queue.isEmpty();
}
void addMessage(Message message) {
add(new MessageTask(this, message));
}
private void add(MessageTask task) {
synchronized(this) {
queue.add(task);
runNextIfPossible();
}
}
}
// MessageTask.java
public class MessageTask implements Runnable {
private ConvoQueue convoQueue;
private Message message;
MessageTask(ConvoQueue convoQueue, Message message) {
this.convoQueue = convoQueue;
this.message = message;
}
@Override
public void run() {
try {
processMessage();
}
finally {
convoQueue.complete(this);
}
}
private void processMessage() {
// Dummy processing with random delay to observe reordered messages & preserved convo order
try {
Thread.sleep((long) (50*Math.random()));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(message);
}
}
// Message.java
class Message {
private long id;
private long conversationId;
private String data;
Message(long id, long conversationId, String someData) {
this.id = id;
this.conversationId = conversationId;
this.data = someData;
}
long getConversationId() {
return conversationId;
}
String getData() {
return data;
}
public String toString() {
return "Message{" + id + "," + conversationId + "," + data + "}";
}
}
// MessageProcessorTest.java
public class MessageProcessorTest {
public static void main(String[] args) {
MessageProcessor test = new MessageProcessor(2);
for (int i=1; i<100; i++) {
test.addMessageToProcess(new Message(1000+i,i%7,"hi "+i));
}
}
}
输出(对于每个 convo ID(第 2 个字段)顺序保留):
Message{1002,2,hi 2}
Message{1001,1,hi 1}
Message{1004,4,hi 4}
Message{1003,3,hi 3}
Message{1005,5,hi 5}
Message{1006,6,hi 6}
Message{1009,2,hi 9}
Message{1007,0,hi 7}
Message{1008,1,hi 8}
Message{1011,4,hi 11}
Message{1010,3,hi 10}
...
Message{1097,6,hi 97}
Message{1095,4,hi 95}
Message{1098,0,hi 98}
Message{1099,1,hi 99}
Message{1096,5,hi 96}
上面的测试给了我分享它的信心,但我有点担心我可能忘记了病理病例的细节。它已经在生产环境中运行多年而没有出现故障(尽管有更多的代码允许在我们需要查看发生了什么,为什么某个队列需要时间等时实时检查它 - 上面的系统本身从来没有问题,但有时是处理特定任务的问题)
编辑:点击这里在线测试。替代方案:将该要点复制到那里,然后按“编译和执行”。