热点新闻
并发专题-自己动手实现一个线程池
2023-07-09 18:18  浏览:4902  搜索引擎搜索“错改B2B”
温馨提示:为防找不到此信息,请务必收藏信息以备急用! 联系我时,请说明是在错改B2B看到的信息,谢谢。
展会发布 展会网站大全 报名观展合作 软文发布

前言

本文主要参照线程池ThreadPoolExecutor的实现方式自己写一个线程池,主要是因为ThreadPoolExecutor的源码读起来还是挺费劲,想通过自己仿写的方式加深理解

首先要了解ThreadPoolExecutor线程池的工作机制,不明白的看这里

初步思路

需要解决的问题
  • 线程池的主要作用是保存限制数量的线程,当有执行任务时,从中选择某个线程去执行,而不是来个任务就new一个Thread,鉴于thread.start()执行完成之后就会销毁,所以如何保持线程不销毁是个关键,解决思路有很多,比如可以给线程这样一个长期运行任务:当用用户任务到达时执行用户任务,没有用户任务时wait,用户任务出现时再notify唤醒执行,这样线程就不会被销毁掉
  • 线程池在所有线程都工作的情况下,依然可以存储任务,等线程池某个线程空闲时,任务就会被执行
解决方案

结合以上俩点,最终解决方案如下:
使用阻塞队列BlockingQueue来保存用户任务,好处在于获取时如果取不到就会一直阻塞等待,而每个线程执行的实际任务尝试获取任务队列中的用户任务,取不到就等待,取到了就执行,执行完之后再回来等待,形成一个死循环,线程执行的就是一个永久性任务,不会被销毁(等待的过程阻塞也不会占用CPU)

初步实现

首先线程池的本质是Executor,所以我们也按照规范继承并实现execute方法

public class MyThreadPool implements Executor

核心属性

定义一个基本参数corePoolSize来确定核心线程数大小,一个BlockingQueue来保存任务队列

public class MyThreadPool implements Executor { // 核心线程数大小 private int corePoolSize = 10; // 阻塞任务队列 private BlockingQueue<Runnable> workQueue = new linkedBlockingQueue<>(10); @Override public void execute(Runnable command) { } }

worker

接下来就要开始定义线程池中的工作者,每个工作者有自己的任务-完成第一个分配的工作并去任务队列取新任务做,由于是线程池,所以每个工作者都必须有一个工位即Thread,接下来实现这个工作者

// 拥有工作任务的工作者 private final class Worker implements Runnable { // 线程 Thread thread; // 第一个任务 Runnable firstTask; // 工作者初始化 Worker(Runnable task) { // 保存第一个任务 firstTask = task; // 初始化工作台 thread = new Thread(this); } // 工作者的任务计划 @Override public void run() { Runnable task = firstTask; firstTask = null; // 第一个任务未执行执行第一个,执行完后去队列获取任务,执行结束后再次尝试获取 while (task != null || (task = getTask()) != null) { try { task.run(); } finally { // 当前任务置空重新处理 task = null; } } } // 从队列获取任务 private Runnable getTask() { try { return workQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); return null; } } }

执行任务

接下来就是执行外界任务的核心逻辑,首先用一个集合存放所有工作者

private final HashSet<Worker> workers = new HashSet<Worker>();

执行任务的逻辑,小于corePoolSize即创建一个新工作者执行任务,否则放入任务对列,等待被空闲的工作者执行,队列满了直接拒绝任务

@Override public void execute(Runnable command) { // 如果工作线程少于corePoolSize,新增线程执行 if (workers.size() < corePoolSize) { addWorker(command); return; } // 否则添加到任务队列 if (workQueue.offer(command)) { return; } // 任务队列满了:拒绝 throw new RejectedExecutionException(); }

其中addWorker就是new一个Worker,并启动工作

private void addWorker(Runnable command) { Worker w = new Worker(command); final Thread t = w.thread; t.start(); workers.add(w); }

线程工厂

上面的线程生成有个问题,即线程的命名是默认的没有规律的,如果有多个线程池,调试时完全分不清哪些线程是干什么活产生的,因此我们把new Thread的创建方式修改为工厂模式创建方式,并把让其可配,工厂接口Dong Li都定义好了,直接用

public class MyThreadPool implements Executor { private int corePoolSize; private BlockingQueue<Runnable> workQueue; private ThreadFactory threadFactory; public MyThreadPool3(int corePoolSize, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this.corePoolSize = corePoolSize; this.workQueue = workQueue; this.threadFactory = threadFactory; }

这样再worker初始化是,创建线程的方式变为

// 工作者初始化 Worker(Runnable task) { // 保存第一个任务 firstTask = task; // 初始化工作台,使用工厂创建 thread = threadFactory.newThread(this); }

测试

这样,一个简单非线程安全的线程池就做完了,完整如下

public class MyThreadPool3 implements Executor { private int corePoolSize; private BlockingQueue<Runnable> workQueue; private ThreadFactory threadFactory; public MyThreadPool3(int corePoolSize, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this.corePoolSize = corePoolSize; this.workQueue = workQueue; this.threadFactory = threadFactory; } // 保存所有的worker private final HashSet<Worker> workers = new HashSet<Worker>(); @Override public void execute(Runnable command) { // 如果工作线程少于corePoolSize,新增线程执行 if (workers.size() < corePoolSize) { addWorker(command); return; } // 否则添加到任务队列 if (workQueue.offer(command)) { return; } // 任务队列满了:拒绝 throw new RejectedExecutionException(); } private void addWorker(Runnable command) { Worker w = new Worker(command); final Thread t = w.thread; t.start(); workers.add(w); } // 拥有工作任务的工作者 private final class Worker implements Runnable { // 线程 Thread thread; // 第一个任务 Runnable firstTask; // 工作者初始化 Worker(Runnable task) { // 保存第一个任务 firstTask = task; // 初始化工作台 thread = threadFactory.newThread(this); } // 工作者的任务计划 @Override public void run() { Runnable task = firstTask; firstTask = null; // 第一个任务未执行执行第一个,执行完后去队列获取任务,执行结束后再次尝试获取 while (task != null || (task = getTask()) != null) { try { task.run(); } finally { // 当前任务置空重新处理 task = null; } } } // 从队列获取任务 private Runnable getTask() { try { return workQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); return null; } } } }

测试一下是否好用,创建一个核心线程数是10,任务队列容量10的线程池,执行15个3秒的任务

public static void main(String[] args) { ThreadFactory threadFactory = new ThreadFactory() { private AtomicInteger no = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "my-thread-"+(no.incrementAndGet())); } }; Executor poolExecutor = new MyThreadPool3(10, new linkedBlockingQueue<>(10), threadFactory); for(int i=1;i<=15;i++) { int finalI = i; Runnable task = () -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"-----任务["+finalI+"]done"); }; poolExecutor.execute(task); } }

3秒左右第一波任务干完,输出如下

my-thread-1-----任务[1]done my-thread-10-----任务[10]done my-thread-9-----任务[9]done my-thread-2-----任务[2]done my-thread-3-----任务[3]done my-thread-5-----任务[5]done my-thread-4-----任务[4]done my-thread-8-----任务[8]done my-thread-7-----任务[7]done my-thread-6-----任务[6]done

再过3秒左右,第二波任务干完,输出如下

my-thread-1-----任务[11]done my-thread-9-----任务[13]done my-thread-2-----任务[14]done my-thread-10-----任务[12]done my-thread-3-----任务[15]done

可以看到我们的简单线程池基本用起来没问题,达到了固定线程数分担所有任务的目的

增加maximumPoolSize

作用

ThreadPoolExecutor还有一个重要参数就是增加maximumPoolSize,代表最大可运行的线程数量,worker数超过了corePoolSize,但没超过maximumPoolSize且任务队列已满的情况下,会创建临时线程,这些临时线程完成初始任务后,会尝试去队列里获取新线程,超过了指定存活时间没有获取到任务,临时线程就没有啥用了,就会被销毁

其主要作用是在任务量暴增时,适当的增加线程量来完成任务,并且在空闲时间销毁这些救火的线程

实现思路

新增一个参数maximumPoolSize,在原有执行逻辑上修改:任务队列满且线程数小于maximumPoolSize时尝试新增临时线程,临时线程和核心线程主要差别在于读取队列设定时间读取不到时就销毁,可以BlockingQueue的poll(long timeout, TimeUnit unit)方法读取固定时间,读不到就直接跳循环,此时worker的任务执行完毕,线程自动就销毁了

实现

新增必要参数maximumPoolSizekeepAliveTime,通过构造函数实现可配

public class MyThreadPool4 implements Executor { // 核心线程数 private int corePoolSize; // 任务队列 private BlockingQueue<Runnable> workQueue; // 线程工厂 private ThreadFactory threadFactory; // 最大线程数 private volatile int maximumPoolSize; // 保持存活时间,纳秒 private volatile long keepAliveTime; public MyThreadPool4(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this.corePoolSize = corePoolSize; this.workQueue = workQueue; this.threadFactory = threadFactory; this.maximumPoolSize = maximumPoolSize; // 按单位将时间转换为纳秒 this.keepAliveTime = unit.tonanos(keepAliveTime); }

再区分一下核心worker和临时worker,核心worker一直等待任务队列,而临时worker等待固定时间

private final class Worker implements Runnable { Thread thread; Runnable firstTask; // 新增:是否核心线程 boolean core; Worker(Runnable task, boolean core) { firstTask = task; thread = threadFactory.newThread(this); // 新增:设置是否core this.core = core; }

core与否在获取任务队列时的区别

private Runnable getTask(boolean core) { try { // 非core使用BlockingQueue的poll(long timeout, TimeUnit unit)方法等待固定时间 Runnable runnable = core?workQueue.take():workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); return null; } catch (InterruptedException e) { e.printStackTrace(); return null; } }

此时新增worker时传入是否core的标识

@Override public void execute(Runnable command) { // 如果工作线程少于corePoolSize,新增线程执行,此时core=true if (workers.size() < corePoolSize) { addWorker(command, true); return; } // 否则添加到任务队列 if (workQueue.offer(command)) { return; } // 小于最大线程数,新增线程执行,此时core=false if (workers.size() < maximumPoolSize) { addWorker(command, false); return; } // 任务队列满了:拒绝 throw new RejectedExecutionException(); }

这样我们的线程池可以支持忙时临时增加线程的功能,测试一下:
设置线程池核心10,最大20,队列10, 执行30个任务,每个任务工作(休眠)3秒,保持时间是10秒

public static void main(String[] args) { ThreadFactory threadFactory = new ThreadFactory() { private AtomicInteger no = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "my-thread-"+(no.incrementAndGet())); } }; Executor poolExecutor = new MyThreadPool4( 10, 20, 10, TimeUnit.SECONDS, new linkedBlockingQueue<>(10), threadFactory); for(int i=1;i<=30;i++) { int finalI = i; Runnable task = () -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"-----任务["+finalI+"] done"); }; poolExecutor.execute(task); } }

结果如下,3秒左右输出

my-thread-1-----任务[1] done my-thread-14-----任务[24] done my-thread-9-----任务[9] done my-thread-19-----任务[29] done my-thread-7-----任务[7] done my-thread-18-----任务[28] done my-thread-15-----任务[25] done my-thread-8-----任务[8] done my-thread-16-----任务[26] done my-thread-4-----任务[4] done my-thread-2-----任务[2] done my-thread-11-----任务[21] done my-thread-5-----任务[5] done my-thread-3-----任务[3] done my-thread-20-----任务[30] done my-thread-6-----任务[6] done my-thread-10-----任务[10] done my-thread-12-----任务[22] done my-thread-13-----任务[23] done my-thread-17-----任务[27] done

再过三秒输出

my-thread-1-----任务[11] done my-thread-9-----任务[13] done my-thread-19-----任务[14] done my-thread-18-----任务[16] done my-thread-7-----任务[15] done my-thread-14-----任务[12] done my-thread-15-----任务[17] done my-thread-4-----任务[20] done my-thread-8-----任务[18] done my-thread-16-----任务[19] done

说明:
1~10任务由于小于coreSize,所以增加线程立即执行
11~20由于大于coreSize,所以被加入到队列
21~30由于队列已满,maximumPoolSize未满,直接增加临时线程处理
所以110和2130马上执行,11~20是等第一波任务结束后再执行
再用调试工具跟踪一下




image.png

可以看到前10个核心worker工作完成之后一直等待,后十个临时worker工作完成之后,等待10秒就自动销毁了

淘汰机制

看上图也发现了个问题,上图第一个阶段结束就剩下11~20这10个任务了,此时线程池核心线程就可以应对,按理说再等keepAliveTime设定的10秒,临时worker就改销毁了,可实际是第二阶段临时worker抢走了任务,而核心worker部分闲置,就好比一个公司,工作已经很少了还把工作分给临时工而让正式员工闲着,这显然存在额外开销,因此不合理

结合ThreadPoolExecutor看了一眼,它的解决思路很清晰,解决方案如下:

不区分核心worker或和临时worker,当worker数大于corePoolSize时,采用竞争淘汰手段,哪个worker在keepAliveTime时间段内没有工作,就被淘汰,动态的判断,从而保证工作少时worker数最快的控制在corePoolSize,这样就非常顺滑的节省了资源

那就按照这种思路进行改造:
首先删除worker的core标识,使用一个workerCount标记池中worker数量

// 记录worker数量 private volatile AtomicInteger workerCount = new AtomicInteger(0);

当然addWorker时就需要workerCount++,然后再getTask方法上下功夫,把core标识的校验修改为workerCount>corePoolSize,大概如下

// 从队列获取任务 private Runnable getTask() { // 判断worker数量是否大于corePoolSize,如果大就使用固定时间等待 boolean timed = workerCount.get() > corePoolSize; try { Runnable runnable = !timed?workQueue.take():workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); return runnable; } catch (InterruptedException e) { return null; } }

但真这样写问题太大了,首先workerCount > corePoolSize校验结束后,一定要执行workerCount--操作的,否则所有的线程都会变成阻塞固定时间,如果任务队列任务很少,那么很多线程都被淘汰了,线程数会少于corePoolSize
一般思路是判断完成之后马上做减法,但线程池毕竟是多个线程,可能同时走到workerCount > corePoolSize的判断,再快也不管用

所以正确的姿势做自旋+CAS的方法避免线程销毁出界,如下

// 从队列获取任务 private Runnable getTask() { // 自旋 for(;;) { // 判断worker数量是否大于corePoolSize,如果大就使用固定时间等待 int wc = workerCount.get(); boolean timed = wc > corePoolSize; if (timed) { // CAS 减一操作 if(!workerCount.compareAndSet(wc, wc - 1)) { // 减一失败,重新验证 continue; } } try { // 限制时间使用BlockingQueue的poll(long timeout, TimeUnit unit)方法等待固定时间 Runnable runnable = !timed ? workQueue.take() : workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); return runnable; } catch (InterruptedException e) { e.printStackTrace(); return null; } } }

这样就避免了销毁多了,导致线程不满corePoolSize的现象发生,但是还是有个严重问题:wc > corePoolSize只是判断worker数是否大于corePoolSize,此时只是固定时间阻塞,worker并未实际销毁,workerCount和实际的worker数量出现不匹配,如果固定时间内接到了任务那么这个worker可能会存活很久,workerCount就会一直与worker数量不匹配,这肯定不行

所以问题的关键在于workerCount--操作之后,线程是一定要销毁掉的,即一定要返回null,那么可以修改个思路:worker数大于corePoolSize后,所有线程getTask时都固定阻塞一段时间等待,如果等待不到再去做workerCount--,通过CAS加自选保证只有超过corePoolSize数量实际workerCount--成功,没成功的就是corePool继续等待任务,代码改动如下:

// 从队列获取任务 private Runnable getTask() { boolean timedOut = false; // 标识最后一次poll是否超时? // 自旋 for (; ; ) { // 判断worker数量是否大于corePoolSize,如果大就使用固定时间等待 int wc = workerCount.get(); boolean timed = wc > corePoolSize; if (timed && timedOut) { // CAS 减一操作 if (workerCount.compareAndSet(wc, wc - 1)) // worker成功登记下线,返回null,worker就一定会下线 return null; // 登记失败,重新判断 continue; } try { // 限制时间使用BlockingQueue的poll(long timeout, TimeUnit unit)方法等待固定时间 Runnable r = !timed ? workQueue.take() : workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); if (r != null) // 只有任务获取成功才返回 return r; // 任务获取失败,回头去判断是否改下线 timedOut = true; } catch (InterruptedException e) { e.printStackTrace(); return null; } } }

代码有点,整理一下一个worker获取task的流程图如下





getTask流程图

再测试一次,队列大小修改为5,此时结果如下图




image.png

可以看到抢到第二波任务的线程一定不会销毁,其中编号17,18成为了coolPoolSize的一员,而编号3,4的线程虽然创建的早,但是没有接到任务就被淘汰了,此时的淘汰机制没有先来后到,只看手中是否有任务,而最终保留的线程数即为coolPoolSize数

拒绝策略

这个就比较简单了,通过策略模式,可以实现再拒绝时走固定的策略而不是像上面代码单纯的throw异常,策略接口直接用Doug Lea的RejectedExecutionHandler吧

最终代码

public class MyThreadPool implements Executor { // 核心线程数 private int corePoolSize; // 任务队列 private BlockingQueue<Runnable> workQueue; // 线程工厂 private ThreadFactory threadFactory; // 最大线程数 private volatile int maximumPoolSize; // 保持存活时间,纳秒 private volatile long keepAliveTime; // 记录worker数量 private volatile AtomicInteger workerCount = new AtomicInteger(0); // 拒绝策略 RejectedExecutionHandler handler; public MyThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { this.corePoolSize = corePoolSize; this.workQueue = workQueue; this.threadFactory = threadFactory; this.maximumPoolSize = maximumPoolSize; // 按单位将时间转换为纳秒 this.keepAliveTime = unit.tonanos(keepAliveTime); this.handler = handler; } public static void main(String[] args) { System.out.println(TimeUnit.MILLISECONDS.tonanos(1)); } // 保存所有的worker private final HashSet<Worker> workers = new HashSet<Worker>(); @Override public void execute(Runnable command) { // 如果工作线程少于corePoolSize,新增线程执行,此时core=true if (workerCount.get() < corePoolSize) { addWorker(command, true); return; } // 否则添加到任务队列 if (workQueue.offer(command)) { return; } // 小于最大线程数,新增线程执行,此时core=false if (workerCount.get() < maximumPoolSize) { addWorker(command, false); return; } handler.rejectedExecution(command, null); } private void addWorker(Runnable command, boolean core) { Worker w = new Worker(command); final Thread t = w.thread; t.start(); workers.add(w); workerCount.incrementAndGet(); } // 拥有工作任务的工作者 private final class Worker implements Runnable { // 线程 Thread thread; // 第一个任务 Runnable firstTask; // 工作者初始化 Worker(Runnable task) { // 保存第一个任务 firstTask = task; // 初始化工作台 thread = threadFactory.newThread(this); } // 工作者的任务计划 @Override public void run() { Runnable task = firstTask; firstTask = null; // 第一个任务未执行执行第一个,执行完后去队列获取任务,执行结束后再次尝试获取 while (task != null || (task = getTask()) != null) { try { task.run(); } finally { // 完成后删除任务 task = null; } } // 任务结束,删除worker workers.remove(this); } // 从队列获取任务 private Runnable getTask() { boolean timedOut = false; // 标识最后一次poll是否超时? // 自旋 for (; ; ) { // 判断worker数量是否大于corePoolSize,如果大就使用固定时间等待 int wc = workerCount.get(); boolean timed = wc > corePoolSize; if (timed && timedOut) { // CAS 减一操作 if (workerCount.compareAndSet(wc, wc - 1)) // worker成功登记下线,返回null,worker就一定会下线 return null; // 登记失败,重新判断 continue; } try { // 限制时间使用BlockingQueue的poll(long timeout, TimeUnit unit)方法等待固定时间 Runnable r = !timed ? workQueue.take() : workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); if (r != null) // 只有任务获取成功才返回 return r; // 任务获取失败,回头去判断是否改下线 timedOut = true; } catch (InterruptedException e) { e.printStackTrace(); return null; } } } } }

测试代码

public class MyTest { public static void main(String[] args) { // 线程工厂 ThreadFactory threadFactory = new ThreadFactory() { private AtomicInteger no = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "pq-thread-"+(no.incrementAndGet())); } }; // 拒绝策略 RejectedExecutionHandler rejectedExecutionHandler = (r, executor) -> { System.out.println("为什么拒绝我!"); }; Executor poolExecutor = new MyThreadPool5( 10, 20, 10, TimeUnit.SECONDS, new linkedBlockingQueue<>(5), threadFactory, rejectedExecutionHandler); for(int i=1;i<=30;i++) { int finalI = i; Runnable task = () -> { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"-----任务["+finalI+"] done"); }; poolExecutor.execute(task); } } }

总结

如开篇所属,自己写线程池的目的是为了理解线程池类:ThreadPoolExecutor,基本最终代码就是把ThreadPoolExecutor的核心代码拿过来,运行跑一跑,再回头看基本就明白Doug Lea代码为什么这么写了,但只是实现了一个简单版,而且完全没有考虑多线程运行同一个线程池时的线程安全问题(尝试考虑过,但太复杂了,不得不佩服大神)
ThreadPoolExecutor包含大量的线程安全处理和异常处理,并且workerCount的计算用的二进制方式,很高逼格,能力有限,感觉自己理解到这程度足以,感兴趣的可以自己去看一下~

发布人:d88c****    IP:223.213.90.***     举报/删稿
展会推荐
让朕来说2句
评论
收藏
点赞
转发