Java 多线程
多线程是Java编程中的重要概念,它允许程序同时执行多个任务,提高程序的执行效率和响应性。Java提供了丰富的API来支持多线程编程。
线程创建方式
在Java中,有多种方式可以创建和启动线程,每种方式都有其特点和适用场景。
继承Thread类
通过继承Thread类并重写run()方法来创建线程是最直接的方式。
java
// 通过继承Thread类创建线程
class MyThread extends Thread {
private String threadName;
public MyThread(String name) {
this.threadName = name;
}
@Override
public void run() {
System.out.println(threadName + " 线程开始执行");
// 模拟一些工作
for (int i = 1; i <= 5; i++) {
System.out.println(threadName + " 执行任务 " + i);
try {
// 模拟耗时操作
Thread.sleep(1000);
} catch (InterruptedException e) {
System.out.println(threadName + " 被中断");
return;
}
}
System.out.println(threadName + " 线程执行完毕");
}
}
// 继承Thread类示例
public class ThreadExample {
public static void main(String[] args) {
System.out.println("=== 继承Thread类创建线程 ===");
// 创建线程对象
MyThread thread1 = new MyThread("线程1");
MyThread thread2 = new MyThread("线程2");
// 启动线程
thread1.start();
thread2.start();
// 主线程继续执行
System.out.println("主线程继续执行...");
// 等待子线程执行完毕
try {
thread1.join();
thread2.join();
} catch (InterruptedException e) {
System.err.println("主线程被中断: " + e.getMessage());
}
System.out.println("所有线程执行完毕");
}
}实现Runnable接口
实现Runnable接口是更推荐的创建线程方式,因为它避免了Java单继承的限制。
java
// 通过实现Runnable接口创建线程
class MyRunnable implements Runnable {
private String taskName;
private int iterations;
public MyRunnable(String taskName, int iterations) {
this.taskName = taskName;
this.iterations = iterations;
}
@Override
public void run() {
System.out.println(taskName + " 任务开始执行");
for (int i = 1; i <= iterations; i++) {
System.out.println(taskName + " 执行第 " + i + " 次任务");
// 模拟工作负载
try {
Thread.sleep(500);
} catch (InterruptedException e) {
System.out.println(taskName + " 被中断");
Thread.currentThread().interrupt(); // 恢复中断状态
return;
}
}
System.out.println(taskName + " 任务执行完毕");
}
}
// 实现Runnable接口示例
public class RunnableExample {
public static void main(String[] args) {
System.out.println("=== 实现Runnable接口创建线程 ===");
// 创建Runnable任务
MyRunnable task1 = new MyRunnable("任务1", 3);
MyRunnable task2 = new MyRunnable("任务2", 5);
// 创建线程对象并传入Runnable任务
Thread thread1 = new Thread(task1);
Thread thread2 = new Thread(task2);
// 设置线程优先级
thread1.setPriority(Thread.MIN_PRIORITY);
thread2.setPriority(Thread.MAX_PRIORITY);
// 启动线程
thread1.start();
thread2.start();
// 使用Lambda表达式创建线程(Java 8+)
Thread lambdaThread = new Thread(() -> {
String threadName = Thread.currentThread().getName();
System.out.println(threadName + " Lambda线程开始执行");
for (int i = 1; i <= 3; i++) {
System.out.println(threadName + " Lambda任务 " + i);
try {
Thread.sleep(300);
} catch (InterruptedException e) {
System.out.println(threadName + " Lambda线程被中断");
return;
}
}
System.out.println(threadName + " Lambda线程执行完毕");
}, "LambdaThread");
lambdaThread.start();
// 主线程工作
System.out.println("主线程执行其他任务...");
// 等待所有线程完成
try {
thread1.join();
thread2.join();
lambdaThread.join();
} catch (InterruptedException e) {
System.err.println("主线程被中断: " + e.getMessage());
}
System.out.println("所有线程执行完毕");
}
}实现Callable接口
Callable接口与Runnable类似,但可以返回结果并抛出异常。
java
import java.util.concurrent.*;
// 通过实现Callable接口创建可返回结果的线程
class MyCallable implements Callable<String> {
private String taskName;
private int duration;
public MyCallable(String taskName, int duration) {
this.taskName = taskName;
this.duration = duration;
}
@Override
public String call() throws Exception {
System.out.println(taskName + " 开始执行");
// 模拟耗时任务
Thread.sleep(duration * 1000);
// 返回结果
String result = taskName + " 执行完成,耗时 " + duration + " 秒";
System.out.println(result);
return result;
}
}
// Callable接口示例
public class CallableExample {
public static void main(String[] args) {
System.out.println("=== 实现Callable接口创建线程 ===");
// 创建ExecutorService
ExecutorService executor = Executors.newFixedThreadPool(3);
// 创建Callable任务
MyCallable task1 = new MyCallable("任务1", 2);
MyCallable task2 = new MyCallable("任务2", 3);
MyCallable task3 = new MyCallable("任务3", 1);
// 提交任务并获取Future对象
Future<String> future1 = executor.submit(task1);
Future<String> future2 = executor.submit(task2);
Future<String> future3 = executor.submit(task3);
// 处理结果
try {
// 获取结果(阻塞等待)
String result1 = future1.get();
System.out.println("获取到结果: " + result1);
// 带超时的获取结果
String result2 = future2.get(5, TimeUnit.SECONDS);
System.out.println("获取到结果: " + result2);
// 检查任务是否完成
if (future3.isDone()) {
String result3 = future3.get();
System.out.println("获取到结果: " + result3);
}
} catch (InterruptedException e) {
System.err.println("主线程被中断: " + e.getMessage());
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
System.err.println("任务执行异常: " + e.getMessage());
} catch (TimeoutException e) {
System.err.println("任务执行超时: " + e.getMessage());
// 取消超时的任务
future2.cancel(true);
}
// 关闭ExecutorService
executor.shutdown();
// 等待所有任务完成
try {
if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
System.out.println("所有任务执行完毕");
}
}线程生命周期与状态转换
Java线程有六种状态,它们之间可以相互转换。
java
// 线程状态演示
public class ThreadStateExample {
public static void main(String[] args) throws InterruptedException {
System.out.println("=== 线程生命周期与状态转换 ===");
// 1. 新建状态 (NEW)
Thread newThread = new Thread(() -> {
System.out.println("线程执行中...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
System.out.println("线程被中断");
}
});
System.out.println("1. 新建状态: " + newThread.getState()); // NEW
// 2. 就绪状态 (RUNNABLE)
newThread.start();
Thread.sleep(100); // 给线程一些时间启动
System.out.println("2. 就绪/运行状态: " + newThread.getState()); // RUNNABLE
// 3. 阻塞状态演示
Thread blockingThread = new Thread(() -> {
synchronized (ThreadStateExample.class) {
System.out.println("进入同步块");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
System.out.println("同步块中的线程被中断");
}
}
});
// 先启动一个线程占用锁
synchronized (ThreadStateExample.class) {
blockingThread.start();
Thread.sleep(100);
System.out.println("3. 阻塞状态: " + blockingThread.getState()); // BLOCKED
Thread.sleep(3000); // 让第一个线程释放锁
}
Thread.sleep(100);
System.out.println("4. 锁释放后: " + blockingThread.getState()); // TERMINATED or RUNNABLE
// 4. 等待状态演示
Object lock = new Object();
Thread waitingThread = new Thread(() -> {
synchronized (lock) {
try {
System.out.println("线程进入等待状态");
lock.wait(); // 等待通知
System.out.println("线程被唤醒");
} catch (InterruptedException e) {
System.out.println("等待中的线程被中断");
}
}
});
waitingThread.start();
Thread.sleep(100);
System.out.println("5. 等待状态: " + waitingThread.getState()); // WAITING
// 通知等待的线程
synchronized (lock) {
lock.notify();
}
waitingThread.join();
System.out.println("6. 等待后状态: " + waitingThread.getState()); // TERMINATED
// 5. 超时等待状态演示
Thread timedWaitingThread = new Thread(() -> {
try {
System.out.println("线程进入超时等待状态");
Thread.sleep(2000);
System.out.println("超时等待结束");
} catch (InterruptedException e) {
System.out.println("超时等待中的线程被中断");
}
});
timedWaitingThread.start();
Thread.sleep(100);
System.out.println("7. 超时等待状态: " + timedWaitingThread.getState()); // TIMED_WAITING
timedWaitingThread.join();
System.out.println("8. 终止状态: " + timedWaitingThread.getState()); // TERMINATED
// 线程状态转换图示
printThreadStateDiagram();
}
private static void printThreadStateDiagram() {
System.out.println("\n=== 线程状态转换图 ===");
System.out.println("NEW (新建) --start()--> RUNNABLE (就绪/运行)");
System.out.println("RUNNABLE --synchronized--> BLOCKED (阻塞)");
System.out.println("RUNNABLE --wait()/join()--> WAITING (无限期等待)");
System.out.println("RUNNABLE --sleep()/wait(timeout)--> TIMED_WAITING (限期等待)");
System.out.println("RUNNABLE --任务完成--> TERMINATED (终止)");
System.out.println("BLOCKED --获得锁--> RUNNABLE");
System.out.println("WAITING --notify()/notifyAll()--> RUNNABLE");
System.out.println("TIMED_WAITING --超时/notify()--> RUNNABLE");
}
}
// 线程优先级和守护线程示例
class ThreadPriorityExample {
public static void main(String[] args) throws InterruptedException {
System.out.println("=== 线程优先级和守护线程 ===");
// 线程优先级演示
System.out.println("主线程优先级: " + Thread.currentThread().getPriority());
System.out.println("最大优先级: " + Thread.MAX_PRIORITY);
System.out.println("最小优先级: " + Thread.MIN_PRIORITY);
System.out.println("默认优先级: " + Thread.NORM_PRIORITY);
// 创建不同优先级的线程
Thread lowPriorityThread = new Thread(new CounterTask("低优先级线程"), "LowPriority");
Thread normPriorityThread = new Thread(new CounterTask("普通优先级线程"), "NormPriority");
Thread highPriorityThread = new Thread(new CounterTask("高优先级线程"), "HighPriority");
// 设置优先级
lowPriorityThread.setPriority(Thread.MIN_PRIORITY);
normPriorityThread.setPriority(Thread.NORM_PRIORITY);
highPriorityThread.setPriority(Thread.MAX_PRIORITY);
// 启动线程
lowPriorityThread.start();
normPriorityThread.start();
highPriorityThread.start();
// 让线程运行一段时间
Thread.sleep(3000);
// 中断线程
lowPriorityThread.interrupt();
normPriorityThread.interrupt();
highPriorityThread.interrupt();
// 等待线程结束
lowPriorityThread.join();
normPriorityThread.join();
highPriorityThread.join();
System.out.println("\n--- 守护线程演示 ---");
// 创建守护线程
Thread daemonThread = new Thread(() -> {
while (true) {
System.out.println("守护线程正在运行...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
System.out.println("守护线程被中断");
break;
}
}
}, "DaemonThread");
// 设置为守护线程
daemonThread.setDaemon(true);
daemonThread.start();
// 主线程执行其他任务
System.out.println("主线程执行任务...");
Thread.sleep(5000);
System.out.println("主线程任务完成,程序即将退出");
// 当主线程(用户线程)结束时,守护线程会自动结束
}
}
// 计数任务类
class CounterTask implements Runnable {
private String threadName;
private volatile long count = 0;
public CounterTask(String threadName) {
this.threadName = threadName;
}
@Override
public void run() {
System.out.println(threadName + " 开始执行,优先级: " + Thread.currentThread().getPriority());
while (!Thread.currentThread().isInterrupted()) {
count++;
// 模拟一些工作
if (count % 1000000 == 0) {
System.out.println(threadName + " 计数: " + count);
}
}
System.out.println(threadName + " 结束,最终计数: " + count);
}
}同步机制
在多线程环境中,同步机制用于控制多个线程对共享资源的访问,防止数据不一致。
synchronized 关键字
synchronized关键字是最常用的同步机制,它可以修饰方法或代码块。
java
// 线程安全的计数器类
class SynchronizedCounter {
private int count = 0;
// 同步方法 - 锁定的是this对象
public synchronized void increment() {
count++;
System.out.println(Thread.currentThread().getName() + " 增加计数: " + count);
}
// 同步方法
public synchronized void decrement() {
count--;
System.out.println(Thread.currentThread().getName() + " 减少计数: " + count);
}
// 同步方法
public synchronized int getCount() {
return count;
}
// 静态同步方法 - 锁定的是类对象
public static synchronized void staticMethod() {
System.out.println("静态同步方法被调用");
}
}
// 同步代码块示例
class SynchronizedBlockExample {
private int value = 0;
private final Object lock = new Object(); // 专用锁对象
private static final Object staticLock = new Object(); // 静态锁对象
// 同步代码块 - 锁定特定对象
public void increment() {
synchronized (lock) {
value++;
System.out.println(Thread.currentThread().getName() + " 增加值: " + value);
try {
Thread.sleep(100); // 模拟耗时操作
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
// 同步代码块 - 锁定this对象
public void decrement() {
synchronized (this) {
value--;
System.out.println(Thread.currentThread().getName() + " 减少值: " + value);
}
}
// 同步代码块 - 锁定类对象
public void staticOperation() {
synchronized (staticLock) {
System.out.println(Thread.currentThread().getName() + " 执行静态操作");
}
}
public int getValue() {
synchronized (lock) {
return value;
}
}
}
// synchronized关键字示例
public class SynchronizedExample {
public static void main(String[] args) throws InterruptedException {
System.out.println("=== synchronized关键字示例 ===");
// 1. 同步方法演示
synchronizedMethodDemo();
// 2. 同步代码块演示
synchronizedBlockDemo();
// 3. 死锁演示
deadlockDemo();
}
// 同步方法演示
public static void synchronizedMethodDemo() throws InterruptedException {
System.out.println("\n--- 同步方法演示 ---");
SynchronizedCounter counter = new SynchronizedCounter();
// 创建多个线程同时访问共享资源
Thread[] threads = new Thread[5];
for (int i = 0; i < threads.length; i++) {
final int threadNum = i;
threads[i] = new Thread(() -> {
for (int j = 0; j < 3; j++) {
counter.increment();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
counter.decrement();
}
}, "Thread-" + threadNum);
}
// 启动所有线程
for (Thread thread : threads) {
thread.start();
}
// 等待所有线程完成
for (Thread thread : threads) {
thread.join();
}
System.out.println("最终计数: " + counter.getCount());
}
// 同步代码块演示
public static void synchronizedBlockDemo() throws InterruptedException {
System.out.println("\n--- 同步代码块演示 ---");
SynchronizedBlockExample example = new SynchronizedBlockExample();
// 创建多个线程
Thread[] threads = new Thread[3];
for (int i = 0; i < threads.length; i++) {
final int threadNum = i;
threads[i] = new Thread(() -> {
for (int j = 0; j < 5; j++) {
example.increment();
example.decrement();
example.staticOperation();
}
}, "Worker-" + threadNum);
}
// 启动线程
for (Thread thread : threads) {
thread.start();
}
// 等待完成
for (Thread thread : threads) {
thread.join();
}
System.out.println("最终值: " + example.getValue());
}
// 死锁演示
public static void deadlockDemo() throws InterruptedException {
System.out.println("\n--- 死锁演示 ---");
// 创建两个锁对象
final Object lock1 = new Object();
final Object lock2 = new Object();
// 线程1:先获取lock1,再获取lock2
Thread thread1 = new Thread(() -> {
synchronized (lock1) {
System.out.println("线程1 获取了 lock1");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
synchronized (lock2) {
System.out.println("线程1 获取了 lock2");
}
}
}, "Thread1");
// 线程2:先获取lock2,再获取lock1
Thread thread2 = new Thread(() -> {
synchronized (lock2) {
System.out.println("线程2 获取了 lock2");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
synchronized (lock1) {
System.out.println("线程2 获取了 lock1");
}
}
}, "Thread2");
// 启动线程
thread1.start();
thread2.start();
// 等待一段时间观察死锁
Thread.sleep(2000);
System.out.println("线程1状态: " + thread1.getState());
System.out.println("线程2状态: " + thread2.getState());
// 中断线程以解除死锁
thread1.interrupt();
thread2.interrupt();
thread1.join();
thread2.join();
System.out.println("死锁演示完成");
}
}
// 线程安全的单例模式
class ThreadSafeSingleton {
private static volatile ThreadSafeSingleton instance;
private ThreadSafeSingleton() {
// 私有构造方法
}
// 双重检查锁定实现
public static ThreadSafeSingleton getInstance() {
if (instance == null) {
synchronized (ThreadSafeSingleton.class) {
if (instance == null) {
instance = new ThreadSafeSingleton();
}
}
}
return instance;
}
public void doSomething() {
System.out.println("单例对象执行操作: " + Thread.currentThread().getName());
}
}Lock 接口
Lock接口提供了比synchronized更灵活的锁定操作。
java
import java.util.concurrent.locks.*;
// 使用Lock接口的计数器
class LockCounter {
private int count = 0;
private final Lock lock = new ReentrantLock();
public void increment() {
lock.lock(); // 获取锁
try {
count++;
System.out.println(Thread.currentThread().getName() + " 增加计数: " + count);
} finally {
lock.unlock(); // 释放锁(必须在finally块中释放)
}
}
public void decrement() {
lock.lock();
try {
count--;
System.out.println(Thread.currentThread().getName() + " 减少计数: " + count);
} finally {
lock.unlock();
}
}
public int getCount() {
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
}
// 读写锁示例
class ReadWriteLockExample {
private String data = "初始数据";
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
private final Lock writeLock = rwLock.writeLock();
// 读操作
public String readData() {
readLock.lock();
try {
System.out.println(Thread.currentThread().getName() + " 读取数据: " + data);
Thread.sleep(1000); // 模拟读取耗时
return data;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
} finally {
readLock.unlock();
}
}
// 写操作
public void writeData(String newData) {
writeLock.lock();
try {
System.out.println(Thread.currentThread().getName() + " 写入数据: " + newData);
Thread.sleep(2000); // 模拟写入耗时
data = newData;
System.out.println(Thread.currentThread().getName() + " 写入完成");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
writeLock.unlock();
}
}
}
// 条件变量示例
class ConditionExample {
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private boolean flag = false;
public void waitForFlag() {
lock.lock();
try {
while (!flag) {
System.out.println(Thread.currentThread().getName() + " 等待标志变为true");
condition.await(); // 等待条件满足
}
System.out.println(Thread.currentThread().getName() + " 标志已变为true,继续执行");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
}
public void setFlag() {
lock.lock();
try {
flag = true;
System.out.println(Thread.currentThread().getName() + " 设置标志为true");
condition.signalAll(); // 唤醒所有等待的线程
} finally {
lock.unlock();
}
}
}
// Lock接口示例
public class LockExample {
public static void main(String[] args) throws InterruptedException {
System.out.println("=== Lock接口示例 ===");
// 1. 基本Lock使用
basicLockUsage();
// 2. 读写锁使用
readWriteLockUsage();
// 3. 条件变量使用
conditionVariableUsage();
// 4. 公平锁和非公平锁
fairLockDemo();
}
// 基本Lock使用
public static void basicLockUsage() throws InterruptedException {
System.out.println("\n--- 基本Lock使用 ---");
LockCounter counter = new LockCounter();
// 创建多个线程
Thread[] threads = new Thread[5];
for (int i = 0; i < threads.length; i++) {
final int threadNum = i;
threads[i] = new Thread(() -> {
for (int j = 0; j < 3; j++) {
counter.increment();
try {
Thread.sleep(50);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
counter.decrement();
}
}, "LockThread-" + threadNum);
}
// 启动线程
for (Thread thread : threads) {
thread.start();
}
// 等待完成
for (Thread thread : threads) {
thread.join();
}
System.out.println("最终计数: " + counter.getCount());
}
// 读写锁使用
public static void readWriteLockUsage() throws InterruptedException {
System.out.println("\n--- 读写锁使用 ---");
ReadWriteLockExample example = new ReadWriteLockExample();
// 创建读线程
Thread[] readThreads = new Thread[3];
for (int i = 0; i < readThreads.length; i++) {
final int threadNum = i;
readThreads[i] = new Thread(() -> {
for (int j = 0; j < 2; j++) {
example.readData();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}, "Reader-" + threadNum);
}
// 创建写线程
Thread writeThread = new Thread(() -> {
try {
Thread.sleep(1000); // 等待读线程开始
example.writeData("更新后的数据");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Writer");
// 启动所有线程
for (Thread thread : readThreads) {
thread.start();
}
writeThread.start();
// 等待完成
for (Thread thread : readThreads) {
thread.join();
}
writeThread.join();
}
// 条件变量使用
public static void conditionVariableUsage() throws InterruptedException {
System.out.println("\n--- 条件变量使用 ---");
ConditionExample example = new ConditionExample();
// 创建等待线程
Thread[] waitingThreads = new Thread[3];
for (int i = 0; i < waitingThreads.length; i++) {
final int threadNum = i;
waitingThreads[i] = new Thread(() -> {
example.waitForFlag();
}, "Waiter-" + threadNum);
}
// 创建设置标志的线程
Thread setterThread = new Thread(() -> {
try {
Thread.sleep(2000); // 等待一会儿再设置标志
example.setFlag();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Setter");
// 启动所有线程
for (Thread thread : waitingThreads) {
thread.start();
}
setterThread.start();
// 等待完成
for (Thread thread : waitingThreads) {
thread.join();
}
setterThread.join();
System.out.println("条件变量示例完成");
}
// 公平锁和非公平锁演示
public static void fairLockDemo() throws InterruptedException {
System.out.println("\n--- 公平锁和非公平锁演示 ---");
// 非公平锁(默认)
Lock unfairLock = new ReentrantLock();
System.out.println("非公平锁测试:");
testLockFairness(unfairLock, false);
// 公平锁
Lock fairLock = new ReentrantLock(true);
System.out.println("\n公平锁测试:");
testLockFairness(fairLock, true);
}
private static void testLockFairness(Lock lock, boolean isFair) throws InterruptedException {
Thread[] threads = new Thread[5];
for (int i = 0; i < threads.length; i++) {
final int threadNum = i;
threads[i] = new Thread(() -> {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " 获取到锁");
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
}, "FairTest-" + threadNum);
}
// 启动线程
for (Thread thread : threads) {
thread.start();
Thread.sleep(10); // 短暂间隔,让线程按顺序请求锁
}
// 等待完成
for (Thread thread : threads) {
thread.join();
}
}
}线程池
线程池是一种管理线程的机制,它可以有效地管理和复用线程,避免频繁创建和销毁线程带来的开销。
java
import java.util.concurrent.*;
import java.util.List;
import java.util.ArrayList;
// 线程池基础示例
public class ThreadPoolExample {
public static void main(String[] args) {
System.out.println("=== 线程池示例 ===");
// 1. 固定大小线程池
fixedThreadPoolExample();
// 2. 缓存线程池
cachedThreadPoolExample();
// 3. 单线程池
singleThreadPoolExample();
// 4. 定时线程池
scheduledThreadPoolExample();
// 5. 自定义线程池
customThreadPoolExample();
}
// 固定大小线程池
public static void fixedThreadPoolExample() {
System.out.println("\n--- 固定大小线程池 ---");
// 创建固定大小为3的线程池
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
// 提交任务
for (int i = 1; i <= 10; i++) {
final int taskId = i;
fixedThreadPool.submit(() -> {
System.out.println("任务 " + taskId + " 由 " +
Thread.currentThread().getName() + " 执行");
try {
Thread.sleep(2000); // 模拟任务执行
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("任务 " + taskId + " 执行完成");
});
}
// 关闭线程池
fixedThreadPool.shutdown();
try {
// 等待所有任务完成
if (!fixedThreadPool.awaitTermination(30, TimeUnit.SECONDS)) {
System.out.println("强制关闭未完成的任务");
fixedThreadPool.shutdownNow();
}
} catch (InterruptedException e) {
fixedThreadPool.shutdownNow();
Thread.currentThread().interrupt();
}
System.out.println("固定大小线程池示例完成");
}
// 缓存线程池
public static void cachedThreadPoolExample() {
System.out.println("\n--- 缓存线程池 ---");
// 创建缓存线程池
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
// 提交大量短时间任务
for (int i = 1; i <= 20; i++) {
final int taskId = i;
cachedThreadPool.submit(() -> {
System.out.println("缓存任务 " + taskId + " 由 " +
Thread.currentThread().getName() + " 执行");
try {
Thread.sleep(1000); // 短时间任务
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("缓存任务 " + taskId + " 执行完成");
});
}
// 关闭线程池
cachedThreadPool.shutdown();
try {
if (!cachedThreadPool.awaitTermination(10, TimeUnit.SECONDS)) {
cachedThreadPool.shutdownNow();
}
} catch (InterruptedException e) {
cachedThreadPool.shutdownNow();
Thread.currentThread().interrupt();
}
System.out.println("缓存线程池示例完成");
}
// 单线程池
public static void singleThreadPoolExample() {
System.out.println("\n--- 单线程池 ---");
// 创建单线程池
ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();
// 提交任务(会按顺序执行)
List<Future<String>> futures = new ArrayList<>();
for (int i = 1; i <= 5; i++) {
final int taskId = i;
Future<String> future = singleThreadPool.submit(() -> {
System.out.println("单线程任务 " + taskId + " 开始执行");
Thread.sleep(1000);
String result = "任务 " + taskId + " 执行结果";
System.out.println("单线程任务 " + taskId + " 执行完成");
return result;
});
futures.add(future);
}
// 获取结果
for (Future<String> future : futures) {
try {
String result = future.get();
System.out.println("获取到结果: " + result);
} catch (InterruptedException | ExecutionException e) {
System.err.println("获取结果时发生错误: " + e.getMessage());
}
}
// 关闭线程池
singleThreadPool.shutdown();
try {
if (!singleThreadPool.awaitTermination(10, TimeUnit.SECONDS)) {
singleThreadPool.shutdownNow();
}
} catch (InterruptedException e) {
singleThreadPool.shutdownNow();
Thread.currentThread().interrupt();
}
System.out.println("单线程池示例完成");
}
// 定时线程池
public static void scheduledThreadPoolExample() {
System.out.println("\n--- 定时线程池 ---");
// 创建定时线程池
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2);
// 延迟执行任务
scheduledThreadPool.schedule(() -> {
System.out.println("延迟3秒执行的任务,由 " + Thread.currentThread().getName() + " 执行");
}, 3, TimeUnit.SECONDS);
// 周期性执行任务
ScheduledFuture<?> scheduledFuture = scheduledThreadPool.scheduleAtFixedRate(() -> {
System.out.println("周期性任务执行,时间: " + System.currentTimeMillis());
}, 1, 2, TimeUnit.SECONDS); // 1秒后开始,每2秒执行一次
// 运行5秒后取消周期性任务
scheduledThreadPool.schedule(() -> {
System.out.println("取消周期性任务");
scheduledFuture.cancel(false);
}, 5, TimeUnit.SECONDS);
// 运行10秒后关闭线程池
scheduledThreadPool.schedule(() -> {
System.out.println("关闭定时线程池");
scheduledThreadPool.shutdown();
}, 10, TimeUnit.SECONDS);
System.out.println("定时线程池示例启动,等待任务执行...");
}
// 自定义线程池
public static void customThreadPoolExample() {
System.out.println("\n--- 自定义线程池 ---");
// 自定义线程池配置
ThreadPoolExecutor customThreadPool = new ThreadPoolExecutor(
2, // 核心线程数
4, // 最大线程数
60L, // 空闲线程存活时间
TimeUnit.SECONDS, // 时间单位
new LinkedBlockingQueue<>(10), // 工作队列
new ThreadFactory() { // 线程工厂
private int counter = 0;
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "CustomThread-" + counter++);
thread.setDaemon(false);
return thread;
}
},
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
// 提交任务
for (int i = 1; i <= 20; i++) {
final int taskId = i;
customThreadPool.submit(() -> {
System.out.println("自定义任务 " + taskId + " 由 " +
Thread.currentThread().getName() + " 执行");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("自定义任务 " + taskId + " 执行完成");
});
}
// 监控线程池状态
monitorThreadPool(customThreadPool);
// 关闭线程池
customThreadPool.shutdown();
try {
if (!customThreadPool.awaitTermination(30, TimeUnit.SECONDS)) {
System.out.println("强制关闭线程池");
customThreadPool.shutdownNow();
}
} catch (InterruptedException e) {
customThreadPool.shutdownNow();
Thread.currentThread().interrupt();
}
System.out.println("自定义线程池示例完成");
}
// 监控线程池状态
private static void monitorThreadPool(ThreadPoolExecutor executor) {
new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
System.out.printf("线程池状态 - 活跃线程数: %d, 任务总数: %d, 已完成任务数: %d%n",
executor.getActiveCount(),
executor.getTaskCount(),
executor.getCompletedTaskCount());
Thread.sleep(1000);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Monitor").start();
}
}
// 线程池高级特性示例
class AdvancedThreadPoolExample {
public static void main(String[] args) throws InterruptedException {
System.out.println("=== 线程池高级特性示例 ===");
// 1. Future和Callable使用
futureAndCallableExample();
// 2. CompletableFuture异步编程
completableFutureExample();
// 3. 线程池拒绝策略
rejectionPolicyExample();
// 4. 线程池钩子方法
threadPoolHookExample();
}
// Future和Callable使用
public static void futureAndCallableExample() throws InterruptedException {
System.out.println("\n--- Future和Callable使用 ---");
ExecutorService executor = Executors.newFixedThreadPool(3);
// 创建Callable任务
List<Future<Integer>> futures = new ArrayList<>();
for (int i = 1; i <= 5; i++) {
final int taskId = i;
Callable<Integer> task = () -> {
System.out.println("计算任务 " + taskId + " 开始");
Thread.sleep(2000); // 模拟计算
int result = taskId * taskId;
System.out.println("计算任务 " + taskId + " 完成,结果: " + result);
return result;
};
Future<Integer> future = executor.submit(task);
futures.add(future);
}
// 获取结果
System.out.println("获取计算结果:");
for (int i = 0; i < futures.size(); i++) {
try {
Integer result = futures.get(i).get(3, TimeUnit.SECONDS);
System.out.println("任务 " + (i + 1) + " 结果: " + result);
} catch (ExecutionException e) {
System.err.println("任务 " + (i + 1) + " 执行异常: " + e.getMessage());
} catch (TimeoutException e) {
System.err.println("任务 " + (i + 1) + " 执行超时");
futures.get(i).cancel(true);
}
}
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);
System.out.println("Future和Callable示例完成");
}
// CompletableFuture异步编程
public static void completableFutureExample() {
System.out.println("\n--- CompletableFuture异步编程 ---");
// 异步执行任务
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("异步任务1执行,线程: " + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "任务1结果";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("异步任务2执行,线程: " + Thread.currentThread().getName());
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "任务2结果";
});
// 组合两个异步任务
CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> {
System.out.println("组合任务执行,线程: " + Thread.currentThread().getName());
return result1 + " + " + result2 + " = 组合结果";
});
// 处理结果
combinedFuture.thenAccept(result -> {
System.out.println("最终结果: " + result);
}).join(); // 等待完成
System.out.println("CompletableFuture示例完成");
}
// 线程池拒绝策略
public static void rejectionPolicyExample() {
System.out.println("\n--- 线程池拒绝策略 ---");
// 创建小容量线程池来演示拒绝策略
ThreadPoolExecutor executor = new ThreadPoolExecutor(
1, 1, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1), // 很小的队列
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "RejectionTestThread");
}
},
new RejectedExecutionHandler() { // 自定义拒绝策略
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("任务被拒绝: " + r.toString());
// 可以选择其他处理方式,如记录日志、放入数据库等
}
}
);
// 提交大量任务
for (int i = 1; i <= 5; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("执行任务 " + taskId);
try {
Thread.sleep(3000); // 长时间任务
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("任务 " + taskId + " 完成");
});
}
// 关闭线程池
executor.shutdown();
try {
if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
System.out.println("拒绝策略示例完成");
}
// 线程池钩子方法
public static void threadPoolHookExample() {
System.out.println("\n--- 线程池钩子方法 ---");
// 自定义ThreadPoolExecutor,重写钩子方法
ThreadPoolExecutor customExecutor = new ThreadPoolExecutor(
2, 4, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10)
) {
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("线程 " + t.getName() + " 开始执行任务: " + r.toString());
super.beforeExecute(t, r);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
if (t != null) {
System.err.println("任务执行异常: " + t.getMessage());
} else {
System.out.println("任务执行完成: " + r.toString());
}
super.afterExecute(r, t);
}
@Override
protected void terminated() {
System.out.println("线程池已终止");
super.terminated();
}
};
// 提交任务
for (int i = 1; i <= 3; i++) {
final int taskId = i;
customExecutor.submit(() -> {
System.out.println("自定义线程池任务 " + taskId + " 执行");
if (taskId == 2) {
throw new RuntimeException("模拟任务异常");
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("自定义线程池任务 " + taskId + " 完成");
});
}
// 关闭线程池
customExecutor.shutdown();
try {
if (!customExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
customExecutor.shutdownNow();
}
} catch (InterruptedException e) {
customExecutor.shutdownNow();
Thread.currentThread().interrupt();
}
System.out.println("线程池钩子方法示例完成");
}
}通过本章节的学习,您已经掌握了Java多线程编程的核心概念和实践技巧,包括线程创建方式、线程生命周期管理、同步机制以及线程池的使用。这些知识对于开发高性能、高并发的Java应用程序至关重要。