Java 21 虚拟线程:重新定义Java并发编程
Java 21作为LTS版本,带来了许多激动人心的新特性,其中最引人注目的当属虚拟线程(Virtual Threads)。这一特性不仅改变了Java的并发编程模式,也让Java在高并发场景下的表现更加出色。本文将深入探讨虚拟线程的原理、与Go协程的对比,以及在实际应用中的使用方法。
虚拟线程的诞生背景
传统线程模型的局限性
在Java中,传统的线程(Platform Thread)与操作系统线程一一对应,这种模型存在以下问题:
- 内存开销大:每个线程需要约2MB的栈空间
- 创建成本高:线程的创建和销毁需要系统调用
- 数量限制:受限于操作系统的线程数量限制
- 上下文切换开销:频繁的线程切换影响性能
// 传统线程创建方式的局限性演示
public class TraditionalThreadLimitation {
public static void main(String[] args) {
int threadCount = 0;
try {
while (true) {
Thread thread = new Thread(() -> {
try {
Thread.sleep(Long.MAX_VALUE);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
thread.start();
threadCount++;
if (threadCount % 100 == 0) {
System.out.println("Created threads: " + threadCount);
}
}
} catch (OutOfMemoryError e) {
System.out.println("Maximum threads created: " + threadCount);
// 通常在几千个线程后就会出现OOM
}
}
}
虚拟线程的原理解析
核心设计理念
虚拟线程采用了M:N的线程模型,即M个虚拟线程映射到N个平台线程上,其中M >> N。这种设计的核心思想是:
- 轻量级:虚拟线程的内存占用极小(约几KB)
- 用户态调度:由JVM而非操作系统负责调度
- 协作式调度:在阻塞时自动让出执行权
实现机制
// 虚拟线程的底层实现原理(简化版)
public class VirtualThreadPrinciple {
// 载体线程池 - 真正执行任务的平台线程
private static final ForkJoinPool CARRIER_THREAD_POOL =
ForkJoinPool.commonPool();
// 虚拟线程的状态
enum VirtualThreadState {
NEW, RUNNABLE, BLOCKED, WAITING, TERMINATED
}
static class VirtualThread {
private VirtualThreadState state = VirtualThreadState.NEW;
private Runnable task;
private Object parkReason; // 阻塞原因
// 当前执行的载体线程
private Thread carrierThread;
public void start() {
state = VirtualThreadState.RUNNABLE;
// 提交到载体线程池执行
CARRIER_THREAD_POOL.submit(this::run);
}
private void run() {
carrierThread = Thread.currentThread();
try {
state = VirtualThreadState.RUNNABLE;
task.run();
} finally {
state = VirtualThreadState.TERMINATED;
carrierThread = null;
}
}
// 虚拟线程阻塞时的处理
public void park(Object reason) {
this.parkReason = reason;
this.state = VirtualThreadState.BLOCKED;
// 释放载体线程,让其他虚拟线程使用
yieldCarrierThread();
}
private void yieldCarrierThread() {
// 实际实现中会使用Continuation机制
// 保存当前执行状态并让出载体线程
}
}
}
Continuation机制
虚拟线程的核心是Continuation(延续)机制,它允许暂停和恢复线程的执行:
// Continuation机制示例(概念性代码)
import jdk.internal.vm.Continuation;
import jdk.internal.vm.ContinuationScope;
public class ContinuationExample {
private static final ContinuationScope SCOPE = new ContinuationScope("VirtualThread");
public static void demonstrateContinuation() {
Continuation continuation = new Continuation(SCOPE, () -> {
System.out.println("开始执行任务");
// 模拟阻塞操作
System.out.println("遇到阻塞,暂停执行");
Continuation.yield(SCOPE); // 暂停执行
System.out.println("恢复执行");
System.out.println("任务完成");
});
// 第一次运行
continuation.run();
System.out.println("Continuation已暂停");
// 恢复执行
continuation.run();
System.out.println("Continuation已完成");
}
}
虚拟线程的使用方法
基础用法
import java.time.Duration;
import java.util.concurrent.Executors;
public class VirtualThreadBasics {
public static void main(String[] args) throws InterruptedException {
// 方法1:直接创建虚拟线程
Thread vThread1 = Thread.ofVirtual().start(() -> {
System.out.println("虚拟线程1正在运行: " + Thread.currentThread());
});
// 方法2:使用构建器创建
Thread vThread2 = Thread.ofVirtual()
.name("my-virtual-thread")
.start(() -> {
System.out.println("命名虚拟线程: " + Thread.currentThread());
});
// 方法3:使用虚拟线程执行器
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
executor.submit(() -> {
System.out.println("执行器中的虚拟线程: " + Thread.currentThread());
return "任务完成";
});
}
vThread1.join();
vThread2.join();
}
}
高并发场景示例
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;
public class HighConcurrencyExample {
private static final HttpClient HTTP_CLIENT = HttpClient.newHttpClient();
public static void main(String[] args) {
comparePerformance();
}
// 使用虚拟线程进行大量HTTP请求
public static void virtualThreadHttpRequests() {
Instant start = Instant.now();
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
// 创建10000个HTTP请求任务
var futures = IntStream.range(0, 10_000)
.mapToObj(i -> executor.submit(() -> makeHttpRequest(i)))
.toList();
// 等待所有请求完成
futures.forEach(future -> {
try {
future.get();
} catch (Exception e) {
e.printStackTrace();
}
});
}
Duration duration = Duration.between(start, Instant.now());
System.out.println("虚拟线程完成10000个请求耗时: " + duration.toMillis() + "ms");
}
// 使用传统线程池进行对比
public static void platformThreadHttpRequests() {
Instant start = Instant.now();
try (var executor = Executors.newFixedThreadPool(200)) {
var futures = IntStream.range(0, 10_000)
.mapToObj(i -> executor.submit(() -> makeHttpRequest(i)))
.toList();
futures.forEach(future -> {
try {
future.get();
} catch (Exception e) {
e.printStackTrace();
}
});
}
Duration duration = Duration.between(start, Instant.now());
System.out.println("平台线程完成10000个请求耗时: " + duration.toMillis() + "ms");
}
private static String makeHttpRequest(int id) {
try {
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create("https://httpbin.org/delay/1"))
.timeout(Duration.ofSeconds(10))
.build();
HttpResponse<String> response = HTTP_CLIENT.send(request,
HttpResponse.BodyHandlers.ofString());
return "Request " + id + " completed with status: " + response.statusCode();
} catch (Exception e) {
return "Request " + id + " failed: " + e.getMessage();
}
}
public static void comparePerformance() {
System.out.println("开始性能对比测试...");
// 虚拟线程测试
virtualThreadHttpRequests();
// 平台线程测试
platformThreadHttpRequests();
}
}
数据库连接池优化
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;
public class DatabaseVirtualThreadExample {
private static final String DB_URL = "jdbc:h2:mem:testdb";
private static final String USER = "sa";
private static final String PASS = "";
public static void main(String[] args) {
initDatabase();
demonstrateVirtualThreadsWithDatabase();
}
private static void initDatabase() {
try (Connection conn = DriverManager.getConnection(DB_URL, USER, PASS);
Statement stmt = conn.createStatement()) {
stmt.execute("CREATE TABLE users (id INT PRIMARY KEY, name VARCHAR(50))");
// 插入测试数据
for (int i = 1; i <= 1000; i++) {
stmt.execute("INSERT INTO users VALUES (" + i + ", 'User" + i + "')");
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static void demonstrateVirtualThreadsWithDatabase() {
// 使用虚拟线程处理大量数据库查询
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
var futures = IntStream.range(1, 1001)
.mapToObj(userId -> executor.submit(() -> queryUser(userId)))
.toList();
System.out.println("提交了1000个数据库查询任务");
// 收集结果
long successCount = futures.stream()
.mapToLong(future -> {
try {
return future.get() != null ? 1 : 0;
} catch (Exception e) {
return 0;
}
})
.sum();
System.out.println("成功查询用户数量: " + successCount);
}
}
private static User queryUser(int userId) {
try (Connection conn = DriverManager.getConnection(DB_URL, USER, PASS);
Statement stmt = conn.createStatement()) {
// 模拟慢查询
Thread.sleep(100);
ResultSet rs = stmt.executeQuery("SELECT * FROM users WHERE id = " + userId);
if (rs.next()) {
return new User(rs.getInt("id"), rs.getString("name"));
}
} catch (Exception e) {
System.err.println("查询用户失败: " + userId + ", " + e.getMessage());
}
return null;
}
record User(int id, String name) {}
}
Java虚拟线程 vs Go协程对比
设计哲学对比
| 特性 | Java虚拟线程 | Go协程(Goroutine) |
|---|---|---|
| 设计目标 | 兼容现有Java代码 | 从设计之初就支持协程 |
| 语法支持 | 使用现有Thread API | 原生语法支持(go关键字) |
| 调度器 | JVM调度器 | Go运行时调度器 |
| 内存开销 | ~几KB | ~2KB |
| 启动方式 | Thread.ofVirtual().start() | go func() |
性能对比示例
Java虚拟线程版本:
public class JavaVirtualThreadPerformance {
public static void main(String[] args) throws InterruptedException {
long startTime = System.currentTimeMillis();
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
var futures = IntStream.range(0, 1_000_000)
.mapToObj(i -> executor.submit(() -> {
// 模拟计算密集型任务
int sum = 0;
for (int j = 0; j < 1000; j++) {
sum += j;
}
return sum;
}))
.toList();
// 等待所有任务完成
futures.forEach(future -> {
try {
future.get();
} catch (Exception e) {
e.printStackTrace();
}
});
}
long endTime = System.currentTimeMillis();
System.out.println("Java虚拟线程执行100万个任务耗时: " + (endTime - startTime) + "ms");
}
}
Go协程对比版本:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
startTime := time.Now()
var wg sync.WaitGroup
const numTasks = 1_000_000
for i := 0; i < numTasks; i++ {
wg.Add(1)
go func() {
defer wg.Done()
// 模拟计算密集型任务
sum := 0
for j := 0; j < 1000; j++ {
sum += j
}
}()
}
wg.Wait()
duration := time.Since(startTime)
fmt.Printf("Go协程执行100万个任务耗时: %v\n", duration)
}
详细特性对比
内存模型:
// Java虚拟线程 - 栈的动态扩展
public class StackGrowthExample {
public static void demonstrateStackGrowth() {
Thread.ofVirtual().start(() -> {
recursiveCall(0);
}).join();
}
private static void recursiveCall(int depth) {
if (depth < 10000) {
byte[] localArray = new byte[1024]; // 1KB局部变量
recursiveCall(depth + 1);
}
System.out.println("递归深度: " + depth);
}
}
通信机制对比:
// Java - 使用阻塞队列进行通信
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class JavaCommunication {
public static void demonstrateChannelLikeCommunication() {
BlockingQueue<String> channel = new LinkedBlockingQueue<>();
// 生产者虚拟线程
Thread.ofVirtual().start(() -> {
for (int i = 0; i < 10; i++) {
try {
channel.put("Message " + i);
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
// 消费者虚拟线程
Thread.ofVirtual().start(() -> {
try {
while (true) {
String message = channel.take();
System.out.println("收到: " + message);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
}
实际应用场景
1. Web服务器优化
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
public class VirtualThreadWebServer {
public static void main(String[] args) throws IOException {
// 创建HTTP服务器
HttpServer server = HttpServer.create(new InetSocketAddress(8080), 0);
// 使用虚拟线程执行器处理请求
server.setExecutor(Executors.newVirtualThreadPerTaskExecutor());
// 设置请求处理器
server.createContext("/api/slow", new SlowApiHandler());
server.createContext("/api/fast", new FastApiHandler());
server.start();
System.out.println("服务器启动在端口8080,使用虚拟线程处理请求");
}
static class SlowApiHandler implements HttpHandler {
@Override
public void handle(HttpExchange exchange) throws IOException {
try {
// 模拟慢速API调用(如数据库查询、外部服务调用)
Thread.sleep(2000);
String response = "慢速API响应 - 线程: " + Thread.currentThread();
exchange.sendResponseHeaders(200, response.length());
try (OutputStream os = exchange.getResponseBody()) {
os.write(response.getBytes());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
exchange.sendResponseHeaders(500, 0);
}
}
}
static class FastApiHandler implements HttpHandler {
@Override
public void handle(HttpExchange exchange) throws IOException {
String response = "快速API响应 - 线程: " + Thread.currentThread();
exchange.sendResponseHeaders(200, response.length());
try (OutputStream os = exchange.getResponseBody()) {
os.write(response.getBytes());
}
}
}
}
2. 批量数据处理
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;
public class BatchProcessingWithVirtualThreads {
public static void main(String[] args) {
processBatchData();
}
public static void processBatchData() {
List<DataItem> dataItems = generateTestData(10000);
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
long startTime = System.currentTimeMillis();
// 使用虚拟线程并行处理数据
List<CompletableFuture<ProcessedData>> futures = dataItems.stream()
.map(item -> CompletableFuture.supplyAsync(() -> processItem(item), executor))
.toList();
// 等待所有任务完成并收集结果
List<ProcessedData> results = futures.stream()
.map(CompletableFuture::join)
.toList();
long endTime = System.currentTimeMillis();
System.out.println("处理了 " + results.size() + " 个数据项");
System.out.println("总耗时: " + (endTime - startTime) + "ms");
System.out.println("平均每项耗时: " + (endTime - startTime) / (double) results.size() + "ms");
}
}
private static List<DataItem> generateTestData(int count) {
return IntStream.range(0, count)
.mapToObj(i -> new DataItem(i, "Data-" + i, Math.random() * 100))
.toList();
}
private static ProcessedData processItem(DataItem item) {
try {
// 模拟复杂的数据处理(如调用外部API、数据库查询等)
Thread.sleep((long) (Math.random() * 100 + 50)); // 50-150ms的随机延迟
// 执行数据转换
double processedValue = item.value() * 1.1 + Math.sin(item.id());
String processedName = item.name().toUpperCase();
return new ProcessedData(item.id(), processedName, processedValue,
Thread.currentThread().toString());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("处理被中断", e);
}
}
record DataItem(int id, String name, double value) {}
record ProcessedData(int id, String name, double value, String processingThread) {}
}
3. 微服务间通信优化
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
public class MicroserviceClient {
private final HttpClient httpClient;
private final List<String> serviceUrls;
public MicroserviceClient() {
this.httpClient = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(5))
.build();
this.serviceUrls = List.of(
"http://user-service:8081",
"http://order-service:8082",
"http://inventory-service:8083",
"http://payment-service:8084"
);
}
// 使用虚拟线程并行调用多个微服务
public CompletableFuture<AggregatedResponse> aggregateServiceCalls(String userId) {
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
// 并行调用所有相关服务
CompletableFuture<UserInfo> userFuture = CompletableFuture
.supplyAsync(() -> getUserInfo(userId), executor);
CompletableFuture<List<Order>> ordersFuture = CompletableFuture
.supplyAsync(() -> getUserOrders(userId), executor);
CompletableFuture<InventoryStatus> inventoryFuture = CompletableFuture
.supplyAsync(() -> getInventoryStatus(userId), executor);
CompletableFuture<PaymentInfo> paymentFuture = CompletableFuture
.supplyAsync(() -> getPaymentInfo(userId), executor);
// 组合所有结果
return CompletableFuture.allOf(userFuture, ordersFuture, inventoryFuture, paymentFuture)
.thenApply(v -> new AggregatedResponse(
userFuture.join(),
ordersFuture.join(),
inventoryFuture.join(),
paymentFuture.join()
));
}
}
private UserInfo getUserInfo(String userId) {
return callService("/users/" + userId, UserInfo.class);
}
private List<Order> getUserOrders(String userId) {
return callService("/orders/user/" + userId, List.class);
}
private InventoryStatus getInventoryStatus(String userId) {
return callService("/inventory/user/" + userId, InventoryStatus.class);
}
private PaymentInfo getPaymentInfo(String userId) {
return callService("/payment/user/" + userId, PaymentInfo.class);
}
private <T> T callService(String endpoint, Class<T> responseType) {
try {
// 这里简化了服务发现逻辑
String serviceUrl = serviceUrls.get(0) + endpoint;
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(serviceUrl))
.timeout(Duration.ofSeconds(10))
.build();
HttpResponse<String> response = httpClient.send(request,
HttpResponse.BodyHandlers.ofString());
// 这里应该使用JSON库进行反序列化
// 为了示例简化,直接返回模拟对象
return createMockResponse(responseType);
} catch (Exception e) {
throw new RuntimeException("服务调用失败: " + endpoint, e);
}
}
@SuppressWarnings("unchecked")
private <T> T createMockResponse(Class<T> responseType) {
// 模拟响应对象创建
if (responseType == UserInfo.class) {
return (T) new UserInfo("用户123", "user@example.com");
} else if (responseType == List.class) {
return (T) List.of(new Order("订单1", 100.0), new Order("订单2", 200.0));
} else if (responseType == InventoryStatus.class) {
return (T) new InventoryStatus(50, "充足");
} else if (responseType == PaymentInfo.class) {
return (T) new PaymentInfo("信用卡", "有效");
}
return null;
}
// 数据传输对象
record UserInfo(String name, String email) {}
record Order(String id, double amount) {}
record InventoryStatus(int quantity, String status) {}
record PaymentInfo(String method, String status) {}
record AggregatedResponse(UserInfo user, List<Order> orders,
InventoryStatus inventory, PaymentInfo payment) {}
}
最佳实践和注意事项
1. 适用场景
适合使用虚拟线程的场景:
public class VirtualThreadBestPractices {
// ✅ IO密集型任务
public void ioIntensiveTask() {
Thread.ofVirtual().start(() -> {
try {
// 网络请求
// 数据库查询
// 文件操作
Thread.sleep(1000); // 模拟IO等待
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// ❌ CPU密集型任务不适合
public void cpuIntensiveTask() {
// 不推荐:CPU密集型任务会占用载体线程
Thread.ofVirtual().start(() -> {
for (long i = 0; i < 1_000_000_000L; i++) {
// 密集计算
}
});
}
}
2. 避免的陷阱
public class VirtualThreadPitfalls {
// ❌ 避免使用synchronized
private final Object lock = new Object();
public void badSynchronizedUsage() {
Thread.ofVirtual().start(() -> {
synchronized (lock) { // 会导致虚拟线程固定到载体线程
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
}
// ✅ 使用ReentrantLock替代
private final ReentrantLock reentrantLock = new ReentrantLock();
public void goodLockUsage() {
Thread.ofVirtual().start(() -> {
reentrantLock.lock();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
reentrantLock.unlock();
}
});
}
}
3. 监控和调试
import jdk.jfr.Configuration;
import jdk.jfr.Recording;
public class VirtualThreadMonitoring {
public static void enableVirtualThreadMonitoring() {
// 启用JFR记录虚拟线程事件
try {
Configuration config = Configuration.getConfiguration("profile");
Recording recording = new Recording(config);
recording.setName("VirtualThreadRecording");
recording.start();
// 执行业务逻辑
runBusinessLogic();
recording.stop();
recording.dump(Paths.get("virtual-threads.jfr"));
} catch (Exception e) {
e.printStackTrace();
}
}
private static void runBusinessLogic() {
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
var futures = IntStream.range(0, 1000)
.mapToObj(i -> executor.submit(() -> {
try {
Thread.sleep(100);
return "Task " + i + " completed";
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return "Task " + i + " interrupted";
}
}))
.toList();
futures.forEach(future -> {
try {
future.get();
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
// 虚拟线程状态监控
public static void monitorVirtualThreads() {
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
Thread.ofVirtual().name("监控线程").start(() -> {
while (true) {
try {
long threadCount = threadBean.getThreadCount();
long peakThreadCount = threadBean.getPeakThreadCount();
System.out.printf("当前线程数: %d, 峰值线程数: %d%n",
threadCount, peakThreadCount);
Thread.sleep(5000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
}
}
性能测试和基准测试
吞吐量对比测试
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class VirtualThreadBenchmark {
public static void main(String[] args) throws InterruptedException {
System.out.println("开始虚拟线程性能测试...");
// 测试不同任务数量下的性能
int[] taskCounts = {1_000, 10_000, 100_000, 1_000_000};
for (int taskCount : taskCounts) {
System.out.println("\n=== 测试 " + taskCount + " 个任务 ===");
benchmarkVirtualThreads(taskCount);
benchmarkPlatformThreads(taskCount);
}
}
private static void benchmarkVirtualThreads(int taskCount) throws InterruptedException {
long startTime = System.nanoTime();
CountDownLatch latch = new CountDownLatch(taskCount);
AtomicLong completedTasks = new AtomicLong(0);
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
for (int i = 0; i < taskCount; i++) {
executor.submit(() -> {
try {
// 模拟IO操作
Thread.sleep(10);
completedTasks.incrementAndGet();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown();
}
});
}
latch.await();
}
long endTime = System.nanoTime();
long duration = TimeUnit.NANOSECONDS.toMillis(endTime - startTime);
System.out.printf("虚拟线程: %d个任务, 耗时: %dms, 完成: %d, 吞吐量: %.2f任务/秒%n",
taskCount, duration, completedTasks.get(),
(completedTasks.get() * 1000.0) / duration);
}
private static void benchmarkPlatformThreads(int taskCount) throws InterruptedException {
long startTime = System.nanoTime();
CountDownLatch latch = new CountDownLatch(taskCount);
AtomicLong completedTasks = new AtomicLong(0);
// 限制平台线程数量,避免资源耗尽
int threadPoolSize = Math.min(taskCount, 200);
try (var executor = Executors.newFixedThreadPool(threadPoolSize)) {
for (int i = 0; i < taskCount; i++) {
executor.submit(() -> {
try {
Thread.sleep(10);
completedTasks.incrementAndGet();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown();
}
});
}
latch.await();
}
long endTime = System.nanoTime();
long duration = TimeUnit.NANOSECONDS.toMillis(endTime - startTime);
System.out.printf("平台线程: %d个任务, 耗时: %dms, 完成: %d, 吞吐量: %.2f任务/秒%n",
taskCount, duration, completedTasks.get(),
(completedTasks.get() * 1000.0) / duration);
}
}
内存使用对比
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
public class MemoryUsageBenchmark {
public static void main(String[] args) throws InterruptedException {
MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();
System.out.println("内存使用对比测试");
System.out.println("==================");
// 测试虚拟线程内存使用
testVirtualThreadMemoryUsage(memoryBean);
System.gc();
Thread.sleep(2000);
// 测试平台线程内存使用
testPlatformThreadMemoryUsage(memoryBean);
}
private static void testVirtualThreadMemoryUsage(MemoryMXBean memoryBean)
throws InterruptedException {
System.out.println("\n--- 虚拟线程内存测试 ---");
MemoryUsage beforeHeap = memoryBean.getHeapMemoryUsage();
System.out.printf("测试前堆内存: %d MB%n", beforeHeap.getUsed() / 1024 / 1024);
List<Thread> threads = new ArrayList<>();
// 创建大量虚拟线程
for (int i = 0; i < 10_000; i++) {
Thread vThread = Thread.ofVirtual().start(() -> {
try {
Thread.sleep(60_000); // 保持活跃状态
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
threads.add(vThread);
}
Thread.sleep(1000); // 等待线程启动
MemoryUsage afterHeap = memoryBean.getHeapMemoryUsage();
System.out.printf("创建10000个虚拟线程后堆内存: %d MB%n",
afterHeap.getUsed() / 1024 / 1024);
System.out.printf("虚拟线程内存增长: %d MB%n",
(afterHeap.getUsed() - beforeHeap.getUsed()) / 1024 / 1024);
// 中断所有线程
threads.forEach(Thread::interrupt);
}
private static void testPlatformThreadMemoryUsage(MemoryMXBean memoryBean)
throws InterruptedException {
System.out.println("\n--- 平台线程内存测试 ---");
MemoryUsage beforeHeap = memoryBean.getHeapMemoryUsage();
System.out.printf("测试前堆内存: %d MB%n", beforeHeap.getUsed() / 1024 / 1024);
List<Thread> threads = new ArrayList<>();
try {
// 尝试创建平台线程(会因为内存限制而失败)
for (int i = 0; i < 1_000; i++) { // 只创建1000个,避免OOM
Thread pThread = new Thread(() -> {
try {
Thread.sleep(60_000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
pThread.start();
threads.add(pThread);
}
Thread.sleep(1000);
MemoryUsage afterHeap = memoryBean.getHeapMemoryUsage();
System.out.printf("创建1000个平台线程后堆内存: %d MB%n",
afterHeap.getUsed() / 1024 / 1024);
System.out.printf("平台线程内存增长: %d MB%n",
(afterHeap.getUsed() - beforeHeap.getUsed()) / 1024 / 1024);
} catch (OutOfMemoryError e) {
System.out.println("平台线程创建失败:内存不足");
} finally {
threads.forEach(Thread::interrupt);
}
}
}
实际项目迁移指南
Spring Boot集成示例
// Spring Boot配置类
@Configuration
@EnableAsync
public class VirtualThreadConfig {
@Bean("virtualThreadExecutor")
public Executor virtualThreadExecutor() {
return Executors.newVirtualThreadPerTaskExecutor();
}
@Bean
public AsyncConfigurer asyncConfigurer() {
return new AsyncConfigurer() {
@Override
public Executor getAsyncExecutor() {
return virtualThreadExecutor();
}
};
}
}
// 使用虚拟线程的服务类
@Service
public class UserService {
@Async("virtualThreadExecutor")
public CompletableFuture<User> findUserAsync(Long userId) {
// 模拟数据库查询
try {
Thread.sleep(100);
return CompletableFuture.completedFuture(
new User(userId, "User" + userId, "user" + userId + "@example.com")
);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return CompletableFuture.failedFuture(e);
}
}
@Async("virtualThreadExecutor")
public CompletableFuture<Void> sendNotificationAsync(User user) {
// 模拟发送通知
try {
Thread.sleep(200);
System.out.println("通知已发送给用户: " + user.name());
return CompletableFuture.completedFuture(null);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return CompletableFuture.failedFuture(e);
}
}
record User(Long id, String name, String email) {}
}
结论与展望
Java 21的虚拟线程代表了Java并发编程的重大进步,它解决了传统线程模型的诸多限制,为高并发应用开发提供了新的可能性。
主要优势总结:
- 轻量级:内存占用极小,可以创建数百万个虚拟线程
- 高并发:特别适合IO密集型应用,大幅提升吞吐量
- 简单易用:保持现有Thread API,学习成本低
- 向后兼容:现有代码无需修改即可受益
与Go协程的对比:
- Java虚拟线程:更好的生态集成,企业级特性
- Go协程:语言原生支持,语法更简洁
适用场景:
- 推荐使用:Web服务器、微服务、批量数据处理、IO密集型任务
- 谨慎使用:CPU密集型任务、需要synchronized的场景
未来展望:
虚拟线程将成为Java高并发编程的标准选择,随着生态系统的完善,将会有更多框架和库原生支持虚拟线程。对于Java开发者而言,现在正是学习和应用虚拟线程的最佳时机。
通过合理使用虚拟线程,我们可以构建出更高效、更可扩展的Java应用程序,让Java在云原生和高并发领域重新焕发活力。
