Thread pool to process messages in parallel, but preserve order within conversations

2022-09-03 12:48:53

I need to process messages in parallel, but preserve the processing order of messages with the same conversation ID.

Example
Let's define a Message like this:

class Message {
    Message(long id, long conversationId, String someData) {...}
}

Suppose the messages arrive in the following order:
Message(1, 1, "a1"), Message(2, 2, "a2"), Message(3, 1, "b1"), Message(4, 2, "b2")

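I need message 3 to be processed after message 1, since messages 1 and 3 have the same conversation ID (similarly, message 4 should be processed after message 2 for the same reason).
I don't care about the relative order between, e.g., messages 1 and 2, since they have different conversation IDs.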

I would like to reuse as much of Java's ThreadPoolExecutor functionality as possible, to avoid having to replace dead threads manually in my code, etc.

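Update: the number of possible "conversation IDs" is not limited, and there is no time limit on a conversation. (I personally don't see this as a problem, since I can have a simple mapping from a conversationId to a worker number, e.g. conversationId % totalWorkers.)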
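A minimal sketch of that mapping, assuming one single-threaded executor per worker so that tasks routed to the same worker run in submission order. The class ModuloWorkers and its methods are hypothetical names used only for illustration; they are not part of the question.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration: routes each conversation to a fixed worker
public class ModuloWorkers {

    private final ExecutorService[] workers;

    public ModuloWorkers(int totalWorkers) {
        workers = new ExecutorService[totalWorkers];
        for (int i = 0; i < totalWorkers; i++) {
            // One thread per worker: tasks sent to the same worker run in submission order
            workers[i] = Executors.newSingleThreadExecutor();
        }
    }

    // floorMod keeps the index non-negative even for negative conversation IDs
    public static int workerIndex(long conversationId, int totalWorkers) {
        return (int) Math.floorMod(conversationId, (long) totalWorkers);
    }

    public void submit(long conversationId, Runnable task) {
        workers[workerIndex(conversationId, workers.length)].execute(task);
    }

    public void shutdownAndWait() throws InterruptedException {
        for (ExecutorService worker : workers) {
            worker.shutdown(); // already submitted tasks still run
        }
        for (ExecutorService worker : workers) {
            worker.awaitTermination(10, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ModuloWorkers pool = new ModuloWorkers(2);
        List<String> processed = new CopyOnWriteArrayList<>();
        // Same arrival order as in the example above
        pool.submit(1, () -> processed.add("a1"));
        pool.submit(2, () -> processed.add("a2"));
        pool.submit(1, () -> processed.add("b1"));
        pool.submit(2, () -> processed.add("b2"));
        pool.shutdownAndWait();
        // "a1" always precedes "b1" and "a2" always precedes "b2";
        // the interleaving between the two conversations may vary from run to run
        System.out.println(processed);
    }
}
```

Each worker is an ordinary executor here, so the thread management is still delegated to the JDK rather than written by hand.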

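Update 2: there is one problem with a multi-queue solution in which the queue number is determined by, e.g., "index = Objects.hash(conversationId) % total": if some message takes a long time to process, all messages with the same "index" but a different "conversationId" will wait, even though other threads are available to handle them. That is, I believe a solution with a single smart blocking queue would be better, but that is just an opinion; I am open to any good solution.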
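To make the collision concrete, here is a small sketch that evaluates that formula for a few conversation IDs. QueueIndexCollision and queueIndex are hypothetical names, and Math.floorMod is used instead of a bare % only so that a negative hash cannot produce a negative index.

```java
import java.util.Objects;

// Hypothetical illustration of several conversations sharing one queue index
public class QueueIndexCollision {

    // Same formula as in the update above, with floorMod guarding against negative hashes
    public static int queueIndex(long conversationId, int total) {
        return Math.floorMod(Objects.hash(conversationId), total);
    }

    public static void main(String[] args) {
        int total = 2;
        // With total = 2, conversations 2, 4 and 22 all print queue 1
        for (long conversationId : new long[] {2, 4, 22}) {
            System.out.println("conversationId " + conversationId
                    + " -> queue " + queueIndex(conversationId, total));
        }
    }
}
```

If a message of conversation 22 is slow, messages of conversations 2 and 4 wait behind it, even when the thread serving queue 0 is idle.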

Do you see an elegant solution to this problem?


Answer 1

I had to do something very similar some time ago, so here is an adaptation.

(See it in action online)

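It is actually the exact same basic need, but in my case the key was a String and, more importantly, the set of keys was not growing indefinitely, so here I had to add a "cleanup scheduler". Other than that it is basically the same code, so I hope I have not lost anything serious in the adaptation process. I tested it, and it looks like it works. It is longer than the other solutions, though, and perhaps more complex...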

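Base idea:

  • MessageTask wraps a message into a Runnable, and notifies the queue upon completion
  • ConvoQueue: blocking queue of messages, for one conversation. Acts as a pre-queue that guarantees the desired order. See this trio in particular: ConvoQueue.runNextIfPossible() → MessageTask.run() → ConvoQueue.complete() → ...
  • MessageProcessor has a Map<Long, ConvoQueue> and an ExecutorService
  • Messages are processed by any thread in the executor; the ConvoQueues feed the ExecutorService and guarantee message order per convo, but not globally (so, unlike some other solutions, a "difficult" message will not block other conversations from being processed, and that property was critically important in our case; if it is not that critical for you, maybe a simpler solution is better)
  • Cleanup with a ScheduledExecutorService (takes 1 thread)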

Visual:

   ConvoQueues              ExecutorService's internal queue
                            (shared, but has at most 1 MessageTask per convo)
Convo 1   ########   
Convo 2      #####   
Convo 3    #######                        Thread 1
Convo 4              } →    ####    → {
Convo 5        ###                        Thread 2
Convo 6  #########   
Convo 7      #####   

(Convo 4 is about to be deleted)

All the classes are below (MessageProcessorTest can be executed directly):

// MessageProcessor.java
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import static java.util.concurrent.TimeUnit.SECONDS;

public class MessageProcessor {

    private static final long CLEANUP_PERIOD_S = 10;
    private final Map<Long, ConvoQueue> queuesByConvo = new HashMap<>();
    private final ExecutorService executorService;

    public MessageProcessor(int nbThreads) {
        executorService = Executors.newFixedThreadPool(nbThreads);
        ScheduledExecutorService cleanupScheduler = Executors.newScheduledThreadPool(1);
        cleanupScheduler.scheduleAtFixedRate(this::removeEmptyQueues, CLEANUP_PERIOD_S, CLEANUP_PERIOD_S, SECONDS);
    }

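    public void addMessageToProcess(Message message) {
        // Look up and enqueue under the same lock, so the cleanup cannot remove
        // the queue between the lookup and the add
        synchronized (queuesByConvo) {
            ConvoQueue queue = getQueue(message.getConversationId());
            queue.addMessage(message);
        }
    }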

    private ConvoQueue getQueue(Long convoId) {
        synchronized (queuesByConvo) {
            return queuesByConvo.computeIfAbsent(convoId, p -> new ConvoQueue(executorService));
        }
    }

    private void removeEmptyQueues() {
        synchronized (queuesByConvo) {
            queuesByConvo.entrySet().removeIf(entry -> entry.getValue().isEmpty());
        }
    }

}


// ConvoQueue.java
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;

class ConvoQueue {

    private Queue<MessageTask> queue;
    private MessageTask activeTask;
    private ExecutorService executorService;

    ConvoQueue(ExecutorService executorService) {
        this.executorService = executorService;
        this.queue = new LinkedBlockingQueue<>();
    }

    private void runNextIfPossible() {
        synchronized(this) {
            if (activeTask == null) {
                activeTask = queue.poll();
                if (activeTask != null) {
                    executorService.submit(activeTask);
                }
            }
        }
    }

    void complete(MessageTask task) {
        synchronized(this) {
            if (task == activeTask) {
                activeTask = null;
                runNextIfPossible();
            }
            else {
                throw new IllegalStateException("Attempt to complete task that is not supposed to be active: "+task);
            }
        }
    }

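    // Idle only when nothing is waiting AND nothing is running; otherwise the cleanup
    // could drop a queue whose task is still in flight and break per-convo ordering
    synchronized boolean isEmpty() {
        return queue.isEmpty() && activeTask == null;
    }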

    void addMessage(Message message) {
        add(new MessageTask(this, message));
    }

    private void add(MessageTask task) {
        synchronized(this) {
            queue.add(task);
            runNextIfPossible();
        }
    }

}

// MessageTask.java
public class MessageTask implements Runnable {

    private ConvoQueue convoQueue;
    private Message message;

    MessageTask(ConvoQueue convoQueue, Message message) {
        this.convoQueue = convoQueue;
        this.message = message;
    }

    @Override
    public void run() {
        try {
            processMessage();
        }
        finally {
            convoQueue.complete(this);
        }
    }

    private void processMessage() {
        // Dummy processing with random delay to observe reordered messages & preserved convo order
        try {
            Thread.sleep((long) (50*Math.random()));
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the pool thread can still observe the interruption
            Thread.currentThread().interrupt();
        }
        System.out.println(message);
    }

}

// Message.java
class Message {

    private long id;
    private long conversationId;
    private String data;

    Message(long id, long conversationId, String someData) {
        this.id = id;
        this.conversationId = conversationId;
        this.data = someData;
    }

    long getConversationId() {
        return conversationId;
    }

    String getData() {
        return data;
    }

    public String toString() {
        return "Message{" + id + "," + conversationId + "," + data + "}";
    }
}

// MessageProcessorTest.java
public class MessageProcessorTest {
    public static void main(String[] args) {
        MessageProcessor test = new MessageProcessor(2);
        for (int i=1; i<100; i++) {
            test.addMessageToProcess(new Message(1000+i,i%7,"hi "+i));
        }
    }
}

Output (for each convo ID, i.e. the 2nd field, order is preserved):

Message{1002,2,hi 2}
Message{1001,1,hi 1}
Message{1004,4,hi 4}
Message{1003,3,hi 3}
Message{1005,5,hi 5}
Message{1006,6,hi 6}
Message{1009,2,hi 9}
Message{1007,0,hi 7}
Message{1008,1,hi 8}
Message{1011,4,hi 11}
Message{1010,3,hi 10}
...
Message{1097,6,hi 97}
Message{1095,4,hi 95}
Message{1098,0,hi 98}
Message{1099,1,hi 99}
Message{1096,5,hi 96}

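The test above gave me the confidence to share it, but I am slightly worried that I might have forgotten details for pathological cases. It has been running in production for years without hitches (although with more code that lets us inspect it live when we need to see what is happening, why a certain queue takes time, etc. The system above never had a problem in itself, but the processing of a particular task sometimes did).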
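One way to check a longer run than by eye is a small verifier over the processed sequence. This is only a sketch: ConvoOrderCheck is a hypothetical helper, and it assumes message IDs were assigned in arrival order, as they are in MessageProcessorTest.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: checks that IDs never go backwards within a conversation
public class ConvoOrderCheck {

    // Each entry is {messageId, conversationId}, listed in the order the messages were processed
    public static boolean orderPreserved(List<long[]> processed) {
        Map<Long, Long> lastIdByConvo = new HashMap<>();
        for (long[] entry : processed) {
            long id = entry[0];
            long convoId = entry[1];
            Long previous = lastIdByConvo.put(convoId, id);
            if (previous != null && previous > id) {
                return false; // an older message of this convo was processed after a newer one
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // First lines of the output above
        List<long[]> sample = Arrays.asList(
                new long[] {1002, 2}, new long[] {1001, 1}, new long[] {1004, 4},
                new long[] {1003, 3}, new long[] {1005, 5}, new long[] {1006, 6},
                new long[] {1009, 2}, new long[] {1007, 0}, new long[] {1008, 1});
        System.out.println(orderPreserved(sample)); // prints "true"
    }
}
```

Global order is deliberately not checked: 1002 appearing before 1001 is fine because the two messages belong to different conversations.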

EDIT: click here to test online. Alternative: copy that gist in there, and press "Compile & Execute".


Answer 2

Not sure how you want the messages to be processed. For convenience, each message is of type Runnable, which is where the execution takes place.

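The solution to all of this is to have a number of Executors that are submitted to a parallel ExecutorService. Use the modulo operation to calculate which Executor an incoming message needs to be dispatched to. Obviously, for the same conversation ID it is the same Executor, so you get parallel processing overall but sequential processing within a conversation ID. It is not guaranteed that messages with different conversation IDs will always execute in parallel (all in all, you are bounded, at least, by the number of physical cores in your system).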

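import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.IntStream;

public class MessageExecutor {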

    public interface Message extends Runnable {

        long getId();

        long getConversationId();

        String getMessage();

    }

    private static class Executor implements Runnable {

        private final LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();

        private volatile boolean stopped;

        void schedule(Message message) {
            messages.add(message);
        }

        void stop() {
            stopped = true;
        }

        @Override
        public void run() {
            while (!stopped) {
                try {
                    Message message = messages.take();
                    message.run();
                } catch (Exception e) {
                    System.err.println(e.getMessage());
                }
            }
        }
    }

    private final Executor[] executors;
    private final ExecutorService executorService;

    public MessageExecutor(int poolCount) {
        executorService = Executors.newFixedThreadPool(poolCount);
        executors = new Executor[poolCount];

        IntStream.range(0, poolCount).forEach(i -> {
            Executor executor = new Executor();
            executorService.submit(executor);
            executors[i] = executor;
        });
    }

    public void submit(Message message) {
        // floorMod keeps the index non-negative even when the hash is negative
        final int executorNr = Math.floorMod(Objects.hash(message.getConversationId()), executors.length);
        executors[executorNr].schedule(message);
    }

    public void stop() {
        Arrays.stream(executors).forEach(Executor::stop);
        executorService.shutdown();
    }
}

You can then start the message executor with the desired pool count and submit messages to it.

public static void main(String[] args) {
    MessageExecutor messageExecutor = new MessageExecutor(Runtime.getRuntime().availableProcessors());
    messageExecutor.submit(new Message() {
        @Override
        public long getId() {
            return 1;
        }

        @Override
        public long getConversationId() {
            return 1;
        }

        @Override
        public String getMessage() {
            return "abc1";
        }

        @Override
        public void run() {
            System.out.println(this.getMessage());
        }
    });
    messageExecutor.submit(new Message() {
        @Override
        public long getId() {
            return 1;
        }

        @Override
        public long getConversationId() {
            return 2;
        }

        @Override
        public String getMessage() {
            return "abc2";
        }

        @Override
        public void run() {
            System.out.println(this.getMessage());
        }
    });
    messageExecutor.stop();
}

When I run with a pool count of 2 and submit a number of messages:

Message with conversation id [1] is scheduled on scheduler #[0]
Message with conversation id [2] is scheduled on scheduler #[1]
Message with conversation id [3] is scheduled on scheduler #[0]
Message with conversation id [4] is scheduled on scheduler #[1]
Message with conversation id [22] is scheduled on scheduler #[1]
Message with conversation id [22] is scheduled on scheduler #[1]
Message with conversation id [22] is scheduled on scheduler #[1]
Message with conversation id [22] is scheduled on scheduler #[1]
Message with conversation id [1] is scheduled on scheduler #[0]
Message with conversation id [2] is scheduled on scheduler #[1]
Message with conversation id [3] is scheduled on scheduler #[0]
Message with conversation id [3] is scheduled on scheduler #[0]
Message with conversation id [4] is scheduled on scheduler #[1]

When the same messages are run with a pool count of 3:

Message with conversation id [1] is scheduled on scheduler #[2]
Message with conversation id [2] is scheduled on scheduler #[0]
Message with conversation id [3] is scheduled on scheduler #[1]
Message with conversation id [4] is scheduled on scheduler #[2]
Message with conversation id [22] is scheduled on scheduler #[2]
Message with conversation id [22] is scheduled on scheduler #[2]
Message with conversation id [22] is scheduled on scheduler #[2]
Message with conversation id [22] is scheduled on scheduler #[2]
Message with conversation id [1] is scheduled on scheduler #[2]
Message with conversation id [2] is scheduled on scheduler #[0]
Message with conversation id [3] is scheduled on scheduler #[1]
Message with conversation id [3] is scheduled on scheduler #[1]
Message with conversation id [4] is scheduled on scheduler #[2]

Messages get distributed nicely among the pool of Executors :).

EDIT: the Executor's run() catches all Exceptions, to ensure the loop does not break when one message fails.
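The effect of that catch block can be seen in isolation with a simplified loop. This is a sketch, not the Executor class above: ResilientLoop and drain are hypothetical names, and the loop drains a queue on the calling thread instead of blocking on take().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration: one failing task does not stop the ones queued behind it
public class ResilientLoop {

    // Runs every queued task on the calling thread; a failing task is logged and skipped
    public static List<String> drain(BlockingQueue<Runnable> tasks, List<String> log) {
        Runnable task;
        while ((task = tasks.poll()) != null) {
            try {
                task.run();
            } catch (Exception e) {
                log.add("failed: " + e.getMessage());
            }
        }
        return log;
    }

    public static void main(String[] args) {
        BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
        List<String> log = new ArrayList<>();
        tasks.add(() -> log.add("abc1"));
        tasks.add(() -> { throw new IllegalStateException("boom"); });
        tasks.add(() -> log.add("abc2"));
        System.out.println(drain(tasks, log)); // prints "[abc1, failed: boom, abc2]"
    }
}
```

Without the try/catch, the exception from the second task would end the loop and "abc2" would never be processed.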