`
xiaoZ5919
  • 浏览: 400272 次
  • 性别: Icon_minigender_1
  • 来自: 安平人@北京
博客专栏
Group-logo
Netty学习笔记
浏览量:72748
社区版块
存档分类
最新评论

Java并发编程JUC源码学习之ThreadPoolExecutor

 
阅读更多

 ThreadPool的优点,比如资源的控制以及不用频繁的创建线程等就不用多说了。主要来讨论一下ThreadPoolExecutor的几个关键参数以及对task的添加以及线程的管理。它有这么个重要的参数corePoolSize、maximumPoolSize、keepAliveTime和taskqueue。

corePoolSize   线程池维持处于Keep-alive状态的线程数量。如果设置了allowCoreThreadTimeOut为true,该值可能为0。
maximumPoolSize   线程池中可维持线程的最大值。
keepAliveTime   当线程池中线程数量大于 corePoolSize  时,如果某些线程的空闲时间超过该值就会终止,直到线程数小于等于corePoolSize。
    1. 添加一个task的过程
    当要添加一个新的task,如果当前线程数小于 corePoolSize,直接添加一个线程,即使当前有空闲的线程。否则添加队列中。如果队列满了呢?则会判断当前线程数是否小于maximumPoolSize,如是则添加一个新的线程用来执行该task。如果超出最大线程数,那就只能reject了。
       
 int c = ctl.get();
        // 当前线程数小于corePoolSize
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //添加到队列中
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
       //队列满时添加新的线程,如果线程数超过maximumPoolSize则reject
        else if (!addWorker(command, false))
            reject(command);
 
     使用addWorker(),来创建新的线程并传入的task作为第一个task执行。由此来看只有队列满时才会创建大于corePoolSize的线程。
 
    2.   KeepAliveTime是如何实现的呢?
       keepAliveTime的职责时,当线程池的队列满了,创建了多于corePoolSize的线程,这时处于节省资源的目的,会杀死多于corePoolSize空闲时间大于keepAliveTime的线程。它是如何做到呢?我们知道Woker时从taskQueue不断地轮询获取task并执行。如果获取不到task取到null时则退出循环结束线程。大概源码是这样的。
     
 try {
           //当取到task为null,跳出循环结束线程
            while (task != null || (task = getTask()) != null) {
                w.lock();
                clearInterruptsForTaskRun();
                 //此处省略,主要是执行task.run
             }
 
      关键是看看getTask()如何返回null,该方法的comments列出几种返回null并减少当前线程数的情景。我们在这里只关心keeAliveTime的实现。
      
boolean timedOut = false; // Did the last poll() time out? 该变量标识从taskqueue中取任务是否超时,如果超时则返回null
  
       retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);


            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }


            boolean timed;      // Are workers subject to culling?


            for (;;) {
                int wc = workerCountOf(c);
                timed = allowCoreThreadTimeOut || wc > corePoolSize;//线程数大于corePoolSize满足了杀死空闲线程的条件
                //第一次执行到这里,由于timeOut为false,跳出内循环执行从queue取任务
                if (wc <= maximumPoolSize && ! (timedOut && timed))
                    break;
                //第二次执行的时候由于timedOut为true,不会跳出内循环。执行下面的代码
               //将当前线程数减1并返回null致使该线程终止
                if (compareAndDecrementWorkerCount(c))
                    return null;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }


            try {
               //timed为true,这段code是实现空闲时间超过keepalivetime就被终止的精华所在。
              // 在给定的keepalivetime时间内从阻塞队列中取任务。如timeout则返回null
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;//如从queue中取任务超时则为true
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
 
1
1
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics