多线程并发拓展
死锁问题如何解决
什么是死锁
一组相互竞争资源的进程因为相互等待导致永久阻塞的现象成为死锁。
转账死锁案例
@Data
@AllArgsConstructor
public class Account {
String name;
int balance;
// 转出
public void decDebit(int amount) {
this.balance -= amount;
}
// 转入
public void incDebit(int amount) {
this.balance += amount;
}
}
class TransformAccount implements Runnable {
private final Account fromAccount; // 转出账户
private final Account toAccount; // 转入账户
private final int amount; // 交易金额
public TransformAccount(Account fromAccount, Account toAccount, int amount) {
this.fromAccount = fromAccount;
this.toAccount = toAccount;
this.amount = amount;
}
@Override
public void run() {
while (true) {
synchronized (fromAccount) { // fromAccount 加锁
synchronized (toAccount) { // toAccount 加锁
// 锁定两个账户保证账户不能被修改防止数据不安全
if (amount <= fromAccount.getBalance()) {
fromAccount.decDebit(amount);
toAccount.incDebit(amount);
}
System.out.println("转账人 " + fromAccount.getName() + " 余额: " + fromAccount.getBalance() + " 元");
System.out.println("收款人 " + toAccount.getName() + " 余额 " + toAccount.getBalance() + " 元");
}
}
}
}
}
class App {
public static void main(String[] args) {
Account fromAccount = new Account("张三", 100000);
Account toAccount = new Account("李四", 200000);
Thread a = new Thread(new TransformAccount(fromAccount, toAccount, 1));
Thread b = new Thread(new TransformAccount(toAccount, fromAccount, 2));
a.start(); // 执行 A 线程
b.start(); // 执行 B 线程
}
}
执行结果会在执行一段时间后就发生卡死现象,此时就是发生了死锁问题。发生死锁问题的原因在于线程 A 和线程 B 互相锁定了对方的 fromAccount 或 toAccount 导致两者需要获取锁的对象都被对方占用了,进而发生了死锁问题。
死锁发生的条件
- 互斥,共享资源 X 和 Y 只能被一个线程占用
- 占有且等待,线程 T1 已经取得共享资源 X,在等待共享资源 Y 的时候,不释放共享资源 X
- 不可抢占,其他线程不能强行抢占线程 T1 占有的资源
- 循环等待,线程 T1 等待线程 T2 占有的资源,线程 T2 等待线程 T1 占有的资源,就是循环等待
如何解决死锁
一旦发生虽说,一般没有什么好的办法来解决,只能通过重启应用。所以如果要解决死锁问题,最好的方案就是在设计程序时提前规避。提前规避的最好的方式就是破坏除了第一个条件之外的另外三个条件之一。
破坏占有且等待
通过共享统一锁实例(第三方管理对象)来破坏占有且等待条件中互相不释放共享资源的情况。
@Data
@AllArgsConstructor
public class Account { ... }
@Data
@AllArgsConstructor
class TransformAccount implements Runnable {
/* default variable TODO */
private final Allocator allocator; // 第三方的资源管理者
@Override
public void run() {
while (true) {
// 在这里获取临界资源,单一实例的锁
if (allocator.acquire(fromAccount, toAccount)) {
try {
synchronized (fromAccount) {
synchronized (toAccount) {
/* Transform balance TODO */
}
}
} finally {
// 释放临界资源
allocator.release(fromAccount, toAccount);
}
}
}
}
}
class App { /* Thread TODO */ }
class Allocator {
// 在这里获得资源和释放资源
List<Object> list = new ArrayList<>();
synchronized boolean acquire(Object from, Object to) {
// 同步申请资源
if (list.contains(from) || list.contains(to)) {
return false;
}
list.add(from);
list.add(to);
return true;
}
synchronized boolean release(Object from, Object to) {
list.remove(from);
list.remove(to);
return true;
}
}
破坏不可抢占
要在上述案例中破坏不可抢占性就需要突破 synchronized 的限制,因为 synchronized 要执行势必要获得同步锁从根本上无法解决问题。所以需要在 Allocator 中构造一个重入锁,再在循环线程中尝试获得这个重入锁(判断 reentrantLock.tryLock() 方法返回的 bool 值)后再执行后续操作来实现对不可抢占条件的破坏,这样就避开了 synchronized 的局限性。
破坏循环等待
要破坏循环等待只需要确定获得锁的顺序,也就是做一个排序来比较锁的顺序(hashCode)。
@Override
public run() {
Account left = null;
Account right = null;
if (fromAccount.hashCode() > toAccount.hashCode()) {
left = toAccount;
right = fromAccount;
}
/* ... */
synchronized (left) {
synchronized (right) {
/* TODO */
}
}
}
单机版MapReduce之fork-join
基本概念
Fork/Join 框架是 Java 7 提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
工作原理
核心点就是分割任务到多线程进行并行处理得到最后的结果。
工作窃取

将每个任务放到不同的队列中进行任务处理,线程从这个队列的头部或尾部都可以获得数据,如果一个线程先于其他线程完成了自己的任务(线程1)则可以从其他线程的尾部窃取任务来继续处理(窃取线程2)即线程1的处理效率比线程2高,能者多劳。
为了减少线程竞争正常线程都从头部向下处理,而工作窃取线程则会从尾部向上处理。工作窃取算法使得线程资源(线程并行)被充分利用从而提升性能缩短任务时间。
缺点:当如果所有队列中都只有一个任务,线程之间还是会去发生工作窃取消耗更多的内存资源。
数字累加案例
@Data
@AllArgsConstructor
@EqualsAndHashCode(callSuper = true)
public class ForkJoinDemo extends RecursiveTask<Integer> {
// 计算 1 到 10 的累加
// 分割阈值:每个任务的大小 2 最多允许执行两个数的相加
final int THRESHOLD = 2;
int START, END;
@Override
protected Integer compute() {
int sum = 0;
boolean canCompute = (END - START) <= THRESHOLD;
if (canCompute) { // 小于阈值不能再进行分割直接计算
// 在最小区间内进行累加
for (int i = START; i <= END; i++) {
sum += i;
}
System.out.println("START: " + START + " END: " + END + " SUM: " + sum);
} else { // 任务还可以继续分割
int middle = (START + END) / 2;
// 类似于二叉树的数据结构分配给左任务和右任务
ForkJoinDemo left = new ForkJoinDemo(START, middle);
ForkJoinDemo right = new ForkJoinDemo(middle + 1, END);
// RecursiveTask.fork() 最终还是委派给 compute() 方法
left.fork();
right.fork();
int leftJoin = left.join();
int rightJoin = right.join();
sum = leftJoin + rightJoin;
// 与递归不同的是任务交由线程处理
}
return sum;
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinPool pool = new ForkJoinPool();
ForkJoinDemo demo = new ForkJoinDemo(1, 10);
Future<Integer> result = pool.submit(demo);
System.out.println(result.get());
}
}
ConcurrentHashMap原理
数据结构
初始化
-
初始化一个长度为 16 的数组
-
put("name", "Dioxide_CN")
通过计算 Hash 值来获取下标
public V put(K key, V value) { return putVal(key, value, false); } final V putVal(K key, V value, boolean onlyIfAbsent) { /* MORE CODE */ for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; K fk; V fv; if (tab == null || (n = tab.length) == 0) tab = initTable(); // ((n - 1) & hash) 通过该算式得到一个位置 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 通过 CAS 和构造 Node 进行原子性地添加数据 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value))) break; } /* MORE CODE */ } addCount(1L, binCount); return null; }
-
put("txt", "txt")
发生哈希碰撞时已链表的形式插入形成单向链表
扩容过程
当以下三种情况中满足两种则会发生扩容:
- 当新增节点之后,所在的链表元素个数打到了阈值 8,并且数组长度小于阈值 64 的时候
- 在调用
addCount()
方法新增集合元素计数后发现当前集合元素个数到达扩容阈值时就会触发扩容 - 当线程处于扩容状态下,其他线程对集合进行数据操作时会参与帮助扩容

数据迁移原理
N{A1JQE42`I]8.png)
通过公式 (f = tabAt(tab, i = (n-1) & hash))==null
计算索引:
- 假设1:table 长度为 16,如果某个 key 的 hash = 9,扩容到 32,上述公式得到的值不变
- 假设2:某个 key 的 hash = 20,在 16 位长度和 32 位长度下分别是 4 和 20,因为 4 位置的数据需要迁移到 20 位置上
也就是说,table 长度发生变化之后,获取同样一个 key 在 table 数组中的位置发生了变化,而 hash 值相同的情况下在发生扩容后还是会发生结果不同的情况进而发生部分数据迁移。
数组过长带来的红黑树转换

当数组长度大于等于 64 且某个数组位置的链表长度大于等于 8 时,会把链表转换成红黑树的数据结构。
评论区