JVM中的并发工具类有哪些?如何使用这些工具提高并发性能?

详细说明JVM中的并发工具类,包括CountDownLatch、CyclicBarrier、Semaphore、CompletableFuture等

参考答案

JVM提供了丰富的并发工具类,这些工具可以简化并发编程并提高程序性能:

  1. CountDownLatch

基本用法

public class CountDownLatchExample {
public void processData(List<String> data) throws InterruptedException {
int threadCount = 3;
CountDownLatch latch = new CountDownLatch(threadCount);

for (int i = 0; i < threadCount; i++) {
final int index = i;
new Thread(() -> {
try {
processDataPart(data, index, threadCount);
} finally {
latch.countDown();
}
}).start();
}

latch.await(); // 等待所有线程完成
System.out.println("All data processed");
}

private void processDataPart(List<String> data, int start, int step) {
for (int i = start; i < data.size(); i += step) {
// 处理数据
}
}
}

特点

  • 一次性使用
  • 等待多个线程完成
  • 不可重置
  1. CyclicBarrier

基本用法

public class CyclicBarrierExample {
public void processCyclicData(List<String> data) {
int threadCount = 3;
CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
System.out.println("All threads reached barrier");
});

for (int i = 0; i < threadCount; i++) {
final int index = i;
new Thread(() -> {
try {
for (int round = 0; round < 3; round++) {
processDataRound(data, index, threadCount, round);
barrier.await(); // 等待其他线程
}
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}

private void processDataRound(List<String> data, int start, int step, int round) {
// 处理一轮数据
}
}

特点

  • 可重复使用
  • 支持屏障动作
  • 可以重置
  1. Semaphore

基本用法

public class SemaphoreExample {
private final Semaphore semaphore = new Semaphore(5); // 最多5个线程同时执行

public void processWithLimit() throws InterruptedException {
semaphore.acquire(); // 获取许可
try {
// 执行任务
processTask();
} finally {
semaphore.release(); // 释放许可
}
}

private void processTask() {
// 具体任务逻辑
}
}

高级用法

public class AdvancedSemaphoreExample {
private final Semaphore semaphore = new Semaphore(5, true); // 公平信号量

public void processWithTimeout() throws InterruptedException {
if (semaphore.tryAcquire(1, TimeUnit.SECONDS)) { // 尝试获取许可,超时1秒
try {
processTask();
} finally {
semaphore.release();
}
} else {
System.out.println("Failed to acquire semaphore");
}
}
}
  1. CompletableFuture

基本用法

public class CompletableFutureExample {
public CompletableFuture<String> processAsync() {
return CompletableFuture.supplyAsync(() -> {
// 异步执行任务
return "Task completed";
});
}

public void processWithChain() {
CompletableFuture.supplyAsync(() -> "Hello")
.thenApply(s -> s + " World")
.thenAccept(System.out::println)
.thenRun(() -> System.out.println("Done"));
}
}

组合多个Future

public class CompletableFutureCombinationExample {
public CompletableFuture<String> combineResults() {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Result1");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Result2");

return future1.thenCombine(future2, (r1, r2) -> r1 + " + " + r2);
}

public CompletableFuture<Void> waitForAll() {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Task1");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Task2");

return CompletableFuture.allOf(future1, future2);
}
}
  1. Phaser

基本用法

public class PhaserExample {
public void processWithPhases() {
Phaser phaser = new Phaser(3); // 3个线程

for (int i = 0; i < 3; i++) {
final int threadId = i;
new Thread(() -> {
try {
// 阶段1
System.out.println("Thread " + threadId + " completed phase 1");
phaser.arriveAndAwaitAdvance();

// 阶段2
System.out.println("Thread " + threadId + " completed phase 2");
phaser.arriveAndAwaitAdvance();

// 阶段3
System.out.println("Thread " + threadId + " completed phase 3");
phaser.arriveAndDeregister();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}

特点

  • 支持多阶段同步
  • 可以动态调整参与者数量
  • 功能最强大但使用复杂
  1. Exchanger

基本用法

public class ExchangerExample {
public void exchangeData() {
Exchanger<String> exchanger = new Exchanger<>();

new Thread(() -> {
try {
String data = "Data from Thread 1";
String exchanged = exchanger.exchange(data);
System.out.println("Thread 1 received: " + exchanged);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();

new Thread(() -> {
try {
String data = "Data from Thread 2";
String exchanged = exchanger.exchange(data);
System.out.println("Thread 2 received: " + exchanged);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}

特点

  • 两个线程交换数据
  • 同步点
  • 适合生产者-消费者模式
  1. 工具类选择策略

等待多个线程完成

  • 一次性:CountDownLatch
  • 可重复:CyclicBarrier

限制并发数量

  • 简单限制:Semaphore
  • 复杂控制:Phaser

异步任务处理

  • 简单异步:CompletableFuture
  • 复杂流程:自定义Future

数据交换

  • 两个线程:Exchanger
  • 多个线程:BlockingQueue

评论区 (0)

暂无评论,来发表第一条评论吧!