专业的JAVA编程教程与资源

网站首页 > java教程 正文

Java互联网架构-残酷月光多线程并发编程的总结和梳理

temp10 2024-10-13 09:30:09 java教程 9 ℃ 0 评论

概述

面向对象使我们能将程序划分成相互独立的模块。但是你时常还会碰到,不但要把程序分解开来,而且还要让它的各个部分都能独立运行的问题。

Java互联网架构-残酷月光多线程并发编程的总结和梳理

这种能独立运行的子任务就是线程(thread)。编程的时候,你可以认为线程都是能独立运行的,有自己CPU的子任务。实际上,是一些底层机制在为你分割CPU的时间,只是你不知道罢了。这种做法能简化多线程的编程。

进程(process)是一种有专属地址空间的"自含式(self-contained)"程序。通过在不同的任务之间定时切换CPU,多任务(multitasking)操作系统营造出一种同一个时点可以有多个进程(程序)在同时运行的效果。线程是进程内部的独立的,有序的指令流。由此,一个进程能包含多个并发执行的线程。

多线程的用途很广,但归纳起来不外乎,程序的某一部分正在等一个事件或资源,而你又不想让它把整个程序都给阻塞了。因此你可以创建一个与该事件或资源相关的线程,让它与主程序分开来运行。

学习并发编程就像是去探访一个新的世界,同时学习一种新的编程语言,最起码也得接受一套新的理念。随着绝大多数的微电脑操作系统提供了多线程支持,编程语言和类库也做了相应的扩展。

Java内存模型的基础

通信

在共享内存的模型里,通过写-读内存中的公共状态进行隐式通信;在消息传递的并发模型里,线程之间必须通过发送消息来进行显示的通信。

同步

在共享内存并发模型里,同步是显示进行的,程序员必须显示指定某个方法或者某段代码需要在线程之间互斥执行;在消息传递的并发模型里,由于消息的发送必须在接收之前,因此同步是隐式进行的。

在Java中,所有实例域、静态域和数组元素都存储在堆内存中,堆内存在线程之间共享;局部变量、方法定义参数和异常处理器参数不会在线程之间共享。

从抽象角度来看,JMM定义了线程和主内存之间的抽象关系:线程之间的共享变量存储在主内存中,每个线程都有一个私有的本地内存,本地内存涵盖了缓存、写缓冲区、寄存器以及其它的硬件和编译器优化。

JMM通过控制主内存与每个线程的本地内存之间的交互,来为Java程序员提供内存可见性保证。

重排序

指编译器和处理器为了优化程序性能而对指令序列进行重新排序的一种手段:

编译器优化的重排序:编译器在不改变单线程程序语义的前提下,重新安排语句的执行顺序。

处理器的指令级并行的重排序:如果不存在数据依赖性,处理器可以改变语句对应机器指令的执行顺序。

内存系统的重排序:由于处理器使用缓存和读/写缓冲区,这使得加载和存储操作看上去可能是在乱序执行。

JMM的编译器重新排序规则会禁止特定类型的编译器重排序,对于处理器重排序,JMM的处理器重排序规则会要求Java编译器在生成指令时,插入特定类型的内存屏障。

现代的处理器使用写缓冲区临时保存向内存写入的数据,但每个处理器上的写缓冲区,仅仅对它所在的处理器可见。

由于写缓冲区仅对自己的处理器可见,它会导致处理器执行内存操作的顺序可能会与内存实际的操作执行顺序不一致,由于现代的处理器都会使用写缓冲区,因此现代的处理器都会允许对写-读操作进行重排序,但不允许对存在数据依赖的操作做重排序。

happens-before简介

用来阐述操作之间的内存可见性,如果一个操作执行的结果需要对另一个操作可见,那么这两个操作必须要存在happens-before关系,这两个操作既可以在一个线程之内,也可以在不同线程之间,但并不等于前一个操作必须要在后一个操作之前执行。

数据依赖性

编译器和处理器不会改变存在数据依赖关系的两个操作的执行顺序,但是仅针对单个处理器中执行的指令序列和单个线程中执行的操作。

as-if-serial

无论怎么重排序,单线程程序的执行结果不能改变。

在单线程中,对存在控制依赖的操作重排序,不会改变执行结果;但在多线程程序中,对存在控制依赖的操作重排序,可能会改变程序的执行结果。

顺序一致性

顺序一致性是一个理论参考模型,在设计的时候,处理器的内存模型和编程语言的内存模型都会以顺序一致性内存作为参照。

如果程序是正确同步的,程序的执行将具有顺序一致性:即程序的执行结果与该程序在顺序一致性内存模型中的执行结果相同。

如果程序是正确同步的,程序的执行将具有顺序一致性:即程序的执行结果与该程序在顺序一致性内存模型中的执行结果相同。

顺序一致模型有两大特性:

一个线程中的所有操作必须按照程序的顺序来执行。

所有线程都只能看到一个单一的操作执行顺序,在顺序一致性内存模型中,每个操作都必须原子执行且立刻对所有线程可见。

对于未同步或未正确同步的多线程程序,JMM只提供最小安全性:线程执行时读取到的值,要么是之前某个线程写入的值,要么是默认值。

JMM不保证未同步程序的执行结果与该程序在顺序一致性模型中的执行结果一致。

未同步程序在两个模型中的执行特征有如下差异:

顺序一致性模型保证单线程内的操作会按程序的顺序执行,而JMM不保证单线程内的操作会按程序的顺序执行。

顺序一致性模型保证所有线程只能看到一致的操作执行顺序,而JMM不保证所有线程能看到一致的操作执行顺序。

JMM不保证对64位的long/double型变量的写操作具有原子性,而顺序一致性模型保证对所有内存读/写操作都具有原子性。

第四章 Java并发编程基础

现代操作系统调度的最小单元是线程,也叫轻量级进程,在一个进程里可以创建多个线程,这些线程都拥有各自的计数器、堆栈和局部变量等特性,并且能够访问共享的内存变量。

设置线程优先级时,针对频繁阻塞(休眠或者I/O操作)的线程需要设置较高优先级,而偏重计算(需要较多CPU时间或者偏运算)的线程则设置较低的优先级,确保处理器不会被独占。

线程在运行的生命周期中可能处于以下6种不同的状态:

New:初始状态,线程被创建,但是没有调用start()方法。

Runnable:运行状态,Java线程将操作系统中的就绪和运行两种状态统称为“运行中”。

Blocked:阻塞状态,表示线程阻塞于锁。

Waiting:等待状态,表示线程进入等待状态,进入该状态表示当前线程需要等待其它线程做出一些指定动作(通知或中断)。

Time_Waiting:超时等待状态,可以在指定的时间自行返回。

Terminated:终止状态,表示当前线程已经执行完毕。

中断可以理解为线程的一个标识位属性,它标识一个运行中的线程是否被其它线程进行了中断操作。中断好比其他线程对该线程打了一个招呼,其他线程通过调用该线程的interrupt()方法对其进行中断操作。

线程通过检查自身是否被中断来进行响应,线程通过方法isInterrupt来进行判断是否被中断,也可以调用静态方法Thread.interrupt对当前线程的中断标识位进行复位,如果该线程已经处于终止状态,即使该线程被中断过,在调用该线程对象的isInterrupt时依旧返回false。

在抛出InterruptedException异常之前,Java虚拟机会先将该线程的中断标识位清除。

中断状态是线程的一个标识位,而中断操作是一种简便的线程间交互方式,而这种交互方式最适合用来取消或停止任务,除了中断之外,还可以利用一个boolean变量来控制是否需要停止任务并终止该线程。

Java支持多个线程同时访问一个对象或者对象的成员变量,由于每个线程可以拥有这个变量的拷贝,所以在程序的执行过程中,一个线程看到的变量并不一定是最新的。

volatile可以用来修饰字段,就是告知程序任何对该变量的访问需要从共享内存中获取,而对它的改变必须同步刷新回共享内存,它能保证所有线程对变量访问的可见性。

synchronized可以修饰方法或者以同步块的形式来进行使用,它主要确保多个线程在同一时刻,只能有一个线程处于方法或者同步块中,它保证了线程对变量访问的可见性和排他性。

任意线程对Object(Object由synchronized保护)的访问,首先要获得Object的监视器,如果获取失败,线程进入同步队列,线程状态变为Blocked,当访问Object的前驱(获得了锁的线程)释放了锁,则该释放操作唤醒阻塞在同步队列中的线程,使其重新尝试对监视器的获取。

等待/通知的相关方法:

notify():通知一个在对象上等待的线程,使其从wait()方法返回,而返回的前提是该线程获取到了对象上的锁。

notifyAll():通知所有等待在该对象上的锁。

wait():调用该方法的线程进入Waiting状态,只有等待另外线程的通知或被中断才会返回,调用wait()方法后,会释放对象的锁。

wait(long):超时等待一段时间,如果没有通知就返回。

wait(long, int):对于超时时间更精细粒度的控制,可以达到纳秒。

两个线程通过对象来完成交互,而对象上的wait和notify/notifyAll()的关系就如同开关信号一样,用来完成等待方和通知方之间的交互工作。

等待/通知的经典范式:

等待方

(1) 获取对象的锁。

(2) 如果条件不满足,那么调用对象的wait()方法,被通知后仍要检查条件。

(3) 条件满足则执行对应的逻辑。

synchronized(对象) {

while(条件不满足) {

对象.wait();

}

对应的处理逻辑;

}

通知方

(1) 获得对象的锁

(2) 改变条件

(3) 通知所有等待在该对象上的线程。

synchronized(对象) {

改变条件;

对象.notifyAll();

}

管道输入/输出流用于线程之间的数据传输,而传输的媒介为内存,主要包括了以下4种实现:PipedOutputStream、PipeInputStream、PipedReader、PipedWriter,前两种面向字节,后两种面向字符。

如果一个线程A执行了Thread.join(),其含义是:当前线程A等待Thread线程终止后,才从Thread.join返回,线程Thread除了提供join()方法外,还提供了join(long millis)和join(long millis, int nanos)两个具备超时特性的方法,如果在给定的超时时间内没有终止,那么将会从超时方法中返回。

ThreadLocal,即线程变量,是一个以ThreadLocal对象为键、任意对象为值的存储结构,这个结构被附带在线程上,也就是说一个线程可以根据一个ThreadLocal对象查询到绑定在这个线程上的一个值,可以通过set(T)方法来设置一个值,在当前线程下再通过get()方法获取到原先设置的值。

第五章 Java中的锁

5.1 Lock接口

锁是用来控制多个线程访问共享资源的方式,虽然它缺少了隐式获取释放锁的便捷性,但是却拥有了锁获取与释放的可操作性、可中断地获取锁以及超时获取锁等多种synchronized关键字不具备的同步特性。

在finally块中释放锁,目的是保证在获取到锁之后,最终能够被释放。

Lock接口提供的synchronized关键字不具备的主要特性

尝试非阻塞地获取锁:当前线程尝试获取锁,如果这一时刻没有被其它线程获取到,则成功获取并持有锁。

能被中断地获取锁:与synchronized不同,获取到锁的线程能够响应中断,当获取到锁的线程被中断时,中断异常将会被抛出,同时锁会被释放。

在指定的截止时间之前获取锁:如果截止时间到了仍旧无法获取锁,则返回。

Lock的API

void lock():获取锁,调用该方法当前线程将会获取锁,当锁获得后,从该方法返回。

void lockInterruptibly():可中断地获取锁,该方法会响应中断,即在锁的获取中可以中断当前线程。

boolean tryLock():尝试非阻塞地获取锁,调用该方法后立刻返回,如果能够获取则返回true,否则返回false。

boolean tryLock(long time, TimeUnit unit) throws InterruptedException:当前线程在超时时间内获得了锁;当前线程在超时时间内被中断;超时时间结束,返回false。

void unlock():释放锁。

Condition newCondition():获取等待/通知组件,该组件和当前的锁绑定,当前线程只有获得了锁,才能调用该组件的wait()方法,而调用后,当前线程将释放锁。

5.2 队列同步器

5.2.1 队列同步器接口

队列同步器AbstractQueuedSynchronizer,是用来构建锁或者其它同步组件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。

同步器是实现锁的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义。可以理解二者之间的关系:锁是面向使用者的,它定义了使用者与锁交互的接口,隐藏了实现细节;同步器面向锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。锁和同步器很好地隔离了使用者和实现者所需关注地领域。

同步器的设计是基于模板方法模式,使用者需要继承同步器并重写指定的方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器的模板方法,而这些模板方法将会调用使用者重载的方法。

重写同步器指定的方法时,需要使用同步器提供的3个方法来访问或者修改同步状态:

getState():获取当前同步状态。

setState(int newState):设置当前同步状态。

compareAndSetState(int except, int update):使用CAS设置当前状态,该方法能够保证状态设置的原始性。

同步器提供的模板方法基本上分为以下3类:

独占式获取与释放同步状态

共享式获取与释放同步状态

查询同步队列中的等待线程情况。

5.2.2 队列同步器的实现分析

5.2.2.1 同步队列

同步器依赖内部的同步队列来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造称为一个节点,并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。

同步器中包含了两个节点类型的引用,一个指向头节点,而另一个指向尾节点。

当一个线程成功地获取了同步状态,其他线程将无法获取到同步状态,转而被构造成为节点并加入到同步队列当中,而这个加入到队列地过程必须要保证线程安全,因此同步器提供了一个基于CAS的设置尾节点的方法。

同步队列遵循FIFO,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时,将会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点。

5.2.2.2 独占式同步状态获取与释放

通过调用同步器的acquire(int arg)方法可以获取同步状态,该方法对中断不敏感,即由于线程获取同步状态失败而进入同步队列后,后续对线程进行中断操作时,线程不会从同步队列中移除。

public final void acquire(int arg) {

if (!tryAcquire(arg) &&

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

selfInterrupt();

}

它的主要逻辑是:

(1)调用自定义同步器实现的tryAcquire方法,该方法保证线程安全的获取同步状态,这个方法需要队列同步器的实现者来重写。

(2)如果同步状态获取失败,则构造同步节点(独占式Node.EXCLUSIVE)并通过addWaiter(Node node)方法将该节点加入到同步队列的尾部。

private Node addWaiter(Node mode) {

Node node = new Node(Thread.currentThread(), mode);

// Try the fast path of enq; backup to full enq on failure

Node pred = tail;

if (pred != null) {

node.prev = pred;

//1.确保节点能够线程安全地被添加

if (compareAndSetTail(pred, node)) {

pred.next = node;

return node;

}

}

//2.通过死循环来确保节点的正确添加,在"死循环"中只有通过`CAS`将节点设置为尾节点之后,当前线程才能从该方法返回,否则当前线程不断地进行尝试。

enq(node);

return node;

}

private Node enq(final Node node) {

for (;;) {

Node t = tail;

if (t == null) { // Must initialize

if (compareAndSetHead(new Node()))

tail = head;

} else {

node.prev = t;

if (compareAndSetTail(t, node)) {

t.next = node;

return t;

}

}

}

}

(3)最后调用acquireQueued(Node node, int arg)方法,使得该节点以死循环的方式获取同步状态。

final boolean acquireQueued(final Node node, int arg) {

boolean failed = true;

try {

boolean interrupted = false;

for (;;) {

//1.得到当前节点的前驱节点

final Node p = node.predecessor();

//2.如果当前节点的前驱节点是头节点,只有在这种情况下获取同步状态成功

if (p == head && tryAcquire(arg)) {

//3.将当前节点设为头节点

setHead(node);

p.next = null; // help GC

failed = false;

return interrupted;

}

if (shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

interrupted = true;

}

} finally {

if (failed)

cancelAcquire(node);

}

}

可以看到,当前线程在“死循环”中尝试获取同步状态,而只有前驱节点是头节点才能够尝试获取同步状态,这是由于:

头节点是成功获取到同步状态的节点,而头节点的线程释放了同步状态后,将会唤醒其后继节点,后继节点的线程被唤醒后需要检查自己的前驱节点是否是头节点。

维护同步队列的FIFO原则,通过简单地判断自己的前驱是否为头节点,这样就使得节点的释放规则符合FIFO,并且也便于对过早通知的处理(过早通知是指前驱节点不是头节点的线程由于中断而被唤醒)

当同步状态获取成功之后,当前线程从acquire(int arg)方法返回,如果对于锁这种并发组件而言,代表着当前线程获取了锁。

通过调用同步器的release(int arg)方法可以释放同步状态,该方法执行时,会唤醒头节点的后继节点线程,unparkSuccessor(Node node)方法使用LockSupport来唤醒处于等待状态的线程。

public final boolean release(int arg) {

if (tryRelease(arg)) {

Node h = head;

if (h != null && h.waitStatus != 0)

unparkSuccessor(h);

return true;

}

return false;

}

(4)如果获取不到,则阻塞节点中的线程,而被阻塞线程的唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现。

总结:

1.在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程都会被加入到队列中进行自旋;

2.移出队列(或停止自旋)的条件是前驱节点为头节点且成功获取了同步状态。

3.在释放同步状态时,同步器调用tryRelease(int arg)方法来释放同步状态,然后唤醒头节点的后继节点。

5.2.2.3 共享式同步状态获取与释放

共享式获取和独占式获取最主要的区别在于同一时刻能够有多个线程同时获取到同步状态。

通过调用同步器的acquireShared(int arg)方法可以共享式地获取同步状态:

public final void acquireShared(int arg) {

if (tryAcquireShared(arg) < 0)

doAcquireShared(arg);

}

private void doAcquireShared(int arg) {

final Node node = addWaiter(Node.SHARED);

boolean failed = true;

try {

boolean interrupted = false;

for (;;) {

final Node p = node.predecessor();

if (p == head) {

int r = tryAcquireShared(arg);

if (r >= 0) {

setHeadAndPropagate(node, r);

p.next = null; // help GC

if (interrupted)

selfInterrupt();

failed = false;

return;

}

}

if (shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

interrupted = true;

}

} finally {

if (failed)

cancelAcquire(node);

}

}

tryAcquireShared返回int类型,如果同步状态获取成功,那么返回值大于等于0,否则进入自旋状态;成功获取到同步状态并退出自旋状态的条件是当前节点的前驱节点为头节点,并且返回值大于等于0.

共享式获取,通过调用releaseShared(int arg)方法释放同步状态,tryReleaseShared必须要确保同步状态线程安全释放,一般是通过循环或CAS来保证的,因为释放同步状态的操作会同时来自多个线程。

public final boolean releaseShared(int arg) {

if (tryReleaseShared(arg)) {

doReleaseShared();

return true;

}

return false;

}

5.2.2.4 独占式超时获取同步状态

通过调用同步器的doAcquireNanos(int arg, long nanosTimeout)方法可以超时获取同步状态,即在指定的时间段内获取同步状态。

在此之前,一个线程如果获取不到锁而被阻塞在synchronized之外,对该线程进行中断操作,此时线程中断的标志位会被修改,但线程依旧会阻塞在synchronized上;如果通过acquireInterruptibly(int arg)方法获取,如果在等待过程中被中断,会立刻返回,并抛出InterruptedException异常。

private boolean doAcquireNanos(int arg, long nanosTimeout)

throws InterruptedException {

if (nanosTimeout <= 0L)

return false;

//1.计算出截止时间.

final long deadline = System.nanoTime() + nanosTimeout;

//2.加入节点

final Node node = addWaiter(Node.EXCLUSIVE);

boolean failed = true;

try {

for (;;) {

//3.取出前驱节点

final Node p = node.predecessor();

//4.如果获取成功则直接返回

if (p == head && tryAcquire(arg)) {

setHead(node);

p.next = null; // help GC

failed = false;

return true;

}

nanosTimeout = deadline - System.nanoTime();

//5.如果到了超时时间,则直接返回

if (nanosTimeout <= 0L)

return false;

if (shouldParkAfterFailedAcquire(p, node) &&

nanosTimeout > spinForTimeoutThreshold)

LockSupport.parkNanos(this, nanosTimeout);

//6.如果在自旋过程中被中断,那么抛出异常返回

if (Thread.interrupted())

throw new InterruptedException();

}

} finally {

if (failed)

cancelAcquire(node);

}

}

通过上面的代码可以知道,它和独占式获取的区别在于未获取到同步状态时的处理逻辑:独占式获取在获取不到是会一直自旋等待;而超时获取则会使当前线程等待nanosTimeout纳秒,如果当前线程在这个时间内没有获取到同步状态,将会从等待逻辑中自动返回。

5.2.2.5 自定义同步组件 - TwinsLock

TwinsLock只允许至多两个线程同时访问,超过两个线程的访问将会被阻塞。

public class TwinsLock implements Lock {

private final Sync sync = new Sync(2);

private static final class Sync extends AbstractQueuedSynchronizer {

Sync(int count) {

//初始值为2.

setState(count);

}

@Override

protected int tryAcquireShared(int arg) {

for(;;) {

//1.获得当前的状态.

int current = getState();

//2.newCount表示剩余可获取同步状态的线程数

int newCount = current - arg;

//3.如果小于0,那么返回获取同步状态失败;否则通过CAS确保设置的正确性.

if (newCount < 0 || compareAndSetState(current, newCount)) {

//4.当返回值大于等于0表示获取同步状态成功.

return newCount;

}

}

}

@Override

protected boolean tryReleaseShared(int arg) {

for (;;) {

int current = getState();

//将可获取同步状态的线程数加1.

int newCount = current + current;

if (compareAndSetState(current, newCount)) {

return true;

}

}

}

}

@Override

public void lock() {

sync.acquireShared(1);

}

@Override

public void unlock() {

sync.releaseShared(1);

}

@Override

public boolean tryLock() {

return false;

}

@Override

public boolean tryLock(long time, @NonNull TimeUnit unit) throws InterruptedException {

return false;

}

@Override

public void lockInterruptibly() throws InterruptedException {

}

@NonNull

@Override

public Condition newCondition() {

return null;

}

}

测试用例:

public static void createTwinsLock() {

final Lock lock = new TwinsLock();

class TwinsLockThread extends Thread {

@Override

public void run() {

Log.d(TAG, "TwinsLockThread, run=" + Thread.currentThread().getName());

while (true) {

lock.lock();

try {

Thread.sleep(1000);

Log.d(TAG, "TwinsLockThread, name=" + Thread.currentThread().getName());

Thread.sleep(1000);

} catch (Exception e) {

e.printStackTrace();

} finally {

Log.d(TAG, "TwinsLockThread, unlock=" + Thread.currentThread().getName());

lock.unlock();

}

}

}

}

for (int i = 0; i < 10; i++) {

Thread thread = new TwinsLockThread();

thread.start();

}

}

5.3 重入锁

重入锁ReentrantLock表示该锁能够支持一个线程对资源的重复加锁。

如果在绝对时间上,先对锁获取的请求一定先被满足,那么这个锁是公平的,公平地获取锁,也就是等待时间最长的线程最优先地获取锁。

5.3.1 实现重进入

重进入需要解决两个问题:

线程再次获取锁,锁需要去识别获取锁地线程是否为当前占据锁的线程,如果是,则再次获取成功。

锁的最终释放,线程重复n次获取了锁,随后在第n次释放该锁后,其它线程能够获取到该锁。

5.3.2 公平与非公平锁的区别

公平与否是针对获取锁而言的,如果一个锁是公平的,那么锁的获取顺序就应该符合请求的绝对时间顺序,即FIFO。

公平锁的区别在于加入了同步队列中当前节点是否有前驱节点的判断,如果该方法返回true,表示有线程比当前线程更早地请求获取锁,因此需要等待前驱线程获取并释放锁之后才能继续获取锁;而对于非公平锁,只要CAS设置同步状态成功即可。

因此,公平锁每次都是从同步队列中的第一个节点获取到锁,而非公平锁出现了一个线程连续获取锁的情况。

非公平锁可能使线程饥饿,但其极少的线程切换,保证了更大的吞吐量。

5.4 读写锁

之前提到的锁都是排它锁,这些锁在同一时刻只允许一个线程进行访问,而读写锁在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读线程和其他写线程均被阻塞。读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写锁,使得并发性有很大提升。

并发包提供的读写锁的实现是ReentrantReadWrireLock,它支持公平性选择、重进入、锁降级(写锁能够降级为读锁)。

ReadWriteLock仅定义了获取读锁和写锁的两个方法,即readLock和writeLock,而其实现ReentrantReadWriteLock:

getReadLockCount:返回当前读锁被获取的次数。

getReadHoldCount:返回当前线程获取读锁的次数。

isWriteLocked:判断写锁是否被获取。

getWriteHoldCount:返回当前线程获取写锁的次数。

下面是一个读写锁的简单用例:

public class ReadWriteCache {

static Map<String, Object> map = new HashMap<>();

static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

static Lock r = rwl.readLock();

static Lock w = rwl.writeLock();

public static Object get(String key) {

r.lock();

try {

return map.get(key);

} finally {

r.unlock();

}

}

public static Object put(String key, Object value) {

w.lock();

try {

return map.put(key, value);

} finally {

w.unlock();

}

}

public static void clear() {

w.lock();

try {

map.clear();

} finally {

w.unlock();

}

}

}

5.4.2 读写锁的实现分析

读写状态的设计

读写锁需要在同步状态(一个整形变量,高16表示读,低16表示写)上维护多个读线程和一个写线程的状态,使得该状态的设计成为读写锁实现的关键。

写锁的获取与释放

写锁是一个支持重进入的排它锁,如果当前线程已经获取了写锁,则增加写状态。如果当前线程在获取写锁时,读锁已经被获取,则当前线程进入等待状态。

原因在于:读写锁要确保写锁的操作对读锁可见,如果允许读锁在已经被获取的情况下对写锁的获取,那么正在运行的其它读线程就无法感知到当前写线程的操作。

读锁的获取与释放

读锁是一个支持重进入的共享锁,它能被多个线程同时获取,在没有其它写线程访问(或者写状态为0)时,读锁总是被成功地获取,而所做的也只是(线程安全)增加读状态。

锁降级

锁降级是指把持住(当前拥有的)写锁,再获取到读锁,随后释放(先前拥有的)写锁的过程。

5.6 Condition接口

Condition定义了等待/通知两种类型的方法,当前线程调用这些方法时,需要提前获取到Condition对象关联的锁,Condition是依赖Lock对象的。

当调用await()方法后,当前线程会释放锁并在此等待,而其他线程调用Condition对象的signal方法,通知当前线程后,当前线程才从await方法返回,并且在返回前已经获取了锁。

获取一个Condition必须通过Lock的newCondition方法,下面是一个有界队列的示例:

public class BoundedQueue<T> {

private Object[] items;

private int addIndex, removeIndex, count;

private Lock lock = new ReentrantLock();

private Condition notEmpty = lock.newCondition();

private Condition notFull = lock.newCondition();

public BoundedQueue(int size) {

items = new Object[size];

}

public void add(T t) throws InterruptedException {

lock.lock();

try {

while (count == items.length) { //如果当前队列内的个数等于最大长度,那么释放锁.

notFull.await();

}

if (++addIndex == items.length) { //如果已经到了尾部,那么从头开始.

addIndex = 0;

}

++count;

notEmpty.signal(); //通知阻塞在"空"条件上的线程.

} finally {

lock.unlock();

}

}

public T remove() throws InterruptedException {

lock.lock();

try {

while (count == 0) {

notEmpty.await(); //如果当前队列的个数等于0,那么释放锁.

}

Object x = items[removeIndex];

if (++removeIndex == items.length) {

removeIndex = 0;

}

--count;

notFull.signal(); //通知阻塞在"满"条件上的线程.

return (T) x;

} finally {

lock.unlock();

}

}

}

Condition的方法:

await():当前线程进入等待状态直到被通知signal或中断,当前线程进入运行状态且从await返回的情况:

其他线程调用该Condition的signal或signalAll方法。

其它线程中断当前线程(interrupt)。

如果当前等待线程从await方法返回,那么表明当前线程已经获取了Condition对象所对应的锁。

awaitUninerruptibly:对中断不敏感

long await Nanos(long):加入了超时的判断,返回值是(nanosTimeout - 实际耗时),如果返回值是0或者负数,那么可以认定为超时。

boolean awaitUntil(Data):直到某个固定时间。

signal:唤醒一个等待在Condition上的线程。

signalAll:唤醒所有等待在Condition上的线程。

5.6.2 Condition的实现

ConditionObject是AbstractQueuedSynchronizer的内部类,每个Condition对象都包含着一个队列。

1.等待队列

在队列中的每个节点都包含了一个线程的引用,该线程就是在Condition对象上等待的线程,同步队列和等待队列中节点的类型都是同步器的静态内部类AbstractQueuedSynchronizer.Node。

由于Condition的实现是同步器的内部类,因此每个Condition实例都能够访问同步器提供的方法,相当于每个Condition都拥有所属同步器的引用。

当调用await方法时,将会以当前线程构造节点,并将节点从尾部加入到等待队列,也就是将同步队列移动到Condition队列当中。

2.等待

调用该方法的前提是当前线程必须获取了锁,也就是同步队列中的首节点,它不是直接加入到等待队列当中,而是通过addConditionWaiter()方法把当前线程构造成一个新的节点并将其加入到等待队列当中。

3.通知

调用该方法的前提是当前线程必须获取了锁,接着获取等待队列的首节点,将其移动到同步队列并使用LockSupport唤醒节点中的线程。

被唤醒的线程,将从await方法中的while中返回,进而调用同步器的acquireQueued方法加入到获取同步状态的竞争中。

Condition的signalAll方法,相当于对等待队列中的每个节点均执行一次signal方法,效果就是将等待队列中所有节点全部移动到同步队列中,并唤醒每个节点。

六、Java并发容器和框架

6.1 ConcurrentHashMap

ConcurrentHashMap是线程安全并且高效的HashMap,其它的类似容器有以下缺点:

HashMap在并发执行put操作时,会导致Entry链表形成环形数据结构,就会产生死循环获取Entry。

HashTable使用synchronized来保证线程安全,但在线程竞争激烈的情况下HashTable的效率非常低下。

ConcurrentHashMap高效的原因在于它采用锁分段技术,首先将数据分成一段一段地存储,然后给每段数据配一把锁,当一个线程占用锁并且访问一段数据的时候,其他段的数据也能被其他线程访问。

6.1.2 ConcurrentHashMap的结构

ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成:

Segment是一种可重入锁,在ConcurrentHashMap里面扮演锁的角色;

HashEntry则用于存储键值对数据。

一个ConcurrentHashMap里包含一个Segment数组,它的结构和HashMap类似,是一种数组和链表结构。

一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素,每个Segment守护着一个HashEntry里的元素,当对HashEntry数组的数据进行修改时,必须首先获得与它对应的Segment锁。

6.1.5 ConcurrentHashMap的操作

get

get的高效在于整个get过程中不需要加锁,除非读到的值是空才会加锁重读。原因是它的get方法将要使用的共享变量都设为volatile,能够在线程间保持可见性,能够被多线程同时读,并且不会读到过期的值,例如用于统计当前Segment大小的count字段和用于存储值的HashEntry的value。

put

put方法里需要对共享变量进行写入操作,所以为了线程安全,在操作共享变量之前必须加锁,put首先定位到Segment,然后在Segment里进行插入操作。

size

先尝试2次通过不锁住Segment的方式来统计各个Segment的大小,如果统计的过程中,容器的count发生了变化,则再用加锁的方式来统计所有Segment的大小。

6.2 ConcurrentLinkedQueue

ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,它采用CAS算法来实现。

6.2.1 入队列

入队主要做两件事情:

将入队节点设置成当前队列尾节点的下一个节点。

更新tail节点,如果tail节点的next节点不为空,则将入队节点设置成tail节点;如果tail节点的next节点为空,则将入队节点设置成tail的next节点。

在多线程情况下,如果有一个线程正在入队,那么它必须先获取尾节点,然后设置尾节点的下一个节点为入队节点,但这时可能有另外一个线程插队了,那么队列的尾节点就会发生变化,这时第一个线程要暂停入队操作,然后重新获取尾节点。

整个入队操作主要做两件事:

定位出尾节点。

使用CAS算法将入队节点设置成尾节点的next节点,如不成功则重试。

6.3 阻塞队列

6.3.1 阻塞队列

阻塞队列是一个支持两个附加操作的队列,这两个附加的操作支持阻塞的插入和移除方法:

当队列满时,队列会阻塞插入元素的线程,直到队列不满。

当队列空时,获取元素的线程会等待队列为空。

在阻塞队列不可用时,附加操作提供了4种处理方式:抛出异常、返回特殊值、一直阻塞、超时退出。每种方式通过调用不同的方法来实现。

Java里面提供了7种阻塞队列。

6.4 Fork/Join框架

用于并行执行任务的框架,是把一个大任务分割成若干个小任务,最终汇总每个小任务结果后得到大人物结果的框架。

Fork/Join使用两个类来完成事情:

ForkJoinTask:它提供了fork()和join()操作的机制,通常情况下,我们继承它的子类:有返回结果的RecursiveTask和没有返回结果的RecursiveAction。

ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来添加。

ForkJoinTask在执行的时候可能会抛出异常,但是我们没有办法在主线程里直接捕获异常,所以ForkJoinTask提供了isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经取消了。

ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,ForkJoinTask数组负责将存放程序提交给ForkJoinPool的任务,而ForkJoinWorkerThread数组负责执行这些任务。

七、Java中的13个原子操作类

Atomic包里提供了:原子更新基本类型、原子更新数组、原子更新引用和原子更新属性。

7.1 原子更新基本类型:

AtomicBoolean

AtomicInteger

AtomicLong

基本方法:

int addAndGet(int delta):以原子方式将输入的值与当前的值相加,并返回结果。

boolean compareAndSet(int expect, int update):如果当前的数值等于预期值,则以原子方式将该值设置为输入的值。

int getAndIncrement():以原子方式加1,并返回自增前的值。

void lazySet(int newValue):最终会设置成newValue,可能会导致其他线程在之后的一小段时间内还是读到旧值。

int getAndSet(int newValue):以原子方式设置为newValue的值,并返回旧值。

7.2 原子更新引用类型

AtomicIntegerArray

AtomicLongArray

AtomicReferenceArray

基本方法:

int addAndGet(int i, int delta):以原子方式将输入值和索引i的元素相加。

boolean compareAndSet(int i, int expect, int update):如果当前值等于预期值,则以原子方式将数组位置i的元素设置成update值。

7.3 原子更新引用类型

用于原子更新多个变量,提供了3种类型:

AtomicReference:原子更新引用类型。

AtomicReferenceFieldUpdater:原子更新引用类型里的字段。

AtomicMarkableReference:原子更新带有标记位的引用类型。

7.4 原子更新字段类

AtomicIntegerFieldUpdater:原子更新整形的字段的更新器。

AtomicLongFieldUpdater:原子更新长整形字段的更新器。

AtomicStampedReference:原子更新带有版本号的引用类型。

原子地更新字段需要两步:

因为原子更新字段类都是抽象类,每次使用的时候必须使用静态方法newUpdater创建一个更新器,并且需要设置想要更新的类和属性。

更新类的字段必须使用public volatile来修饰。

八、Java中的并发工具类

九、Java中的线程池

线程池的优点:降低资源消耗,提高响应速度,提高线程的可管理性。

9.1 线程池的实现原理

线程池的处理流程如下:

判断核心线程池是否已满,如果不是,则创建一个新的工作线程来执行任务;如果已满,则进入下个流程。

判断工作队列是否已满,如果不是,则将提交的任务存储在工作队列里;如果已满,则进入下个流程。

判断线程池的线程是否都处于工作状态,如果没有,则创建一个新的工作线程;如果已满,则交给饱和策略来处理。

public void execute(Runnable command) {

if (command == null)

throw new NullPointerException();

/*

* Proceed in 3 steps:

*

* 1. If fewer than corePoolSize threads are running, try to

* start a new thread with the given command as its first

* task. The call to addWorker atomically checks runState and

* workerCount, and so prevents false alarms that would add

* threads when it shouldn't, by returning false.

*

* 2. If a task can be successfully queued, then we still need

* to double-check whether we should have added a thread

* (because existing ones died since last checking) or that

* the pool shut down since entry into this method. So we

* recheck state and if necessary roll back the enqueuing if

* stopped, or start a new thread if there are none.

*

* 3. If we cannot queue task, then we try to add a new

* thread. If it fails, we know we are shut down or saturated

* and so reject the task.

*/

int c = ctl.get();

if (workerCountOf(c) < corePoolSize) { //1.添加进入核心线程.

if (addWorker(command, true))

return;

c = ctl.get();

}

if (isRunning(c) && workQueue.offer(command)) { //2.添加进入队列.

int recheck = ctl.get();

if (!isRunning(recheck) && remove(command))

reject(command);

else if (workerCountOf(recheck) == 0)

addWorker(null, false);

}

else if (!addWorker(command, false)) //3.添加进入非核心线程.

reject(command);

}

private boolean addWorker(Runnable firstTask, boolean core) {

retry:

for (;;) {

int c = ctl.get();

int rs = runStateOf(c);

// Check if queue empty only if necessary.

if (rs >= SHUTDOWN &&

! (rs == SHUTDOWN &&

firstTask == null &&

! workQueue.isEmpty()))

return false;

for (;;) {

int wc = workerCountOf(c);

if (wc >= CAPACITY ||

wc >= (core ? corePoolSize : maximumPoolSize))

return false;

if (compareAndIncrementWorkerCount(c))

break retry;

c = ctl.get(); // Re-read ctl

if (runStateOf(c) != rs)

continue retry;

// else CAS failed due to workerCount change; retry inner loop

}

}

boolean workerStarted = false;

boolean workerAdded = false;

Worker w = null;

try {

w = new Worker(firstTask);

final Thread t = w.thread;

if (t != null) {

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

// Recheck while holding lock.

// Back out on ThreadFactory failure or if

// shut down before lock acquired.

int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||

(rs == SHUTDOWN && firstTask == null)) {

if (t.isAlive()) // precheck that t is startable

throw new IllegalThreadStateException();

workers.add(w);

int s = workers.size();

if (s > largestPoolSize)

largestPoolSize = s;

workerAdded = true;

}

} finally {

mainLock.unlock();

}

if (workerAdded) {

t.start();

workerStarted = true;

}

}

} finally {

if (! workerStarted)

addWorkerFailed(w);

}

return workerStarted;

}

在以上的三步中,除了加入队列不用获取全局锁以外,其它两种情况都需要获取,为了尽可能地避免获取全局锁,在ThreadPoolExecutor完成预热之后(当前运行的线程数大于corePoolSize),几乎所有的execute方法调用都是加入到队列当中。

9.2 线程池的使用

9.2.1 线程池的创建

public ThreadPoolExecutor(int corePoolSize,

int maximumPoolSize,

long keepAliveTime,

TimeUnit unit,

BlockingQueue<Runnable> workQueue,

ThreadFactory threadFactory,

RejectedExecutionHandler handler) {

if (corePoolSize < 0 ||

maximumPoolSize <= 0 ||

maximumPoolSize < corePoolSize ||

keepAliveTime < 0)

throw new IllegalArgumentException();

if (workQueue == null || threadFactory == null || handler == null)

throw new NullPointerException();

this.corePoolSize = corePoolSize;

this.maximumPoolSize = maximumPoolSize;

this.workQueue = workQueue;

this.keepAliveTime = unit.toNanos(keepAliveTime);

this.threadFactory = threadFactory;

this.handler = handler;

}

corePoolSize:当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其它空闲的基本线程能够执行新任务也会创建。

runnableTaskQueue:用于保存等待执行的任务的阻塞队列,可以选择:

ArrayBlockingQueue:基于数组结构的有界阻塞队列。

LinkedBlockingQueue:基于链表结构的阻塞队列,吞吐量高于前者。

SynchronousQueue:不存储元素的阻塞队列,每个插入操作必须等待另一个线程调用了移除操作,静态工厂方法Executors.newCachedThreadPool使用了这个队列。

PriorityBlockingQueue:一个具有优先级的无限阻塞队列。

maxPoolSize:允许创建的最大线程数。

ThreadFactory:用于设置创建线程的工厂。

RejectExecutionHandler:饱和策略。

keepAliveTime:线程池的工作线程空闲后,保持存活的时间。

TimeUnit:线程保持活动的单位。

9.2.2 向线程池提交任务

execute(Runnable runnable):提交不需要返回值的任务。

Future<Object> future = executor.submit(haveReturnValuetask):用于提交需要返回值的任务,线程池会返回一个future类型任务,可以用它来判断任务是否执行成功,并且可以通过get方法来获取返回值,get方法会阻塞当前线程直到任务完成。

9.2.3 关闭线程池

shutdownNow:首先将线程池的状态设为STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表。

shutdown:将线程池的状态置为SHUTDOWN,然后中断所有没有正在执行任务的线程。

十、Executor框架

(1)在上层,Java多线程程序通常把应用分解为若干个任务,然后使用用户级的调度器(Executor框架)将这些任务映射为固定数量的线程。

(2)在HotSpot VM的线程模型中,Java线程再被一对一映射为本地操作系统线程,Java线程启动时会创建一个本地操作系统线程,当该线程终止时,这个操作系统线程也会被回收。

(3)操作系统会调度所有线程并将它们分配给可用的CPU。

Executor框架

由三个部分组成:

任务,即Runnable接口或Callable接口。

任务的执行,包括核心接口Executor,以及继承自Executor的ExecutorService,还有它的两个关键类ThreadPoolExecutor(用来执行任务)和ScheduledThreadPoolExecutor(可以在给定的延迟后运行命令,或者定期执行命令)。

异步计算的结果,包括接口Future和实现类FutureTask。

10.2 ThreadPoolExecutor详解

通过工具类Executors,可以创建以下三种类型的ThreadPoolExecutor,调用静态创建方法之后,会返回ExecutorService

FixedThreadPool

可重用固定线程数的线程池;如果当前运行的线程数少于corePoolSize,则创建新线程来执行任务;如果等于corePoolSize,将任务加入到无界队列LinkedBlockingQueue当中;多余的空闲线程将会被立即终止。

SingleThreadPool

单个woker线程的executor;corePoolSize和maximumPoolSize为1;采用无界队列作为工作队列。

CacheThreadPool

采用没有容量的SynchronousQueue作为线程池的工作队列,其corePoolSize为0,maximumPool是无界的;其中的空闲线程最多等待60s。

如果主线程提交任务的速度高于maximumPool中线程处理任务的速度时,CacheThreadPool会不断创建新线程,极端情况下,CacheThreadPool会因为创建过多线程而耗尽CPU资源。

10.3 ScheduledThreadPoolExecutor详解

用来在给定的延迟之后执行任务,或者定期执行任务,并且可以在指定的构造函数中指定多个对应的后台线程数。

它采用DelayQueue这个无界队列作为工作队列,其执行分为两个部分:

当调用ScheduledThreadPoolExecutor的scheduleAtFixedRate()或者scheduleWithFIxedDelay,它会向DelayQueue中添加ScheduledFutureTask。

线程池中的线程从DelayQueue中获取ScheduledFutureTask。

总结

到这里,残酷月光多线程并发编程的总结和梳理就结束了,,不足之处还望大家多多包涵!!觉得收获的话可以点个关注收藏转发一波喔,谢谢大佬们支持。(吹一波,233~~)

下面和大家交流几点编程的经验:

1、多写多敲代码,好的代码与扎实的基础知识一定是实践出来的

2丶 测试、测试再测试,如果你不彻底测试自己的代码,那恐怕你开发的就不只是代码,可能还会声名狼藉。

3丶 简化编程,加快速度,代码风骚,在你完成编码后,应回头并且优化它。从长远来看,这里或那里一些的改进,会让后来的支持人员更加轻松。

最后,每一位读到这里的网友,感谢你们能耐心地看完。希望在成为一名更优秀的Java程序员的道路上,我们可以一起学习、一起进步。

内部交流群569068099 欢迎各位前来交流和分享, 验证:(007)

Java小毛驴,头条出品,每天一篇干货,喜欢就收藏+关注

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表