Java 多线程 基础知识(二)

1. 并发容器

1.1 ConcurrentHashMap

  • HashMap不是线程安全的

  • 并发情况下一个可行的方式是使用Collections.synchronizedMap()来包装HashMap。

    • 但问题在于一个全局的锁同步不同线程之间的并发访问,会带来不可忽视的性能问题
  • 故而使用ConcurrentHashMap

    • 读写都能保证较高的性能
    • 读操作时几乎不需要加锁
    • 写操作的时候通过锁分段技术只对所操作的段加锁而不影响客户端对其他段的访问
  • ConcurrentHashMap和HashTable的区别主要体现在实现线程安全的方式上不同

    • 底层数据结构
      • ConcurrentHashMap使用分段的数组和链表
      • Hashtable用数组和链表,数组为主体,链表是为了解决哈希冲突的
    • 线程安全的实现方式
      • 使用node数组 + 链表 + 红黑树的数据结构来实现,并发控制使用synchronized和CAS操作
      • Hashtable是使用synchronized来保证线程安全的,效率相对较低

        1.2 CopyOnWriteArrayList

  • 针对现实应用场景当中,读操作远远多于写操作,因为读操作不会修改原有数据,所以就不对读进行加锁操作了。允许多个线程同时访问list的内部数据。

  • ReentranReadWriteLock 读写锁是读读共享、写写互斥、读写互斥、写读互斥

  • 而CopyOnWriteArrayList 是读取完全不加锁,写入也不会阻塞读取操作,只有写入和写入之间需要进行同步等待。

  • 如何实现的
    • 所有可变操作(add,set 等等)都是通过创建底层数组的新副本来实现的。当 List 需要被修改的时候,我并不修改原有内容,而是对原有数据进行一次复制,将修改的内容写入副本。写完之后,再将修改完的副本替换原来的数据,这样就可以保证写操作不会影响读操作了。
    • 从计算机系统的角度来说,实际上是拷贝内存,在新内存完成写操作,并将原先的内存指针指向新的内存,原有的内存就可以被回收掉了
    /** The array, accessed only via getArray/setArray. */
    private transient volatile Object[] array;
    public E get(int index) {
        return get(getArray(), index);
    }
    @SuppressWarnings("unchecked")
    private E get(Object[] a, int index) {
        return (E) a[index];
    }
    final Object[] getArray() {
        return array;
    }

        /**
     * Appends the specified element to the end of this list.
     *
     * @param e element to be appended to this list
     * @return {@code true} (as specified by {@link Collection#add})
     */
    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();//加锁
        try {
            Object[] elements = getArray();
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len + 1);//拷贝新数组
            newElements[len] = e;
            setArray(newElements);
            return true;
        } finally {
            lock.unlock();//释放锁
        }
    }

1.3 ConcurrentLinkedQueue

Java 提供的线程安全的 Queue 可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是 BlockingQueue,非阻塞队列的典型例子是 ConcurrentLinkedQueue,在实际应用中要根据实际需要选用阻塞队列或者非阻塞队列。 阻塞队列可以通过加锁来实现,非阻塞队列可以通过 CAS 操作实现。

从名字可以看出,ConcurrentLinkedQueue这个队列使用链表作为其数据结构.ConcurrentLinkedQueue 应该算是在高并发环境中性能最好的队列了。它之所有能有很好的性能,是因为其内部复杂的实现。

其中主要使用CAS非阻塞算法来实现

2. 乐观锁悲观锁

乐观锁适用于写比较少的情况,即冲突本身发生的可能性就比较低,这样就能省去锁的开销,加大整个系统的吞吐量;但是多写的情况下,会比较容易产生冲突,这样就会导致上层不断进行retry,反倒会降低性能,所以一般多写的场景下用悲观锁比较合适。

2.1 乐观锁

总是假设最好的情况,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号机制CAS算法实现。乐观锁适用于多读的应用类型,这样可以提高吞吐量,像数据库提供的类似于write_condition机制,其实都是提供的乐观锁。在Java中java.util.concurrent.atomic包下面的原子变量类就是使用了乐观锁的一种实现方式CAS实现的。

关于CAS算法,实质上就先拿到指定内存上的数据,(读取操作),线程操作处理数据,在要写入之前,再次查询该内存位置上的数据,如果数据一致,那就可以写入,如果数据不一致,就throw exception,告知系统出现了问题。

2.1.1 乐观锁实现方式

乐观锁可以使用版本号机制或者CAS算法来进行实现

  • 版本号机制

    • 在数据表中加上数据版本号version字段,表示数据被修改的次数
    • 被修改,version值会+1
    • 当线程A要更新数据时,读数据的同时也会读取version值,提交更新的时候,若刚才读取到的version值和当前数据库的version值相等才更新,否在重试
  • CAS算法

    • compare and swap算法,无锁编程
    • 不使用锁的情况下实现多线程之间的变量同步,即在没有线程被阻塞的情况下实现变量的同步 – 非阻塞同步 Non=blocking synchronization

2.1.2 缺点

  • ABA 问题

    • 一个变量初始值为A,在准备赋值的时候仍为A,但是在这段时间当中它有可能已经被改为了其他的值了,CAS操作会认为它从来没有被修改过
    • 可以使用AtomicStampedReference类,compareAndSet方法首先检查当前引用是否等于预期引用,以及当前标志是否等于预期标志。如果全部相等,就以原子方式将该引用和该标志的值设置为给定的更新值。
  • 循环时间开销大

    • 自旋CAS如果长时间不成功,会给CPU带来很大的执行开销
  • 只能保证一个共享变量的原子操作

    • CAS只对单个共享变量有效,当操作涉及多个共享变量的时候CAS无效
    • AtomicReference这一类能够保证引用对象之间的原子性,可以将多个变量放在一个对象里进行CAS操作

2.2 悲观锁

总是假设最坏的情况,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会阻塞直到它拿到锁(共享资源每次只给一个线程使用,其它线程阻塞,用完后再把资源转让给其它线程)。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。Java中synchronized和ReentrantLock等独占锁就是悲观锁思想的实现。

3. 线程池

  • 线程池用来限制和管理资源,每个线程池还可以维护一些基本统计信息
  • 好处
    • 降低资源消耗
      • 重复利用已经创建的线程,来降低线程创建和销毁造成的消耗
    • 提高响应速度
      • 当任务到达时,任务可以不需要的等到线程创建就能立即执行
    • 提高线程的可管理性
      • 线程是稀缺资源,无限制创建会消耗系统资源,并且降低系统稳定性;使用线程池可以进行统一分配,调优和监控

3.1 ThreadPoolExecutor详解

/**
 * 用给定的初始参数创建一个新的ThreadPoolExecutor。
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
  • corePoolSize
    • 定义了不会timeout的最小的同时工作的线程数量
  • maxPoolSize
    • 定义了可以被创建的线程的最大数量
    • 和CorePoolSize的区别在于当提交一个新的任务,当前线程数量小于corePoolSize的时候,哪怕现在存在的线程是空闲的,还是会创建新线程来运行这个任务;maxPoolSize说的是最多能够创建的线程数量,是上限
  • workQueue
    • 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中
  • handler 饱和策略 - 当当前同时运行的线程数量达到最大线程数量,并且队列已经被放满了的时候的策略
    • AbortPolicy
      • 抛出RejectedExecutionException来拒绝新的任务的处理
    • CallerRunsPolicy
      • 调用执行自己的线程运行任务,会有延迟
    • DiscardPolicy
      • 不处理新任务,直接丢弃掉
    • DiscardOldestPolicy
      • 丢弃最早的未处理的任务请求
  • Executor.execute代码的源码如下:

      // 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)
      private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    
      private static int workerCountOf(int c) {
          return c & CAPACITY;
      }
    
      private final BlockingQueue<Runnable> workQueue;
    
      public void execute(Runnable command) {
          // 如果任务为null,则抛出异常。
          if (command == null)
              throw new NullPointerException();
          // ctl 中保存的线程池当前的一些状态信息
          int c = ctl.get();
    
          //  下面会涉及到 3 步 操作
          // 1.首先判断当前线程池中之行的任务数量是否小于 corePoolSize
          // 如果小于的话,通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
          if (workerCountOf(c) < corePoolSize) {
              if (addWorker(command, true))
                  return;
              c = ctl.get();
          }
          // 2.如果当前之行的任务数量大于等于 corePoolSize 的时候就会走到这里
          // 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态才会被并且队列可以加入任务,该任务才会被加入进去
          if (isRunning(c) && workQueue.offer(command)) {
              int recheck = ctl.get();
              // 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。
              if (!isRunning(recheck) && remove(command))
                  reject(command);
                  // 如果当前线程池为空就新创建一个线程并执行。
              else if (workerCountOf(recheck) == 0)
                  addWorker(null, false);
          }
          //3. 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
          //如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。
          else if (!addWorker(command, false))
              reject(command);
      }

execute process

3.2 Executor框架

Java5以后引入的Executor,用其启动线程比使用Thread的start方法更好,易管理,效率高,还可以帮助避免this逃逸的问题。Executor框架提供了:

  • 线程池管理
  • 线程工厂
  • 队列
  • 拒绝策略

3.2.1 框架结构

  • 任务
    • 执行任务实现Runnable或者Callable接口,然后被ThreadPoolExecutor或者ScheduledThreadPoolExecutor来执行
  • 任务执行
    • Executor
  • 异步计算的结果
    • Future接口以及Future接口实现类FutureTask都可以来代表异步计算的结果

Exectuor 流程图

整个过程中,主线程首先创建并实现了Runnable或者Callable的任务对象,而后将对象交给ExecutorService来执行,然后拿到返回的Future接口,执行FutureTask.get()等方法来等待任务执行完成

3.3 常用线程池

3.3.1 FixedThreadPool

  • FixedThreadPool

    • 如果当前运行的线程数小于 corePoolSize, 如果再来新任务的话,就创建新的线程来执行任务;
    • 当前运行的线程数等于 corePoolSize 后, 如果再来新任务的话,会将任务加入 LinkedBlockingQueue;
    • 线程池中的线程执行完 手头的任务后,会在循环中反复从 LinkedBlockingQueue 中获取任务来执行;
  • 不推荐使用

    • 线程池中的线程数达到 corePoolSize 后,新任务将在无界队列中等待,因此线程池中的线程数不会超过 corePoolSize

    • 由于使用无界队列时 maximumPoolSize 将是一个无效参数,因为不可能存在任务队列满的情况。所以,通过创建 FixedThreadPool的源码可以看出创建的 FixedThreadPool 的 corePoolSize 和 maximumPoolSize 被设置为同一个值

    • 由于上述两点,keepAliveTime就会是一个无效参数了

    • 因为无法执行shutdown() shutdownNow(),不会拒绝任务,在任务比较多的时候会导致OOM(内存溢出的问题)

      public static ExecutorService newFixedThreadPool(int nThreads) {
      return new ThreadPoolExecutor(nThreads, nThreads,

                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());

      }

3.3.2 CachedThreadPool

可以根据需要来创建新线程的线程池

    /**
     * 创建一个线程池,根据需要创建新线程,但会在先前构建的线程可用时重用它。
     */
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

注意看源码中,corePoolSize设置为空,maximumPoolSize设置为无界的了,如果主线程提交任务的速度高于maximumPool中线程处理任务的速度,CachedThreadPool会不断创建新的线程,极端情况下,会耗尽CPU和内存资源的。

Reference

  1. https://github.com/Snailclimb/JavaGuide/
  2. https://howtodoinjava.com/java/multi-threading/compare-and-swap-cas-algorithm/

转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 stone2paul@gmail.com

文章标题:Java 多线程 基础知识(二)

文章字数:3.4k

本文作者:Leilei Chen

发布时间:2020-05-04, 01:51:40

最后更新:2020-05-04, 02:20:26

原始链接:https://www.llchen60.com/Java-%E5%A4%9A%E7%BA%BF%E7%A8%8B-%E5%9F%BA%E7%A1%80%E7%9F%A5%E8%AF%86-%E4%BA%8C/

版权声明: "署名-非商用-相同方式共享 4.0" 转载请保留原文链接及作者。

目录
×

喜欢就点赞,疼爱就打赏