Java Notes
1. 并发
1.1 ThreadLocal复用问题
ThreadLocal适用于变量在线程间隔离,而在方法或类之间共享的场景。如果用户信息的获取比较昂贵,那么在ThreadLocal中缓存数据时比较合适的做法。
private static final ThreadLocal<Integer> currentUser = ThreadLocal.withInitial(() -> null);
@GetMapping("wrong")
public Map wrong(@RequestParam("userId") Integer userId) {
//设置用户信息之前先查询一次ThreadLocal中的用户信息
String before = Thread.currentThread().getName() + ":" + currentUser.get();
//设置用户信息到ThreadLocal
currentUser.set(userId);
//设置用户信息之后再查询一次ThreadLocal中的用户信息
String after = Thread.currentThread().getName() + ":" + currentUser.get();
//汇总输出两次查询结果
Map result = new HashMap();
result.put("before", before);
result.put("after", after);
return result;
}
上述例子当中,我们在设置前设置后都做了记录,来看threadLocal当中都记录了什么信息,。值得注意的是程序是运行在Tomcat当中的,执行程序的线程是Tomcat的工作线程,而Tomcat的工作线程是基于线程池的。
即会重用几个固定的线程,一旦线程重用,那么很可能首次从ThreadLocal获取的值是之前其他用户的请求遗留的值。这时ThreadLocal中的用户信息就是其他用户的信息了。
Take Away:
- 代码中没用多线程不以为着你的程序没有使用多线程,Tomcat的Web服务器的业务代码,本身就运行在一个多线程环境当中
- 使用线程池处理数据就意味着线程是会被重用的,使用类似ThreadLocal工具来存放一些数据的时候,需要注意在代码运行完之后,显式去清空设置的数据。
修正复用的问题的bug:
private static final ThreadLocal<Integer> currentUser = ThreadLocal.withInitial(() -> null);
@GetMapping("right")
public Map right(@RequestParam("userId") Integer userId) {
String before = Thread.currentThread().getName() + ":" + currentUser.get();
currentUser.set(userId);
try {
String after = Thread.currentThread().getName() + ":" + currentUser.get();
Map result = new HashMap();
result.put("before", before);
result.put("after", after);
return result;
} finally {
//在finally代码块中删除ThreadLocal中的数据,确保数据不串
currentUser.remove();
}
}
1.2 ConcurrentHashMap
- ConcurrentHashMap是线程安全的哈希表容器,这里的线程安全是指原子性读写操作是线程安全的。
- 例子 – 10个线程一起来补充总共100个元素进去
//线程个数
private static int THREAD_COUNT = 10;
//总元素数量
private static int ITEM_COUNT = 1000;
//帮助方法,用来获得一个指定元素数量模拟数据的ConcurrentHashMap
private ConcurrentHashMap<String, Long> getData(int count) {
return LongStream.rangeClosed(1, count)
.boxed()
.collect(Collectors.toConcurrentMap(i -> UUID.randomUUID().toString(), Function.identity(),
(o1, o2) -> o1, ConcurrentHashMap::new));
}
@GetMapping("wrong")
public String wrong() throws InterruptedException {
ConcurrentHashMap<String, Long> concurrentHashMap = getData(ITEM_COUNT - 100);
//初始900个元素
log.info("init size:{}", concurrentHashMap.size());
ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
//使用线程池并发处理逻辑
forkJoinPool.execute(() -> IntStream.rangeClosed(1, 10).parallel().forEach(i -> {
//查询还需要补充多少个元素
int gap = ITEM_COUNT - concurrentHashMap.size();
log.info("gap size:{}", gap);
//补充元素
concurrentHashMap.putAll(getData(gap));
}));
//等待所有任务完成
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
//最后元素个数会是1000吗?
log.info("finish size:{}", concurrentHashMap.size());
return "OK";
}
这样子执行的结果就是加入远远超过预期的数量,因为ConcurrentHashMap可以保证多个worker工作的时候不会互相干扰,但是无法保证看到的当前ConcurrentHashMap数据数量的同步
- Take Aways
- 使用ConcurrentHashMap,不代表对其多个操作之间的状态是一致的,是没有其他线程在操作它的,如果需要确保,需要手动加锁
- 诸如size,isEmpty和containsValue等聚合方法,在并发情况下可能会反映ConcurrentHashMap的中间状态,因此在并发情况下,*这些方法的返回值只能用作参考,而不能用于流程控制
解决方案就是通过加锁,使得同时只有一个线程可以操作ConcurrentHashMap
@GetMapping("right")
public String right() throws InterruptedException {
ConcurrentHashMap<String, Long> concurrentHashMap = getData(ITEM_COUNT - 100);
log.info("init size:{}", concurrentHashMap.size());
ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
forkJoinPool.execute(() -> IntStream.rangeClosed(1, 10).parallel().forEach(i -> {
//下面的这段复合逻辑需要锁一下这个ConcurrentHashMap
synchronized (concurrentHashMap) {
int gap = ITEM_COUNT - concurrentHashMap.size();
log.info("gap size:{}", gap);
concurrentHashMap.putAll(getData(gap));
}
}));
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
log.info("finish size:{}", concurrentHashMap.size());
return "OK";
}
- 充分使用ConcurrentHashMap的特性
- 例如面对一个使用Map来统计Key出现次数的场景
- key范围为10, 最多使用10个并发,循环操作1000万次,每次操作累加随机的key
- 如果key不存在的话,首次设置值为1
//循环次数
private static int LOOP_COUNT = 10000000;
//线程数量
private static int THREAD_COUNT = 10;
//元素数量
private static int ITEM_COUNT = 10;
private Map<String, Long> normaluse() throws InterruptedException {
ConcurrentHashMap<String, Long> freqs = new ConcurrentHashMap<>(ITEM_COUNT);
ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
forkJoinPool.execute(() -> IntStream.rangeClosed(1, LOOP_COUNT).parallel().forEach(i -> {
//获得一个随机的Key
String key = "item" + ThreadLocalRandom.current().nextInt(ITEM_COUNT);
synchronized (freqs) {
if (freqs.containsKey(key)) {
//Key存在则+1
freqs.put(key, freqs.get(key) + 1);
} else {
//Key不存在则初始化为1
freqs.put(key, 1L);
}
}
}
));
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
return freqs;
}
但是实际上ConcurrentHashMap本身是使用的Java自带的CAS操作的,在虚拟机层面确保了写入数据的原子性,比加锁的效率高很多,因此相较于直接加synchronized重量锁,我们可以通过computeIfAbsent()操作,和线程安全累加器LongAdder来更有效率的实现我们的统计目的
private Map<String, Long> gooduse() throws InterruptedException {
ConcurrentHashMap<String, LongAdder> freqs = new ConcurrentHashMap<>(ITEM_COUNT);
ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
forkJoinPool.execute(() -> IntStream.rangeClosed(1, LOOP_COUNT).parallel().forEach(i -> {
String key = "item" + ThreadLocalRandom.current().nextInt(ITEM_COUNT);
//利用computeIfAbsent()方法来实例化LongAdder,然后利用LongAdder来进行线程安全计数
freqs.computeIfAbsent(key, k -> new LongAdder()).increment();
}
));
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
//因为我们的Value是LongAdder而不是Long,所以需要做一次转换才能返回
return freqs.entrySet().stream()
.collect(Collectors.toMap(
e -> e.getKey(),
e -> e.getValue().longValue())
);
}
- 上述代码中,直接使用了ConcurrentHashMap的原子性方法computeIfAbsent来做符合逻辑操作,判断Key是否存在Value,如果不存在则把Lambda表达式运行后的结果放入Map作为Value
- LongAdder是线程安全的累加器,因此可以直接调用其increment()方法来做累加。
1.3 锁
- 加锁前需要知道锁和被保护的对象是不是一个层面上的
- 静态字段属于类,需要类级别的锁来进行保护
- 非静态字段属于类实例,实例级别的锁就可以保护
// 定义一个静态int字段counter和一个非静态的wrong方法,实现counter字段的累加操作
class Data {
@Getter
private static int counter = 0;
public static int reset() {
counter = 0;
return counter;
}
public synchronized void wrong() {
counter++;
}
}
// 测试代码
@GetMapping("wrong")
public int wrong(@RequestParam(value = "count", defaultValue = "1000000") int count) {
Data.reset();
//多线程循环一定次数调用Data类不同实例的wrong方法
IntStream.rangeClosed(1, count).parallel().forEach(i -> new Data().wrong());
return Data.getCounter();
}
输出结果,因为默认运行100万次,但是页面输出的并不会是100万。
- 在非静态的wrong方法上加锁,只能够保证多个线程无法执行同一个实例的wrong方法,但无法保证其不会执行不同实例的wrong方法。而静态的counter是被共享的
- 解决方案时保证在一个实例的方法操作静态变量的时候,其他的实例无法操作这个静态变量
class Data {
@Getter
private static int counter = 0;
private static Object locker = new Object();
public void right() {
synchronized (locker) {
counter++;
}
}
}
- 除此以外,对锁可以做的优化还包括
- 精细化锁应用的范围
- 区分读写场景以及资源的访问冲突,考虑使用悲观锁还是乐观锁
- 对于读写比例差异明显的场景,考虑使用ReentrantReadWriteLock细化区分读写锁,来提高性能
- 如果共享资源冲突概率不大,可以考虑使用StampedLock的乐观读的特性,进一步提高性能
1.4 线程池
开发当中,我们会使用各种池化技术来缓存创建昂贵的对象,比如线程池,连接池,内存池。一般是预先创建一些对象放入到池当中,使用的时候直接取出使用,用完归还以便复用。通过一定的策略调整池中缓存对象的数量,实现池的动态伸缩。
- 应当手动进行线程池的声明
- Java Executors定义了一些快捷的工具办法,来帮助我们快速创建线程池
- 应当禁止使用这些方法来创建线程池,应当手动new ThreadPoolExecutor来创建线程池
- 资源耗尽导致OOM问题
- newFixedThreadPool
- newCachedThreadPool
- 资源耗尽导致OOM问题
1.4.1 newFixedThreadPool OOM 问题
@GetMapping("oom1")
public void oom1() throws InterruptedException {
ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
//打印线程池的信息,稍后我会解释这段代码
printStats(threadPool);
for (int i = 0; i < 100000000; i++) {
threadPool.execute(() -> {
String payload = IntStream.rangeClosed(1, 1000000)
.mapToObj(__ -> "a")
.collect(Collectors.joining("")) + UUID.randomUUID().toString();
try {
TimeUnit.HOURS.sleep(1);
} catch (InterruptedException e) {
}
log.info(payload);
});
}
threadPool.shutdown();
threadPool.awaitTermination(1, TimeUnit.HOURS);
}
- 日志显示出现了OOM
- newFixedThreadPool源码:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
...
/**
* Creates a {@code LinkedBlockingQueue} with a capacity of
* {@link Integer#MAX_VALUE}.
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
...
}
- 直接使用了一个LinkedBlockingQueue,而默认构造方法是一个Integer.MAX_VALUE长度的队列,是无界的。
- 尽管使用newFixedThreadPool可以把工作线程控制在固定的数量上,但任务队列是无界的。如果任务比较多并且执行比较慢的话,队列可能会迅速积压,撑爆内存导致OOM
1.4.2 newCachedThreadPool OOM问题
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
- 线程池最大线程数为Integer.MAX_VALUE,是没有上限的,其工作队列SynchronizedQueue是一个没有存储空间的阻塞队列。
- SynchronousQueue是没有存储空间的阻塞队列,有请求到来的时候,必须要找到一条工作线程来处理,如果当前没有空闲的线程就再创建一条新的
1.4.3 线程池配置Best Practice
根据自己的场景,并发情况来评估线程池的几个核心参数,需要设置有界的工作队列和可控的线程数
- 核心线程数
- 最大线程数
- 线程回收策略
- 工作队列的类型
- 拒绝策略
为线程池指定有意义的名称,来方便问题的排查,当出现线程数暴增,线程死锁,线程占用大量CPU这类问题的时候,会抓取线程栈来进行分析,这个时候有意义的线程名称,可以很大程度上方便我们对问题的定位
Metrics, alarm来观察线程池的状态
- 线程池特性
- 不会初始化corePoolSize个线程,有任务来了才创建工作线程
- 当核心线程满了之后不会立即扩容线程池,而是把任务堆积到工作队列当中
- 当工作队列满了之后扩容线程池,一直到线程个数达到maximumPoolSize为止
- 如果队列已满其达到了最大线程后还有任务来,就按照拒绝策略来处理
- 当线程数大于核心线程数时,线程等待KeepAliveTime后还没有任务需要处理的话,收缩线程到核心线程数
@GetMapping("right")
public int right() throws InterruptedException {
//使用一个计数器跟踪完成的任务数
AtomicInteger atomicInteger = new AtomicInteger();
//创建一个具有2个核心线程、5个最大线程,使用容量为10的ArrayBlockingQueue阻塞队列作为工作队列的线程池,使用默认的AbortPolicy拒绝策略
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
2, 5,
5, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10),
new ThreadFactoryBuilder().setNameFormat("demo-threadpool-%d").get(),
new ThreadPoolExecutor.AbortPolicy());
printStats(threadPool);
//每隔1秒提交一次,一共提交20次任务
IntStream.rangeClosed(1, 20).forEach(i -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
int id = atomicInteger.incrementAndGet();
try {
threadPool.submit(() -> {
log.info("{} started", id);
//每个任务耗时10秒
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
}
log.info("{} finished", id);
});
} catch (Exception ex) {
//提交出现异常的话,打印出错信息并为计数器减一
log.error("error submitting task {}", id, ex);
atomicInteger.decrementAndGet();
}
});
TimeUnit.SECONDS.sleep(60);
return atomicInteger.intValue();
}
1.4.4 线程池本身不复用
@GetMapping("wrong")
public String wrong() throws InterruptedException {
ThreadPoolExecutor threadPool = ThreadPoolHelper.getThreadPool();
IntStream.rangeClosed(1, 10).forEach(i -> {
threadPool.execute(() -> {
...
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
});
});
return "OK";
}
class ThreadPoolHelper {
public static ThreadPoolExecutor getThreadPool() {
//线程池没有复用
return (ThreadPoolExecutor) Executors.newCachedThreadPool();
}
}
通过这种方式,会不停产生新的线程,整个业务程序会不停产生新的threadPool,因为newCachedThreadPool的核心线程数是0, keepAliveTime是60秒,过了60s以后线程就会被回收了。
1.4.5 线程池的使用策略
- 对于线程池如何使用,放什么样的任务进去,是需要根据任务的轻重缓急来指定线程池的核心参数,包括线程数,回收策略和任务队列
- 对于执行比较慢,数量不大的IO任务,可以考虑更多的线程数,而不需要太大的队列
- 对于吞吐量比较大的计算型任务,线程数量不应该过多,可以是CPU核心数,或者核心数 x 2。
- 因为线程是需要调度到某个CPU当中进行的,如果任务本身是CPU绑定的任务,那么过多的线程只会增加线程切换的开销,并不能提升吞吐量
- 需要比较长的队列来做缓冲
2. 连接池
2.1 连接池定义
- 对外提供获得连接
- 归还连接的接口给客户端使用
- 暴露最小空闲连接数,最大连接数等可配置参数
- 内部实现连接建立,连接心跳保持,连接管理,空闲连接回收,连接可用性检测等功能
- 应用场景
- 数据库连接池
- Redis连接池
- HTTP连接池
2.2 应用场景
2.2.1 判断客户端SDK是否基于连接池
使用第三方客户端进行网络通信的时候,需要确定客户端SDK是否是基于连接池技术实现的
TCP是面向连接的基于字节流的协议
面向连接
- 连接需要先创建,需要先做三次握手,是有开销的
基于字节流
- 字节是发送数据的最小单元
- TCP是数据读写的通道,本身不知道哪些是完整的消息体,也不知道是否有多个客户端在使用同一个TCP连接
客户端SDK对外提供API的方式
- 连接池和连接分离的 API:有一个 XXXPool 类负责连接池实现,先从其获得连接 XXXConnection,然后用获得的连接进行服务端请求,完成后使用者需要归还连接。通常,XXXPool 是线程安全的,可以并发获取和归还连接,而 XXXConnection 是非线程安全的。对应到连接池的结构示意图中,XXXPool 就是右边连接池那个框,左边的客户端是我们自己的代码
- 内部带有连接池的 API:对外提供一个 XXXClient 类,通过这个类可以直接进行服务端请求;这个类内部维护了连接池,SDK 使用者无需考虑连接的获取和归还问题。一般而言,XXXClient 是线程安全的。对应到连接池的结构示意图中,整个 API 就是蓝色框包裹的部分
- 非连接池的 API:一般命名为 XXXConnection,以区分其是基于连接池还是单连接的,而不建议命名为 XXXClient 或直接是 XXX。直接连接方式的 API 基于单一连接,每次使用都需要创建和断开连接,性能一般,且通常不是线程安全的。对应到连接池的结构示意图中,这种形式相当于没有右边连接池那个框,客户端直接连接服务端创建连接
2.2.2 复用连接池
- 创建连接池的时候很可能一次性创建了多个连接,大多数连接池考虑到性能,会在初始化的时候维护一定数量的最小连接(毕竟初始化连接池的过程一般是一次性的),可以直接使用。如果每次使用连接池都按需创建连接池,那么很可能你只用到一个连接,但是创建了 N 个连接
- 连接池有管理模块,会有闲置超时,定时来回收闲置的连接,将活跃连接数降到最低连接的配置值,以此减轻服务端的压力
转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 stone2paul@gmail.com
文章标题:Java Notes
文章字数:4.2k
本文作者:Leilei Chen
发布时间:2020-08-04, 10:19:37
最后更新:2020-08-21, 11:01:01
原始链接:https://www.llchen60.com/Java-Notes/版权声明: "署名-非商用-相同方式共享 4.0" 转载请保留原文链接及作者。