JVM中的并发工具类有哪些?如何使用这些工具提高并发性能?
困难JVM并发工具
💬0
🔥0
👍0
详细说明JVM中的并发工具类,包括CountDownLatch、CyclicBarrier、Semaphore、CompletableFuture等
参考答案
JVM提供了丰富的并发工具类,这些工具可以简化并发编程并提高程序性能:
- CountDownLatch
基本用法
public class CountDownLatchExample {
public void processData(List<String> data) throws InterruptedException {
int threadCount = 3;
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
final int index = i;
new Thread(() -> {
try {
processDataPart(data, index, threadCount);
} finally {
latch.countDown();
}
}).start();
}
latch.await(); // 等待所有线程完成
System.out.println("All data processed");
}
private void processDataPart(List<String> data, int start, int step) {
for (int i = start; i < data.size(); i += step) {
// 处理数据
}
}
}
特点
- 一次性使用
- 等待多个线程完成
- 不可重置
- CyclicBarrier
基本用法
public class CyclicBarrierExample {
public void processCyclicData(List<String> data) {
int threadCount = 3;
CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
System.out.println("All threads reached barrier");
});
for (int i = 0; i < threadCount; i++) {
final int index = i;
new Thread(() -> {
try {
for (int round = 0; round < 3; round++) {
processDataRound(data, index, threadCount, round);
barrier.await(); // 等待其他线程
}
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
private void processDataRound(List<String> data, int start, int step, int round) {
// 处理一轮数据
}
}
特点
- 可重复使用
- 支持屏障动作
- 可以重置
- Semaphore
基本用法
public class SemaphoreExample {
private final Semaphore semaphore = new Semaphore(5); // 最多5个线程同时执行
public void processWithLimit() throws InterruptedException {
semaphore.acquire(); // 获取许可
try {
// 执行任务
processTask();
} finally {
semaphore.release(); // 释放许可
}
}
private void processTask() {
// 具体任务逻辑
}
}
高级用法
public class AdvancedSemaphoreExample {
private final Semaphore semaphore = new Semaphore(5, true); // 公平信号量
public void processWithTimeout() throws InterruptedException {
if (semaphore.tryAcquire(1, TimeUnit.SECONDS)) { // 尝试获取许可,超时1秒
try {
processTask();
} finally {
semaphore.release();
}
} else {
System.out.println("Failed to acquire semaphore");
}
}
}
- CompletableFuture
基本用法
public class CompletableFutureExample {
public CompletableFuture<String> processAsync() {
return CompletableFuture.supplyAsync(() -> {
// 异步执行任务
return "Task completed";
});
}
public void processWithChain() {
CompletableFuture.supplyAsync(() -> "Hello")
.thenApply(s -> s + " World")
.thenAccept(System.out::println)
.thenRun(() -> System.out.println("Done"));
}
}
组合多个Future
public class CompletableFutureCombinationExample {
public CompletableFuture<String> combineResults() {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Result1");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Result2");
return future1.thenCombine(future2, (r1, r2) -> r1 + " + " + r2);
}
public CompletableFuture<Void> waitForAll() {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Task1");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Task2");
return CompletableFuture.allOf(future1, future2);
}
}
- Phaser
基本用法
public class PhaserExample {
public void processWithPhases() {
Phaser phaser = new Phaser(3); // 3个线程
for (int i = 0; i < 3; i++) {
final int threadId = i;
new Thread(() -> {
try {
// 阶段1
System.out.println("Thread " + threadId + " completed phase 1");
phaser.arriveAndAwaitAdvance();
// 阶段2
System.out.println("Thread " + threadId + " completed phase 2");
phaser.arriveAndAwaitAdvance();
// 阶段3
System.out.println("Thread " + threadId + " completed phase 3");
phaser.arriveAndDeregister();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}
特点
- 支持多阶段同步
- 可以动态调整参与者数量
- 功能最强大但使用复杂
- Exchanger
基本用法
public class ExchangerExample {
public void exchangeData() {
Exchanger<String> exchanger = new Exchanger<>();
new Thread(() -> {
try {
String data = "Data from Thread 1";
String exchanged = exchanger.exchange(data);
System.out.println("Thread 1 received: " + exchanged);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
String data = "Data from Thread 2";
String exchanged = exchanger.exchange(data);
System.out.println("Thread 2 received: " + exchanged);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
特点
- 两个线程交换数据
- 同步点
- 适合生产者-消费者模式
- 工具类选择策略
等待多个线程完成
- 一次性:CountDownLatch
- 可重复:CyclicBarrier
限制并发数量
- 简单限制:Semaphore
- 复杂控制:Phaser
异步任务处理
- 简单异步:CompletableFuture
- 复杂流程:自定义Future
数据交换
- 两个线程:Exchanger
- 多个线程:BlockingQueue
评论区 (0)
暂无评论,来发表第一条评论吧!