使用ThreadPoolExecutor解决并发问题
业务功能需要对多条信息数据校验其合法性,该校验非HTTP协议的接口校验,需要通过主机发送whois命令,根据返回的结果处理信息。而该命令的从请求到结果的返回需要5-12s。若业务侧需要同一时间对大量数据进行校验,若不进行优化,则会导致接口执行时间过长。
thread pool,线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。
- 降低线程创建和销毁过程的资源消耗
- 提高响应速度
- 提高线程的可管理型

ThreadPoolExecutor分析
Section titled “ThreadPoolExecutor分析”构造器各个参数的含义
Section titled “构造器各个参数的含义”int corePoolSize:核心线程数,当执行任务个数 < corePoolSize,就会启动对应的个数线程去执行任务。 int maximumPoolSize:线程池中允许的最大线程数,BlockingQueue满了,执行任务数 < maximumPoolSize的时候,才会再次创建新的线程来执行任务。 long keepAliveTime:线程空闲下来之后,存货的时间;只在活动线程数量 > corePoolSize的时候才有用。 TimeUnit unit,空闲线程存活的时间单位。 BlockingQueue workQueue:要保存任务的阻塞队列 ThreadFactory threadFactory:创建线程的工厂,给新建线程的名称赋值 RejectedExecutionHandler handler:饱和策略/拒绝策略/丢弃策略
- AbortPolicy:直接抛出异常,默认的情况 CallerRunsPolicy:用调用者所在的线程去执行任务 DiscardOldestPolicy:丢弃阻塞队列中最老的任务,也就是队列中最靠前的任务 DiscardPolicy:直接丢弃当前任务 其他:自己新建一个类,实现RejectedExecutionHandler接口,自定义饱和策略。
void execute(Runnable command) 不需要返回
Future submit(Callable task) 需要返回
shutdownNow():设置线程池的状态,还会尝试停止正在运行或者暂停任务的线程
shutdown():设置线程池的状态,只会中断所有没有执行任务的线程
线程池的工作机制
Section titled “线程池的工作机制”
-
优先使用核心线程处理任务。
-
如果核心线程都已忙碌,则将任务压入阻塞队列,等待线程池有空闲线程,从队列中获取任务执行。
-
当阻塞队列已满,启动新的线程执行任务。前提是线程池支持的线程数充足。
-
直到线程池所有的线程都在执行任务,且无法再启动新的线程,此时执行饱和策略(拒绝策略)
合理配置线程池
Section titled “合理配置线程池”-
计算密集型
- 加密、大数分解、正则
-
线程数小,推荐:CPU核心数+1
-
IO密集型
- 读取文件、数据库连接、网络通讯
-
线程数大,推荐:CPU核心数*2
-
混合型
- 尽量拆分
-
IO密集型>>计算密集型:拆分意义不大
-
IO密集型≈计算密集型,建议使用有界队列
常见系统线程池
Section titled “常见系统线程池”-
FixedThreadPool
- 固定线程数量,适用负载较重的服务器,使用了无界队列LInkedBlockingQueue
-
SingleThreadExecutor
- 创建单个线程,需要保证顺序执行任务,不会有多个线程活动,使用了无界队列
-
CachedThreadPool
- 根据需要创建新线程,执行很多短期异步任务的程序SynchronousQueue
-
WorkStealingPool
-
ScheduledThreadPoolExecutor
- 需要定期、周期地执行任务
Executor框架使用流程
Section titled “Executor框架使用流程”
-
一般都不能直接使用Executors.new***ThreadPool()的情况
- 使用了无界队列,系统几乎没有使用有界队列,OOM异常
-
系统提供的饱和策略:直接抛出异常,不合适。
-
饱和策略可以自己去实现,通过写日志、存入本地文件、DB、生成一个新的队列等都是有可能的。
结合线程池与CountDownLatch解决并发同步问题
Section titled “结合线程池与CountDownLatch解决并发同步问题”结合需求背景:
- 网络通信
- whois调用频率问题,同一时间批量调用会导致请求被拒绝而返回空值的问题
- 结合实际业务场景,选择核心线程数量为CPU核心数*2:32,BlockingQueue队列长度根据具体业务场景决定。
- 结合对端的请求频率限制策略,循环获取需要校验的信息数据时,主线程往BlockingQueue队列塞任务前,先随机睡眠100-500ms,避免被限制请求。
- 设置CountDownLatch数量为需要校验的信息数据的数量,每个线程任务完全结束后执行countDownLatch.countdown()
- 主线程往BlockingQueue插完任务后,在主线程调用countDownLatch.await()等待线程执行完毕,未避免线程任务出问题。可设置等待的时长boolean b = countDownLatch.await(num, TimeUnit),返回执行结果。
- 线程之间使用共享变量,将结果存到List中。
public void checkPrefixInfos () throws IOException { ThreadPoolExecutor executorService = new ThreadPoolExecutor(32, 32, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(500)); final long MIN_MILLISECONDS = 100L; final long MAX_MILLISECONDS = 500L; try { //业务逻辑省略…… final CountDownLatch latch = new CountDownLatch(prefixModuleArr.size()); for (Object o : prefixModuleArr) { JSONObject prefixModuleObject = (JSONObject) o; PrefixValid prefixValid = (PrefixValid) JSONObject.toBean(prefixModuleObject, PrefixValid.class); // 随机休眠,减少因并发请求导致对端系统触发安全策略的可能 Thread.sleep(MIN_MILLISECONDS + (int) (Math.random() * (MAX_MILLISECONDS - MIN_MILLISECONDS))); CheckPrefixThread checkPrefixThread = new CheckPrefixThread(prefixValid, successList, failList, latch); executorService.execute(checkPrefixThread); } // 主线程等待五分钟,超时则返回失败 boolean isSuccess = latch.await(300, TimeUnit.SECONDS); //业务逻辑省略…… } catch (RejectedExecutionException e) { /* 线程池默认饱和策略:AbortPolicy,出现饱和策略抛出的异常一般是因为 校验数量过多 & 核心线程中的prefix一直处于重试状态,导致无空闲线程释放, 导致任务队列饱和,这里直接捕获异常处理 */ // 也可以自己实现RejectedExecutionHandler自定义饱和策略 prefixResult.setRetCode(PrefixResult._TIMEOUT); prefixResult.setRetMsg("线程池饱和!"); JSONObject jSONObject = JSONObject.fromObject(prefixResult); ResponseUtil.print(ServletActionContext.getResponse(), jSONObject.toString()); } catch (Exception e) { throw new RuntimeException(e); } finally { executorService.shutdown(); }public class CheckPrefixThread implements Runnable { // 重试次数 final int RETRY_TIMES = 2; private int RUN_TIME = 1; private PrefixValid prefixValid; private List<PrefixValid> successList; private List<PrefixValid> failList; private CountDownLatch countDownLatch; private String apnicResult = "UnChecked"; private String radbResult = "UnChecked"; private Boolean originFlag = false; private String prefix; private String asPath;
/** * Whois校验Prefix信息 */ @Override public void run() { checkPrefix(); PrefixValid resValid = new PrefixValid(); resValid.setPrefix(prefix); resValid.setAsPath(asPath); resValid.setIndex(prefixValid.getIndex()); resValid.setApnicResult(apnicResult); resValid.setRadbResult(radbResult); if (originFlag) { successList.add(resValid); } else { failList.add(resValid); } countDownLatch.countDown(); }
private void checkPrefix() {
// 业务逻辑省略…… }
//Constructor public CheckPrefixThread(PrefixValid prefixValid, List<PrefixValid> successList, List<PrefixValid> failList, CountDownLatch countDownLatch) { this.prefixValid = prefixValid; this.successList = successList; this.failList = failList; this.countDownLatch = countDownLatch; // 避免可预见的OS命令注入漏洞 this.prefix = prefixValid.getPrefix().replaceAll("&", "").replaceAll("\\|", ""); this.asPath = prefixValid.getAsPath(); }}