在 Java 中并行执行依赖任务
2022-09-03 06:40:54
我需要找到一种方法在java中并行执行任务(依赖和独立)。
- 任务 A 和任务 C 可以独立运行。
- 任务 B 依赖于任务 A 的输出。
我检查了java.util.concurrent Future和Fork/Join,但看起来我们无法向任务添加依赖项。
任何人都可以指点我纠正Java API。
我需要找到一种方法在java中并行执行任务(依赖和独立)。
我检查了java.util.concurrent Future和Fork/Join,但看起来我们无法向任务添加依赖项。
任何人都可以指点我纠正Java API。
在Scala中,这很容易做到,我认为使用Scala更好。以下是我从这里抽取的一个例子 http://danielwestheide.com/(The Neophyte's Guide to Scala Part 16: Where to Go From Here)这个家伙有一个很棒的博客(我不是那个家伙)
让我们带一个律师煮咖啡。要执行的任务是:
或作为一棵树:
Grind _
Coffe \
\
Heat ___\_Brew____
Water \_____Combine
/
Foam ____________/
Milk
在使用并发 API 的 java 中,这将是:
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class Barrista {
static class HeatWater implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("Heating Water");
Thread.sleep(1000);
return "hot water";
}
}
static class GrindBeans implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("Grinding Beans");
Thread.sleep(2000);
return "grinded beans";
}
}
static class Brew implements Callable<String> {
final Future<String> grindedBeans;
final Future<String> hotWater;
public Brew(Future<String> grindedBeans, Future<String> hotWater) {
this.grindedBeans = grindedBeans;
this.hotWater = hotWater;
}
@Override
public String call() throws Exception
{
System.out.println("brewing coffee with " + grindedBeans.get()
+ " and " + hotWater.get());
Thread.sleep(1000);
return "brewed coffee";
}
}
static class FrothMilk implements Callable<String> {
@Override
public String call() throws Exception {
Thread.sleep(1000);
return "some milk";
}
}
static class Combine implements Callable<String> {
public Combine(Future<String> frothedMilk, Future<String> brewedCoffee) {
super();
this.frothedMilk = frothedMilk;
this.brewedCoffee = brewedCoffee;
}
final Future<String> frothedMilk;
final Future<String> brewedCoffee;
@Override
public String call() throws Exception {
Thread.sleep(1000);
System.out.println("Combining " + frothedMilk.get() + " "
+ brewedCoffee.get());
return "Final Coffee";
}
}
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(2);
FutureTask<String> heatWaterFuture = new FutureTask<String>(new HeatWater());
FutureTask<String> grindBeans = new FutureTask<String>(new GrindBeans());
FutureTask<String> brewCoffee = new FutureTask<String>(new Brew(grindBeans, heatWaterFuture));
FutureTask<String> frothMilk = new FutureTask<String>(new FrothMilk());
FutureTask<String> combineCoffee = new FutureTask<String>(new Combine(frothMilk, brewCoffee));
executor.execute(heatWaterFuture);
executor.execute(grindBeans);
executor.execute(brewCoffee);
executor.execute(frothMilk);
executor.execute(combineCoffee);
try {
/**
* Warning this code is blocking !!!!!!!
*/
System.out.println(combineCoffee.get(20, TimeUnit.SECONDS));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
System.out.println("20 SECONDS FOR A COFFEE !!!! I am !@#! leaving!!");
e.printStackTrace();
} finally{
executor.shutdown();
}
}
}
确保你添加了超时,以确保你的代码不会永远等待某些事情完成,这是通过使用Future.get(long,TimeUnit)完成的,然后相应地处理失败。
然而,在scala中它要好得多,在这里它就像在博客上一样:准备一些咖啡的代码看起来像这样:
def prepareCappuccino(): Try[Cappuccino] = for {
ground <- Try(grind("arabica beans"))
water <- Try(heatWater(Water(25)))
espresso <- Try(brew(ground, water))
foam <- Try(frothMilk("milk"))
} yield combine(espresso, foam)
其中,所有方法都返回一个 future(类型化 future),例如 grind 将如下所示:
def grind(beans: CoffeeBeans): Future[GroundCoffee] = Future {
// grinding function contents
}
对于所有实现,请查看博客,但仅此而已。你也可以很容易地集成Scala和Java。我真的建议在Scala而不是Java中做这种事情。Scala需要更少的代码,更干净和事件驱动。
具有依赖关系的任务的常规编程模型是数据流。简化的模型,其中每个任务只有一个,尽管重复,依赖关系是Actor模型。Java有很多执行组件库,但数据流很少。另请参阅:哪个-actor-model-library-framework-for-java,java-pattern-for-nested-callbacks