开发教程您现在的位置:主页 > 开发教程 >

对线程池的理解

发布日期:2018-01-16 09:32

线程池是一种多线程的处理方式,处理的过程中将任务添加到队列中,然后在创建线程后就会自动启动这些任务。西安Java培训整理了如下的相关知识:
1.ThreadPoolExecutor类
java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,下面我们来看一下ThreadPoolExecutor类的具体实现源码
在ThreadPoolExecutor类中提供了四个构造方法:    
public class ThreadPoolExecutor extends AbstractExecutorService {
   .....
   public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    ...
}
从上面的代码能够得知,ThreadPoolExecutor承继了AbstractExecutorService类,并供给了四个结构器,事实上,发现前面三个结构器都是调用的第四个结构器进行的初始化作业。
下面解说下一下结构器中各个参数的意义:
corePoolSize:线程池中心线程巨细。在创立了线程池后,默许情况下,线程池中并没有任何线程,而是等候有使命到来才创立线程去执行使命,除非调用了prestartAllCoreThreads()或者prestartCoreThread()办法,从这2个办法的名字就能够看出,是预创立线程的意思,即在没有使命到来之前就创立corePoolSize个线程或者一个线程。默许情况下,在创立了线程池后,线程池中的线程数为0,当有使命来之后,就会创立一个线程去执行使命,当线程池中的线程数目抵达corePoolSize后,就会把抵达的使命放到缓存行列当中;
maximumPoolSize:线程池最大线程数,表明在线程池中最多能创立多少个线程;
keepAliveTime:表明线程没有使命执行时最多坚持多久时刻会停止。默许情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,假如一个线程闲暇的时刻抵达keepAliveTime,则会停止,直到线程池中的线程数不超越corePoolSize。可是假如调用了allowCoreThreadTimeOut(boolean)办法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
unit:参数的时刻单位,有7种取值,在TimeUnit类中有7种静态特点:
TimeUnit.DAYS;               //天
TimeUnit.HOURS;             //小时
TimeUnit.MINUTES;           //分钟
TimeUnit.SECONDS;           //秒
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微秒
TimeUnit.NANOSECONDS;       //纳秒
workQueue:一个堵塞行列,用来存储等候执行的使命;
threadFactory:线程工厂,主要用来创立线程;
handler:表明当拒绝处理使命时的策略,有以下四种取值:
ThreadPoolExecutor.AbortPolicy:丢掉使命并抛出RejectedExecutionException反常。
ThreadPoolExecutor.DiscardPolicy:也是丢掉使命,可是不抛出反常
ThreadPoolExecutor.DiscardOldestPolicy:丢掉行列最前面的使命,然后重新尝试执行使命(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该使命
从ThreadPoolExecutor类能够知道,ThreadPoolExecutor承继了AbstractExecutorService,咱们来看一下AbstractExecutorService的实现:
public abstract class AbstractExecutorService implements ExecutorService {
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                            boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (tasks == null)
            throw new NullPointerException();
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
        ExecutorCompletionService<T> ecs =
            new ExecutorCompletionService<T>(this);
        try {
            // Record exceptions so that if we fail to obtain any
            // result, we can throw the last exception we got.
            ExecutionException ee = null;
            long lastTime = timed ? System.nanoTime() : 0;
            Iterator<? extends Callable<T>> it = tasks.iterator();
            // Start one task for sure; the rest incrementally            futures.add(ecs.submit(it.next()));
            --ntasks;
            int active = 1;
            for (;;) {
                Future<T> f = ecs.poll();
                if (f == null) {
                    if (ntasks > 0) {
                        --ntasks;
                        futures.add(ecs.submit(it.next()));
                        ++active;
                    }
                    else if (active == 0)
                        break;
                    else if (timed) {
                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                        if (f == null)
                            throw new TimeoutException();
                        long now = System.nanoTime();
                        nanos -= now - lastTime;
                        lastTime = now;
                    }
                    else
                        f = ecs.take();
                }
                if (f != null) {
                    --active;
                    try {
                        return f.get();
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }
            if (ee == null)
                ee = new ExecutionException();
            throw ee;
 
        } finally {
            for (Future<T> f : futures)
                f.cancel(true);
        }
    }
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {
        try {
            return doInvokeAny(tasks, false, 0);
        } catch (TimeoutException cannotHappen) {
            assert false;
            return null;
        }
    }
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        return doInvokeAny(tasks, true, unit.toNanos(timeout));
    }
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks) {
                RunnableFuture<T> f = newTaskFor(t);
                futures.add(f);
                execute(f);
            }
            for (Future<T> f : futures) {
                if (!f.isDone()) {
                    try {
                        f.get();
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    }
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done)
                for (Future<T> f : futures)
                    f.cancel(true);
        }
    }
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        if (tasks == null || unit == null)
            throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks)
                futures.add(newTaskFor(t));
            long lastTime = System.nanoTime();
            // Interleave time checks and calls to execute in case
            // executor doesn't have any/much parallelism.
            Iterator<Future<T>> it = futures.iterator();
            while (it.hasNext()) {
                execute((Runnable)(it.next()));
                long now = System.nanoTime();
                nanos -= now - lastTime;
                lastTime = now;
                if (nanos <= 0)
                    return futures;
            }
            for (Future<T> f : futures) {
                if (!f.isDone()) {
                    if (nanos <= 0)
                        return futures;
                    try {
                        f.get(nanos, TimeUnit.NANOSECONDS);
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    } catch (TimeoutException toe) {
                        return futures;
                    }
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done)
                for (Future<T> f : futures)
                    f.cancel(true);
        }
    }
}
AbstractExecutorService是一个抽象类,它完成了ExecutorService接口,而ExecutorService又是承继了Executor接口,它们的根本关系如下:
Executor是一个顶层接口,在它里边只声明晰一个办法execute(Runnable),回来值为void,参数为Runnable类型,用来履行传进去的使命的;
ExecutorService接口承继了Executor接口,并声明晰一些办法:submit、invokeAll、invokeAny以及shutDown等;
抽象类AbstractExecutorService完成了ExecutorService接口,根本完成了ExecutorService中声明的一切办法;
ThreadPoolExecutor承继了类AbstractExecutorService,
在ThreadPoolExecutor类中有几个非常重要的办法:
execute()
submit()
shutdown()
shutdownNow()
execute()办法实际上是Executor中声明的办法,在ThreadPoolExecutor进行了详细的完成,这个办法是ThreadPoolExecutor的中心办法,通过这个办法能够向线程池提交一个使命,交由线程池去履行。
submit()办法是在ExecutorService中声明的办法,在AbstractExecutorService就已经有了详细的完成,在ThreadPoolExecutor中并没有对其进行重写,这个办法也是用来向线程池提交使命的,可是它和execute()办法不同,它能够回来使命履行的成果,去看submit()办法的完成,会发现它实际上仍是调用的execute()办法,只不过它利用了Future来获取使命履行成果。
shutdown()和shutdownNow()是用来封闭线程池的。