星驰编程网

免费编程资源分享平台_编程教程_代码示例_开发技术文章

线程池的那些事

为什么会有线程池这个东西?线程池的出现主要是为了解决哪些问题?

创建线程的高成本

每次创建一个新线程时,都会涉及以下操作:

  1. 分配内存空间。
  2. 初始化线程上下文。
  3. 与操作系统交互,为线程分配资源。 这些操作是比较昂贵的,尤其是在高并发场景中频繁创建和销毁线程,会导致资源浪费和性能下降。

线程过多导致资源耗尽

如果程序中直接创建大量线程,而每个线程都占用一定的资源(如内存、CPU),可能会导致系统资源耗尽,使得程序运行效率极低或直接崩溃。线程池通过限制线程的数量,可以有效地避免这种情况。

线程生命周期管理复杂

线程的生命周期包括创建、运行、等待、终止等。直接管理这些生命周期非常复杂,容易出现线程泄漏或死锁等问题。而线程池可以统一管理线程的生命周期,简化开发工作。

提高性能和响应速度

线程池可以预先创建一组线程,当任务到来时直接复用这些线程,从而避免线程的频繁创建和销毁,极大地提高了系统的响应速度。

适应多任务并发处理

在高并发场景下,程序可能需要处理大量的任务。线程池可以为这些任务提供一个统一的执行环境,分配线程去处理任务,并确保线程的复用。

线程池的工作原理

线程池的核心思想是维护一个池(Pool),里面有固定数量的线程。这些线程可以重复使用,执行不同的任务。线程池的工作流程通常如下:

  1. 提交任务到线程池。
  2. 线程池从任务队列中取出任务。
  3. 一个空闲线程执行任务。
  4. 执行完任务后,该线程不会销毁,而是继续等待新的任务。

线程池在生活中的映射

1.餐厅中的服务员

  • 现实场景: 在餐厅里,通常会有一定数量的服务员来服务顾客。当顾客到达时,服务员负责接待顾客、点餐、上菜等。如果顾客过多,超出了服务员的能力范围,则需要排队等待空闲的服务员。
  • 线程池映射
    • 服务员 = 线程
    • 顾客 = 任务
    • 服务员的数量 = 线程池的大小
    • 服务员的调度 = 线程池的任务分配
  • 餐厅通过服务员数量的限制来控制资源使用情况,避免让每个顾客都自行寻找服务员(相当于频繁创建和销毁线程)。

2.银行柜台

  • 现实场景: 银行有固定数量的柜台用于为顾客办理业务。如果所有柜台都在处理业务,排队的顾客需要等待空闲的柜台。每个柜台可以连续处理多个顾客的需求。
  • 线程池映射
  • 柜台 = 线程
  • 顾客需求 = 任务
  • 柜台数量 = 线程池的大小
  • 柜台的调度 = 线程池的任务分配
  • 银行通过固定柜台数量避免混乱,提高业务处理效率。

使用线程池给我们带来了什么

线程池在生活中的映射通常涉及有限资源(线程)和需要处理的任务(工作)。通过合理的资源调度和任务分配,可以提高效率,减少浪费,同时控制任务的并发量。无论是服务员、柜台,都可以通过类似线程池的模式实现资源的高效利用和任务的合理调度。

线程池是如何定义的

使用的包

<dependency>  
<groupId>org.apache.commons</groupId>  
<artifactId>commons-lang3</artifactId>  
<version>3.8.1</version>  
<scope>compile</scope>  
</dependency>  

<dependency>  
<groupId>com.google.guava</groupId>  
<artifactId>guava</artifactId>  
<version>28.2-jre</version>  
</dependency>  

<dependency>  
<groupId>com.alibaba.fastjson2</groupId>  
<artifactId>fastjson2</artifactId>  
<version>2.0.57</version>  
</dependency>

关键参数

  • coreSize / maxSize:线程池弹性扩缩容的边界。
  • keepAlive:非核心线程空闲超时后被回收。
  • executeQueue:任务队列,决定排队行为(有界/无界)。
  • factory:控制线程命名、守护状态等。
  • 拒绝策略:(如 AbortPolicyCallerRunsPolicy)。
  • allowCoreThreadTimeOut:默认核心线程永不过期,设为 true 后空闲超时也会被回收。
  • preStartAllCoreThreads:避免首次任务延迟,提前初始化核心线程。

使用队列

BlockingQueue<Runnable> executeQueue = new LinkedBlockingQueue<>(workQueueSize);
  1. BlockingQueue<Runnable> :
  2. 这是一个接口,表示一个线程安全的队列,专门用于存放Runnable对象(即可执行的任务)
  3. 支持阻塞操作:当队列满时,插入操作会被阻塞;当队列空时,获取操作会被阻塞
  4. LinkedBlockingQueue<>(workQueueSize) :
  5. LinkedBlockingQueueBlockingQueue的一个实现类,基于链表结构
  6. 构造函数参数workQueueSize指定了队列的容量(最大可存放的任务数)
  7. 如果创建时不指定容量(即使用无参构造),默认容量是Integer.MAX_VALUE

线程工厂

ThreadFactory factory = new BasicThreadFactory.Builder().namingPattern(executorPoolName).daemon(true).build();
  1. 核心组件
  • ThreadFactory
    一个接口,用于定制化线程创建(如线程名、优先级、守护状态等)。
  • BasicThreadFactory Apache Commons Lang 或类似工具库提供的实现类,支持链式配置。
  1. 配置项说明
  • .namingPattern(executorPoolName)
    • 设置线程名称的命名模式(如 "pool-1-thread-%d"),其中 %d 会被替换为自增数字。例如:若 executorPoolName task-thread-%d"线程名可能是 task-thread-task-thread-2`。
  • .daemon(true)
    • 将线程标记为守护线程(Daemon Thread) 。守护线程不会阻止 JVM 退出(设置线程均为守护线程,避免因线程未关闭导致 JVM 无法退出。)
  • .build()
    最终生成配置好的 ThreadFactory 实例。

JVM 关闭钩子

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    
    // 2. 平缓关闭(不再接受新任务,等待执行中任务完成)
    executor.shutdown(); 
    
    try {
        // 3.限时等待任务完成(3秒)
        if (!executor.awaitTermination(3, TimeUnit.SECONDS)) {
            log.error("强制终止线程池(等待超时)");
            executor.shutdownNow(); // 中断所有线程
        }
    } catch (InterruptedException e) {
        log.error("强制终止线程池(钩子被中断)");
        executor.shutdownNow();
    }
}));
  • 核心逻辑shutdown() :触发优雅关闭,等待已提交任务完成。awaitTermination(3, TimeUnit.SECONDS) :最多等待 3 秒。shutdownNow() :超时或中断时强制终止(返回未完成的任务列表)。
  • 用途:确保 JVM 退出时线程池资源被释放,避免任务丢失或线程泄漏。
  • 注意事项
  • 线程安全:钩子中的逻辑必须线程安全,避免竞态条件。
  • 避免死锁:不要在钩子中依赖可能被JVM提前关闭的资源(如数据库连接)。
  • 性能影响:钩子应快速执行,长时间阻塞可能导致JVM无法退出。
  • 注册时机:必须在JVM关闭前注册(通常在应用启动时完成)。
  • 无法保证执行kill -9(SIGKILL)会直接终止JVM,不触发钩子。JVM崩溃(如本地内存耗尽)时钩子不会运行。

实例化线程池

ThreadPoolExecutor executor = new ThreadPoolExecutor(coreSize, maxSize, keepAlive, TimeUnit.SECONDS, executeQueue, factory, RejectHandlerGetter.getProxy(rejectedType.getName()));

这里面使用动态代理的技术

public static RejectedExecutionHandler getProxy(RejectedExecutionHandler handler) {  
    return (RejectedExecutionHandler) Proxy  
    .newProxyInstance(handler.getClass().getClassLoader(),  
    new Class[]{RejectedExecutionHandler.class},  
    new 代理类(handler));  
}

代理类 这个类要继承InvocationHandler,进行了一层封装,目的是在不修改原有拒绝策略逻辑的前提下,增强或监控拒绝行为

  • 典型场景:记录任务被拒绝的日志(如输出线程池状态、任务信息)。监控拒绝次数(用于报警或性能分析)。在拒绝时执行额外逻辑(如降级处理、转发任务到备用线程池)。在任务被拒绝时(rejectedExecution方法),记录线程池状态和任务信息。调用原始handler的逻辑(如AbortPolicy抛出异常,或CallerRunsPolicy让提交者线程执行任务)。
  • 性能开销:每次方法调用需经过代理层,但对拒绝策略这种低频操作影响可忽略。
  • 调试/监控:快速定位线程池拒绝任务的瓶颈。
  • 功能:在不修改源码的情况下,为第三方拒绝策略添加自定义逻辑(如AOP思想)

线程池是如何监控的

简单的监控,可以通过BeanPostProcessor 来进行bean的定时监控

@Override  
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {  
    if (bean instanceof ExecutorFuturePool) {  
        // 注册监控逻辑(如定时采集线程池指标)  
        ExecutorFuturePool executorFuturePool = (ExecutorFuturePool) bean;  
        ThreadPoolExecutor executor = executorFuturePool.getExecutor();  
        ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();  
        monitor.scheduleAtFixedRate(() -> {  
        log.info("监控 beanName={} activeCount={} queueSize={}",  
        beanName, executor.getActiveCount(), executor.getQueue().size());  
        }, 0, 1, TimeUnit.SECONDS); // 每秒监控一次  
    }  
    return BeanPostProcessor.super.postProcessAfterInitialization(bean, beanName);  
}
  • 关键机制
    • postProcessBeforeInitialization | Bean 初始化前调用(适合修改属性)。
    • postProcessAfterInitialization | Bean 初始化后调用(适合监控/代理,此处更可能被使用)。
  • 实际应用场景
    • 线程池监控:自动为所有 ThreadPoolExecutor Bean 添加指标采集(对接 Prometheus/Micrometer)。
    • 动态增强:为线程池 Bean 生成代理,实现拒绝策略的日志记录。
    • 依赖检查:确保线程池的配置符合规范(如核心线程数 ≤ 最大线程数)。
  • 注意事项
    • 性能影响:避免在 BeanPostProcessor 中执行耗时操作(如阻塞 I/O)。
    • 作用范围:会处理 所有 Spring 容器中的 Bean,需通过条件判断精准拦截目标(如 instanceof)。
控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言