灵活的倒计时程序?

我现在遇到过两次问题,即生产者线程生成N个工作项,将它们提交给一个,然后需要等到所有N个项都得到处理。ExecutorService

警告

  • N事先不知道。如果是这样,我会简单地创建一个,然后有生产者线程,直到所有工作完成。CountDownLatchawait()
  • 使用 a 是不合适的,因为尽管我的生产者线程需要阻塞(即通过调用),但无法发出所有工作的信号,从而导致生产者线程停止等待。CompletionServicetake()

我目前最喜欢的解决方案是使用整数计数器,并在提交工作项时递增该计数器,并在处理工作项时递减该计数器。在完成所有 N 个任务之后,我的生产者线程将需要等待锁定,检查是否在通知时。如果使用者线程已递减计数器并且新值为 0,则使用者线程需要通知生产者。counter == 0

有没有更好的方法来解决这个问题,或者我应该使用一个合适的结构,而不是“滚动我自己的”?java.util.concurrent

提前致谢。


答案 1

java.util.concurrent.Phaser 看起来很适合你。它计划在Java 7中发布,但最稳定的版本可以在jsr166的兴趣小组网站上找到。

相位器是一个美化的循环屏障。您可以注册N个参与方,当您准备好时,等待他们在特定阶段的推进。

关于它如何工作的简单示例:

final Phaser phaser = new Phaser();

public Runnable getRunnable(){
    return new Runnable(){
        public void run(){
            ..do stuff...
            phaser.arriveAndDeregister();
        }
    };
}
public void doWork(){
    phaser.register();//register self
    for(int i=0 ; i < N; i++){
        phaser.register(); // register this task prior to execution 
        executor.submit( getRunnable());
    }
    phaser.arriveAndAwaitAdvance();
}

答案 2

您当然可以使用 受 保护的 ,以便您的任务被包装如下:CountDownLatchAtomicReference

public class MyTask extends Runnable {
    private final Runnable r;
    public MyTask(Runnable r, AtomicReference<CountDownLatch> l) { this.r = r; }

    public void run() {
        r.run();
        while (l.get() == null) Thread.sleep(1000L); //handle Interrupted
        l.get().countDown();
    }
}

请注意,任务运行其工作,然后旋转,直到设置倒计时(即知道任务总数)。一旦设置了倒计时,他们就会倒计时并退出。这些提交如下:

AtomicReference<CountDownLatch> l = new AtomicReference<CountDownLatch>();
executor.submit(new MyTask(r, l));

创建/提交作品后,当您知道已创建了多少个任务时

latch.set(new CountDownLatch(nTasks));
latch.get().await();

推荐