Java 21 虚拟线程:重新定义Java并发编程

Java 21作为LTS版本,带来了许多激动人心的新特性,其中最引人注目的当属虚拟线程(Virtual Threads)。这一特性不仅改变了Java的并发编程模式,也让Java在高并发场景下的表现更加出色。本文将深入探讨虚拟线程的原理、与Go协程的对比,以及在实际应用中的使用方法。

虚拟线程的诞生背景

传统线程模型的局限性

在Java中,传统的线程(Platform Thread)与操作系统线程一一对应,这种模型存在以下问题:

  1. 内存开销大:每个线程需要约2MB的栈空间
  2. 创建成本高:线程的创建和销毁需要系统调用
  3. 数量限制:受限于操作系统的线程数量限制
  4. 上下文切换开销:频繁的线程切换影响性能
// 传统线程创建方式的局限性演示
public class TraditionalThreadLimitation {
    public static void main(String[] args) {
        int threadCount = 0;
        try {
            while (true) {
                Thread thread = new Thread(() -> {
                    try {
                        Thread.sleep(Long.MAX_VALUE);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                thread.start();
                threadCount++;
                if (threadCount % 100 == 0) {
                    System.out.println("Created threads: " + threadCount);
                }
            }
        } catch (OutOfMemoryError e) {
            System.out.println("Maximum threads created: " + threadCount);
            // 通常在几千个线程后就会出现OOM
        }
    }
}

虚拟线程的原理解析

核心设计理念

虚拟线程采用了M:N的线程模型,即M个虚拟线程映射到N个平台线程上,其中M >> N。这种设计的核心思想是:

  1. 轻量级:虚拟线程的内存占用极小(约几KB)
  2. 用户态调度:由JVM而非操作系统负责调度
  3. 协作式调度:在阻塞时自动让出执行权

实现机制

// 虚拟线程的底层实现原理(简化版)
public class VirtualThreadPrinciple {
    
    // 载体线程池 - 真正执行任务的平台线程
    private static final ForkJoinPool CARRIER_THREAD_POOL = 
        ForkJoinPool.commonPool();
    
    // 虚拟线程的状态
    enum VirtualThreadState {
        NEW, RUNNABLE, BLOCKED, WAITING, TERMINATED
    }
    
    static class VirtualThread {
        private VirtualThreadState state = VirtualThreadState.NEW;
        private Runnable task;
        private Object parkReason; // 阻塞原因
        
        // 当前执行的载体线程
        private Thread carrierThread;
        
        public void start() {
            state = VirtualThreadState.RUNNABLE;
            // 提交到载体线程池执行
            CARRIER_THREAD_POOL.submit(this::run);
        }
        
        private void run() {
            carrierThread = Thread.currentThread();
            try {
                state = VirtualThreadState.RUNNABLE;
                task.run();
            } finally {
                state = VirtualThreadState.TERMINATED;
                carrierThread = null;
            }
        }
        
        // 虚拟线程阻塞时的处理
        public void park(Object reason) {
            this.parkReason = reason;
            this.state = VirtualThreadState.BLOCKED;
            // 释放载体线程,让其他虚拟线程使用
            yieldCarrierThread();
        }
        
        private void yieldCarrierThread() {
            // 实际实现中会使用Continuation机制
            // 保存当前执行状态并让出载体线程
        }
    }
}

Continuation机制

虚拟线程的核心是Continuation(延续)机制,它允许暂停和恢复线程的执行:

// Continuation机制示例(概念性代码)
import jdk.internal.vm.Continuation;
import jdk.internal.vm.ContinuationScope;

public class ContinuationExample {
    
    private static final ContinuationScope SCOPE = new ContinuationScope("VirtualThread");
    
    public static void demonstrateContinuation() {
        Continuation continuation = new Continuation(SCOPE, () -> {
            System.out.println("开始执行任务");
            
            // 模拟阻塞操作
            System.out.println("遇到阻塞,暂停执行");
            Continuation.yield(SCOPE); // 暂停执行
            
            System.out.println("恢复执行");
            System.out.println("任务完成");
        });
        
        // 第一次运行
        continuation.run();
        System.out.println("Continuation已暂停");
        
        // 恢复执行
        continuation.run();
        System.out.println("Continuation已完成");
    }
}

虚拟线程的使用方法

基础用法

import java.time.Duration;
import java.util.concurrent.Executors;

public class VirtualThreadBasics {
    
    public static void main(String[] args) throws InterruptedException {
        
        // 方法1:直接创建虚拟线程
        Thread vThread1 = Thread.ofVirtual().start(() -> {
            System.out.println("虚拟线程1正在运行: " + Thread.currentThread());
        });
        
        // 方法2:使用构建器创建
        Thread vThread2 = Thread.ofVirtual()
                .name("my-virtual-thread")
                .start(() -> {
                    System.out.println("命名虚拟线程: " + Thread.currentThread());
                });
        
        // 方法3:使用虚拟线程执行器
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            executor.submit(() -> {
                System.out.println("执行器中的虚拟线程: " + Thread.currentThread());
                return "任务完成";
            });
        }
        
        vThread1.join();
        vThread2.join();
    }
}

高并发场景示例

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

public class HighConcurrencyExample {
    
    private static final HttpClient HTTP_CLIENT = HttpClient.newHttpClient();
    
    public static void main(String[] args) {
        comparePerformance();
    }
    
    // 使用虚拟线程进行大量HTTP请求
    public static void virtualThreadHttpRequests() {
        Instant start = Instant.now();
        
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            // 创建10000个HTTP请求任务
            var futures = IntStream.range(0, 10_000)
                .mapToObj(i -> executor.submit(() -> makeHttpRequest(i)))
                .toList();
            
            // 等待所有请求完成
            futures.forEach(future -> {
                try {
                    future.get();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
        
        Duration duration = Duration.between(start, Instant.now());
        System.out.println("虚拟线程完成10000个请求耗时: " + duration.toMillis() + "ms");
    }
    
    // 使用传统线程池进行对比
    public static void platformThreadHttpRequests() {
        Instant start = Instant.now();
        
        try (var executor = Executors.newFixedThreadPool(200)) {
            var futures = IntStream.range(0, 10_000)
                .mapToObj(i -> executor.submit(() -> makeHttpRequest(i)))
                .toList();
            
            futures.forEach(future -> {
                try {
                    future.get();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
        
        Duration duration = Duration.between(start, Instant.now());
        System.out.println("平台线程完成10000个请求耗时: " + duration.toMillis() + "ms");
    }
    
    private static String makeHttpRequest(int id) {
        try {
            HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("https://httpbin.org/delay/1"))
                .timeout(Duration.ofSeconds(10))
                .build();
            
            HttpResponse<String> response = HTTP_CLIENT.send(request, 
                HttpResponse.BodyHandlers.ofString());
            
            return "Request " + id + " completed with status: " + response.statusCode();
        } catch (Exception e) {
            return "Request " + id + " failed: " + e.getMessage();
        }
    }
    
    public static void comparePerformance() {
        System.out.println("开始性能对比测试...");
        
        // 虚拟线程测试
        virtualThreadHttpRequests();
        
        // 平台线程测试
        platformThreadHttpRequests();
    }
}

数据库连接池优化

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

public class DatabaseVirtualThreadExample {
    
    private static final String DB_URL = "jdbc:h2:mem:testdb";
    private static final String USER = "sa";
    private static final String PASS = "";
    
    public static void main(String[] args) {
        initDatabase();
        demonstrateVirtualThreadsWithDatabase();
    }
    
    private static void initDatabase() {
        try (Connection conn = DriverManager.getConnection(DB_URL, USER, PASS);
             Statement stmt = conn.createStatement()) {
            
            stmt.execute("CREATE TABLE users (id INT PRIMARY KEY, name VARCHAR(50))");
            
            // 插入测试数据
            for (int i = 1; i <= 1000; i++) {
                stmt.execute("INSERT INTO users VALUES (" + i + ", 'User" + i + "')");
            }
            
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    public static void demonstrateVirtualThreadsWithDatabase() {
        // 使用虚拟线程处理大量数据库查询
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            
            var futures = IntStream.range(1, 1001)
                .mapToObj(userId -> executor.submit(() -> queryUser(userId)))
                .toList();
            
            System.out.println("提交了1000个数据库查询任务");
            
            // 收集结果
            long successCount = futures.stream()
                .mapToLong(future -> {
                    try {
                        return future.get() != null ? 1 : 0;
                    } catch (Exception e) {
                        return 0;
                    }
                })
                .sum();
            
            System.out.println("成功查询用户数量: " + successCount);
        }
    }
    
    private static User queryUser(int userId) {
        try (Connection conn = DriverManager.getConnection(DB_URL, USER, PASS);
             Statement stmt = conn.createStatement()) {
            
            // 模拟慢查询
            Thread.sleep(100);
            
            ResultSet rs = stmt.executeQuery("SELECT * FROM users WHERE id = " + userId);
            if (rs.next()) {
                return new User(rs.getInt("id"), rs.getString("name"));
            }
            
        } catch (Exception e) {
            System.err.println("查询用户失败: " + userId + ", " + e.getMessage());
        }
        return null;
    }
    
    record User(int id, String name) {}
}

Java虚拟线程 vs Go协程对比

设计哲学对比

特性Java虚拟线程Go协程(Goroutine)
设计目标兼容现有Java代码从设计之初就支持协程
语法支持使用现有Thread API原生语法支持(go关键字)
调度器JVM调度器Go运行时调度器
内存开销~几KB~2KB
启动方式Thread.ofVirtual().start()go func()

性能对比示例

Java虚拟线程版本:

public class JavaVirtualThreadPerformance {
    
    public static void main(String[] args) throws InterruptedException {
        long startTime = System.currentTimeMillis();
        
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            var futures = IntStream.range(0, 1_000_000)
                .mapToObj(i -> executor.submit(() -> {
                    // 模拟计算密集型任务
                    int sum = 0;
                    for (int j = 0; j < 1000; j++) {
                        sum += j;
                    }
                    return sum;
                }))
                .toList();
            
            // 等待所有任务完成
            futures.forEach(future -> {
                try {
                    future.get();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
        
        long endTime = System.currentTimeMillis();
        System.out.println("Java虚拟线程执行100万个任务耗时: " + (endTime - startTime) + "ms");
    }
}

Go协程对比版本:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    startTime := time.Now()
    
    var wg sync.WaitGroup
    const numTasks = 1_000_000
    
    for i := 0; i < numTasks; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            // 模拟计算密集型任务
            sum := 0
            for j := 0; j < 1000; j++ {
                sum += j
            }
        }()
    }
    
    wg.Wait()
    
    duration := time.Since(startTime)
    fmt.Printf("Go协程执行100万个任务耗时: %v\n", duration)
}

详细特性对比

内存模型:

// Java虚拟线程 - 栈的动态扩展
public class StackGrowthExample {
    
    public static void demonstrateStackGrowth() {
        Thread.ofVirtual().start(() -> {
            recursiveCall(0);
        }).join();
    }
    
    private static void recursiveCall(int depth) {
        if (depth < 10000) {
            byte[] localArray = new byte[1024]; // 1KB局部变量
            recursiveCall(depth + 1);
        }
        System.out.println("递归深度: " + depth);
    }
}

通信机制对比:

// Java - 使用阻塞队列进行通信
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class JavaCommunication {
    
    public static void demonstrateChannelLikeCommunication() {
        BlockingQueue<String> channel = new LinkedBlockingQueue<>();
        
        // 生产者虚拟线程
        Thread.ofVirtual().start(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    channel.put("Message " + i);
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        
        // 消费者虚拟线程
        Thread.ofVirtual().start(() -> {
            try {
                while (true) {
                    String message = channel.take();
                    System.out.println("收到: " + message);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }
}

实际应用场景

1. Web服务器优化

import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;

import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

public class VirtualThreadWebServer {
    
    public static void main(String[] args) throws IOException {
        // 创建HTTP服务器
        HttpServer server = HttpServer.create(new InetSocketAddress(8080), 0);
        
        // 使用虚拟线程执行器处理请求
        server.setExecutor(Executors.newVirtualThreadPerTaskExecutor());
        
        // 设置请求处理器
        server.createContext("/api/slow", new SlowApiHandler());
        server.createContext("/api/fast", new FastApiHandler());
        
        server.start();
        System.out.println("服务器启动在端口8080,使用虚拟线程处理请求");
    }
    
    static class SlowApiHandler implements HttpHandler {
        @Override
        public void handle(HttpExchange exchange) throws IOException {
            try {
                // 模拟慢速API调用(如数据库查询、外部服务调用)
                Thread.sleep(2000);
                
                String response = "慢速API响应 - 线程: " + Thread.currentThread();
                exchange.sendResponseHeaders(200, response.length());
                
                try (OutputStream os = exchange.getResponseBody()) {
                    os.write(response.getBytes());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                exchange.sendResponseHeaders(500, 0);
            }
        }
    }
    
    static class FastApiHandler implements HttpHandler {
        @Override
        public void handle(HttpExchange exchange) throws IOException {
            String response = "快速API响应 - 线程: " + Thread.currentThread();
            exchange.sendResponseHeaders(200, response.length());
            
            try (OutputStream os = exchange.getResponseBody()) {
                os.write(response.getBytes());
            }
        }
    }
}

2. 批量数据处理

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

public class BatchProcessingWithVirtualThreads {
    
    public static void main(String[] args) {
        processBatchData();
    }
    
    public static void processBatchData() {
        List<DataItem> dataItems = generateTestData(10000);
        
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            
            long startTime = System.currentTimeMillis();
            
            // 使用虚拟线程并行处理数据
            List<CompletableFuture<ProcessedData>> futures = dataItems.stream()
                .map(item -> CompletableFuture.supplyAsync(() -> processItem(item), executor))
                .toList();
            
            // 等待所有任务完成并收集结果
            List<ProcessedData> results = futures.stream()
                .map(CompletableFuture::join)
                .toList();
            
            long endTime = System.currentTimeMillis();
            
            System.out.println("处理了 " + results.size() + " 个数据项");
            System.out.println("总耗时: " + (endTime - startTime) + "ms");
            System.out.println("平均每项耗时: " + (endTime - startTime) / (double) results.size() + "ms");
        }
    }
    
    private static List<DataItem> generateTestData(int count) {
        return IntStream.range(0, count)
            .mapToObj(i -> new DataItem(i, "Data-" + i, Math.random() * 100))
            .toList();
    }
    
    private static ProcessedData processItem(DataItem item) {
        try {
            // 模拟复杂的数据处理(如调用外部API、数据库查询等)
            Thread.sleep((long) (Math.random() * 100 + 50)); // 50-150ms的随机延迟
            
            // 执行数据转换
            double processedValue = item.value() * 1.1 + Math.sin(item.id());
            String processedName = item.name().toUpperCase();
            
            return new ProcessedData(item.id(), processedName, processedValue, 
                Thread.currentThread().toString());
            
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("处理被中断", e);
        }
    }
    
    record DataItem(int id, String name, double value) {}
    record ProcessedData(int id, String name, double value, String processingThread) {}
}

3. 微服务间通信优化

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;

public class MicroserviceClient {
    
    private final HttpClient httpClient;
    private final List<String> serviceUrls;
    
    public MicroserviceClient() {
        this.httpClient = HttpClient.newBuilder()
            .connectTimeout(Duration.ofSeconds(5))
            .build();
        
        this.serviceUrls = List.of(
            "http://user-service:8081",
            "http://order-service:8082",
            "http://inventory-service:8083",
            "http://payment-service:8084"
        );
    }
    
    // 使用虚拟线程并行调用多个微服务
    public CompletableFuture<AggregatedResponse> aggregateServiceCalls(String userId) {
        
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            
            // 并行调用所有相关服务
            CompletableFuture<UserInfo> userFuture = CompletableFuture
                .supplyAsync(() -> getUserInfo(userId), executor);
            
            CompletableFuture<List<Order>> ordersFuture = CompletableFuture
                .supplyAsync(() -> getUserOrders(userId), executor);
            
            CompletableFuture<InventoryStatus> inventoryFuture = CompletableFuture
                .supplyAsync(() -> getInventoryStatus(userId), executor);
            
            CompletableFuture<PaymentInfo> paymentFuture = CompletableFuture
                .supplyAsync(() -> getPaymentInfo(userId), executor);
            
            // 组合所有结果
            return CompletableFuture.allOf(userFuture, ordersFuture, inventoryFuture, paymentFuture)
                .thenApply(v -> new AggregatedResponse(
                    userFuture.join(),
                    ordersFuture.join(),
                    inventoryFuture.join(),
                    paymentFuture.join()
                ));
        }
    }
    
    private UserInfo getUserInfo(String userId) {
        return callService("/users/" + userId, UserInfo.class);
    }
    
    private List<Order> getUserOrders(String userId) {
        return callService("/orders/user/" + userId, List.class);
    }
    
    private InventoryStatus getInventoryStatus(String userId) {
        return callService("/inventory/user/" + userId, InventoryStatus.class);
    }
    
    private PaymentInfo getPaymentInfo(String userId) {
        return callService("/payment/user/" + userId, PaymentInfo.class);
    }
    
    private <T> T callService(String endpoint, Class<T> responseType) {
        try {
            // 这里简化了服务发现逻辑
            String serviceUrl = serviceUrls.get(0) + endpoint;
            
            HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(serviceUrl))
                .timeout(Duration.ofSeconds(10))
                .build();
            
            HttpResponse<String> response = httpClient.send(request, 
                HttpResponse.BodyHandlers.ofString());
            
            // 这里应该使用JSON库进行反序列化
            // 为了示例简化,直接返回模拟对象
            return createMockResponse(responseType);
            
        } catch (Exception e) {
            throw new RuntimeException("服务调用失败: " + endpoint, e);
        }
    }
    
    @SuppressWarnings("unchecked")
    private <T> T createMockResponse(Class<T> responseType) {
        // 模拟响应对象创建
        if (responseType == UserInfo.class) {
            return (T) new UserInfo("用户123", "user@example.com");
        } else if (responseType == List.class) {
            return (T) List.of(new Order("订单1", 100.0), new Order("订单2", 200.0));
        } else if (responseType == InventoryStatus.class) {
            return (T) new InventoryStatus(50, "充足");
        } else if (responseType == PaymentInfo.class) {
            return (T) new PaymentInfo("信用卡", "有效");
        }
        return null;
    }
    
    // 数据传输对象
    record UserInfo(String name, String email) {}
    record Order(String id, double amount) {}
    record InventoryStatus(int quantity, String status) {}
    record PaymentInfo(String method, String status) {}
    record AggregatedResponse(UserInfo user, List<Order> orders, 
                            InventoryStatus inventory, PaymentInfo payment) {}
}

最佳实践和注意事项

1. 适用场景

适合使用虚拟线程的场景:

public class VirtualThreadBestPractices {
    
    // ✅ IO密集型任务
    public void ioIntensiveTask() {
        Thread.ofVirtual().start(() -> {
            try {
                // 网络请求
                // 数据库查询
                // 文件操作
                Thread.sleep(1000); // 模拟IO等待
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }
    
    // ❌ CPU密集型任务不适合
    public void cpuIntensiveTask() {
        // 不推荐:CPU密集型任务会占用载体线程
        Thread.ofVirtual().start(() -> {
            for (long i = 0; i < 1_000_000_000L; i++) {
                // 密集计算
            }
        });
    }
}

2. 避免的陷阱

public class VirtualThreadPitfalls {
    
    // ❌ 避免使用synchronized
    private final Object lock = new Object();
    
    public void badSynchronizedUsage() {
        Thread.ofVirtual().start(() -> {
            synchronized (lock) { // 会导致虚拟线程固定到载体线程
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
    }
    
    // ✅ 使用ReentrantLock替代
    private final ReentrantLock reentrantLock = new ReentrantLock();
    
    public void goodLockUsage() {
        Thread.ofVirtual().start(() -> {
            reentrantLock.lock();
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                reentrantLock.unlock();
            }
        });
    }
}

3. 监控和调试

import jdk.jfr.Configuration;
import jdk.jfr.Recording;

public class VirtualThreadMonitoring {
    
    public static void enableVirtualThreadMonitoring() {
        // 启用JFR记录虚拟线程事件
        try {
            Configuration config = Configuration.getConfiguration("profile");
            Recording recording = new Recording(config);
            recording.setName("VirtualThreadRecording");
            recording.start();
            
            // 执行业务逻辑
            runBusinessLogic();
            
            recording.stop();
            recording.dump(Paths.get("virtual-threads.jfr"));
            
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    private static void runBusinessLogic() {
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            var futures = IntStream.range(0, 1000)
                .mapToObj(i -> executor.submit(() -> {
                    try {
                        Thread.sleep(100);
                        return "Task " + i + " completed";
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return "Task " + i + " interrupted";
                    }
                }))
                .toList();
            
            futures.forEach(future -> {
                try {
                    future.get();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
    }
    
    // 虚拟线程状态监控
    public static void monitorVirtualThreads() {
        ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
        
        Thread.ofVirtual().name("监控线程").start(() -> {
            while (true) {
                try {
                    long threadCount = threadBean.getThreadCount();
                    long peakThreadCount = threadBean.getPeakThreadCount();
                    
                    System.out.printf("当前线程数: %d, 峰值线程数: %d%n", 
                        threadCount, peakThreadCount);
                    
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });
    }
}

性能测试和基准测试

吞吐量对比测试

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class VirtualThreadBenchmark {
    
    public static void main(String[] args) throws InterruptedException {
        System.out.println("开始虚拟线程性能测试...");
        
        // 测试不同任务数量下的性能
        int[] taskCounts = {1_000, 10_000, 100_000, 1_000_000};
        
        for (int taskCount : taskCounts) {
            System.out.println("\n=== 测试 " + taskCount + " 个任务 ===");
            
            benchmarkVirtualThreads(taskCount);
            benchmarkPlatformThreads(taskCount);
        }
    }
    
    private static void benchmarkVirtualThreads(int taskCount) throws InterruptedException {
        long startTime = System.nanoTime();
        CountDownLatch latch = new CountDownLatch(taskCount);
        AtomicLong completedTasks = new AtomicLong(0);
        
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < taskCount; i++) {
                executor.submit(() -> {
                    try {
                        // 模拟IO操作
                        Thread.sleep(10);
                        completedTasks.incrementAndGet();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        latch.countDown();
                    }
                });
            }
            
            latch.await();
        }
        
        long endTime = System.nanoTime();
        long duration = TimeUnit.NANOSECONDS.toMillis(endTime - startTime);
        
        System.out.printf("虚拟线程: %d个任务, 耗时: %dms, 完成: %d, 吞吐量: %.2f任务/秒%n",
            taskCount, duration, completedTasks.get(), 
            (completedTasks.get() * 1000.0) / duration);
    }
    
    private static void benchmarkPlatformThreads(int taskCount) throws InterruptedException {
        long startTime = System.nanoTime();
        CountDownLatch latch = new CountDownLatch(taskCount);
        AtomicLong completedTasks = new AtomicLong(0);
        
        // 限制平台线程数量,避免资源耗尽
        int threadPoolSize = Math.min(taskCount, 200);
        
        try (var executor = Executors.newFixedThreadPool(threadPoolSize)) {
            for (int i = 0; i < taskCount; i++) {
                executor.submit(() -> {
                    try {
                        Thread.sleep(10);
                        completedTasks.incrementAndGet();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        latch.countDown();
                    }
                });
            }
            
            latch.await();
        }
        
        long endTime = System.nanoTime();
        long duration = TimeUnit.NANOSECONDS.toMillis(endTime - startTime);
        
        System.out.printf("平台线程: %d个任务, 耗时: %dms, 完成: %d, 吞吐量: %.2f任务/秒%n",
            taskCount, duration, completedTasks.get(), 
            (completedTasks.get() * 1000.0) / duration);
    }
}

内存使用对比

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;

public class MemoryUsageBenchmark {
    
    public static void main(String[] args) throws InterruptedException {
        MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();
        
        System.out.println("内存使用对比测试");
        System.out.println("==================");
        
        // 测试虚拟线程内存使用
        testVirtualThreadMemoryUsage(memoryBean);
        
        System.gc();
        Thread.sleep(2000);
        
        // 测试平台线程内存使用
        testPlatformThreadMemoryUsage(memoryBean);
    }
    
    private static void testVirtualThreadMemoryUsage(MemoryMXBean memoryBean) 
            throws InterruptedException {
        System.out.println("\n--- 虚拟线程内存测试 ---");
        
        MemoryUsage beforeHeap = memoryBean.getHeapMemoryUsage();
        System.out.printf("测试前堆内存: %d MB%n", beforeHeap.getUsed() / 1024 / 1024);
        
        List<Thread> threads = new ArrayList<>();
        
        // 创建大量虚拟线程
        for (int i = 0; i < 10_000; i++) {
            Thread vThread = Thread.ofVirtual().start(() -> {
                try {
                    Thread.sleep(60_000); // 保持活跃状态
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads.add(vThread);
        }
        
        Thread.sleep(1000); // 等待线程启动
        
        MemoryUsage afterHeap = memoryBean.getHeapMemoryUsage();
        System.out.printf("创建10000个虚拟线程后堆内存: %d MB%n", 
            afterHeap.getUsed() / 1024 / 1024);
        System.out.printf("虚拟线程内存增长: %d MB%n", 
            (afterHeap.getUsed() - beforeHeap.getUsed()) / 1024 / 1024);
        
        // 中断所有线程
        threads.forEach(Thread::interrupt);
    }
    
    private static void testPlatformThreadMemoryUsage(MemoryMXBean memoryBean) 
            throws InterruptedException {
        System.out.println("\n--- 平台线程内存测试 ---");
        
        MemoryUsage beforeHeap = memoryBean.getHeapMemoryUsage();
        System.out.printf("测试前堆内存: %d MB%n", beforeHeap.getUsed() / 1024 / 1024);
        
        List<Thread> threads = new ArrayList<>();
        
        try {
            // 尝试创建平台线程(会因为内存限制而失败)
            for (int i = 0; i < 1_000; i++) { // 只创建1000个,避免OOM
                Thread pThread = new Thread(() -> {
                    try {
                        Thread.sleep(60_000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                pThread.start();
                threads.add(pThread);
            }
            
            Thread.sleep(1000);
            
            MemoryUsage afterHeap = memoryBean.getHeapMemoryUsage();
            System.out.printf("创建1000个平台线程后堆内存: %d MB%n", 
                afterHeap.getUsed() / 1024 / 1024);
            System.out.printf("平台线程内存增长: %d MB%n", 
                (afterHeap.getUsed() - beforeHeap.getUsed()) / 1024 / 1024);
            
        } catch (OutOfMemoryError e) {
            System.out.println("平台线程创建失败:内存不足");
        } finally {
            threads.forEach(Thread::interrupt);
        }
    }
}

实际项目迁移指南

Spring Boot集成示例

// Spring Boot配置类
@Configuration
@EnableAsync
public class VirtualThreadConfig {
    
    @Bean("virtualThreadExecutor")
    public Executor virtualThreadExecutor() {
        return Executors.newVirtualThreadPerTaskExecutor();
    }
    
    @Bean
    public AsyncConfigurer asyncConfigurer() {
        return new AsyncConfigurer() {
            @Override
            public Executor getAsyncExecutor() {
                return virtualThreadExecutor();
            }
        };
    }
}

// 使用虚拟线程的服务类
@Service
public class UserService {
    
    @Async("virtualThreadExecutor")
    public CompletableFuture<User> findUserAsync(Long userId) {
        // 模拟数据库查询
        try {
            Thread.sleep(100);
            return CompletableFuture.completedFuture(
                new User(userId, "User" + userId, "user" + userId + "@example.com")
            );
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return CompletableFuture.failedFuture(e);
        }
    }
    
    @Async("virtualThreadExecutor")
    public CompletableFuture<Void> sendNotificationAsync(User user) {
        // 模拟发送通知
        try {
            Thread.sleep(200);
            System.out.println("通知已发送给用户: " + user.name());
            return CompletableFuture.completedFuture(null);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return CompletableFuture.failedFuture(e);
        }
    }
    
    record User(Long id, String name, String email) {}
}

结论与展望

Java 21的虚拟线程代表了Java并发编程的重大进步,它解决了传统线程模型的诸多限制,为高并发应用开发提供了新的可能性。

主要优势总结:

  1. 轻量级:内存占用极小,可以创建数百万个虚拟线程
  2. 高并发:特别适合IO密集型应用,大幅提升吞吐量
  3. 简单易用:保持现有Thread API,学习成本低
  4. 向后兼容:现有代码无需修改即可受益

与Go协程的对比:

  • Java虚拟线程:更好的生态集成,企业级特性
  • Go协程:语言原生支持,语法更简洁

适用场景:

  • 推荐使用:Web服务器、微服务、批量数据处理、IO密集型任务
  • 谨慎使用:CPU密集型任务、需要synchronized的场景

未来展望:

虚拟线程将成为Java高并发编程的标准选择,随着生态系统的完善,将会有更多框架和库原生支持虚拟线程。对于Java开发者而言,现在正是学习和应用虚拟线程的最佳时机。

通过合理使用虚拟线程,我们可以构建出更高效、更可扩展的Java应用程序,让Java在云原生和高并发领域重新焕发活力。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注