目录
一、共享变量(与ThreadLocal无关可以跳过)
不用ThreadLocal操作变量,并且不加同步锁的情况下(线程不安全!!!)
下面这段代码就是给 value 操作加上锁了。(这里只保证了线程安全,性能不是很好)
二、ThreadLocal共享变量
1、举个栗子
2、原理介绍
(1) 数据结构原理
(2)弱引用原理与内存泄漏
3、源码分析
(1)大体数据结构
(2)创建
(3)set+hash冲突解决方法
(4) 整理卡槽
(5)扩容
(6)remove
(7)hashcode(散列函数)// todo散列函数原理,写一篇
(8)get
ThreadLocal出现就是为了保证变量在线程中被操作不会被其他线程篡改,并且能在线程中共享。
这章节主要介绍变量共享出现的问题,以及如何实现在线程安全的共享变量。
package com.test;
/**
* Created by Ljy on 2018/3/29.
*/
public class Test {
private static int value = 0;
public static void main(String[] args) {
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
try{
for (int i=5; i>0; i--){
Thread.sleep(10);
System.out.println("thread1:"+value);
value--;
}
}catch (Exception e){
e.printStackTrace();
}
}
});
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
try{
for (int i=5; i>0; i--){
Thread.sleep(10);
System.out.println("thread2:"+value);
value--;
}
}catch (Exception e){
e.printStackTrace();
}
}
});
t1.start();
t2.start();
}
}
运行结果(结果不一定)
thread2:0
thread1:0
thread2:-2
thread1:-3
thread2:-4
thread1:-5
thread2:-6
thread1:-7
thread2:-7
thread1:-9
简单的解释就是当 thread2 打印 System.out.println("thread1:"+value); 的同时, thread1也打印了 System.out.println("thread1:"+value); ,此时两线程持有的 value 都为 0 。所以就会出现结果这种问题。
从 JVM 内存模型上解释就是,thread1 read load的时候,thread2 也 read load 得出 value 是0,然后两线程操作完 value-- 操作后,store write 回主内存。解决也很简单,在一个线程中操作 value 的时候,给 value 加上 lock,就能解决这种线程不安全的问题。
package com.test;
/**
* Created by Ljy on 2018/3/29.
*/
public class Test {
private static int value = 0;
public static synchronized void t1(){
try{
for (int i=5; i>0; i--){
Thread.sleep(10);
System.out.println("thread1:"+value);
value--;
}
}catch (Exception e){
e.printStackTrace();
}
}
public static synchronized void t2(){
try{
for (int i=5; i>0; i--){
Thread.sleep(10);
System.out.println("thread2:"+value);
value--;
}
}catch (Exception e){
e.printStackTrace();
}
}
public static void main(String[] args) {
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
t1();
}
});
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
t2();
}
});
t1.start();
t2.start();
}
}
这章节主要介绍ThreadLocal是如何做到在线程中共享变量,但是线程和线程直接是相互隔离的。
package com.freedom.show.controller.api;
/**
* Created by Ljy on 2019/5/16.
*/
public class Test {
private static ThreadLocal<Integer> th = new ThreadLocal<>();
public static void main(String[] args) {
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
th.set(10);
for (int i=5; i>0; i--){
Integer integer = Integer.parseInt(th.get()+"");
System.out.println("thread1"+th+":"+integer);
th.set(--integer);
}
}
});
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
th.set(10);
for (int i=5; i>0; i--){
Integer integer = Integer.parseInt(th.get()+"");
System.out.println("thread2"+th+":"+integer);
th.set(--integer);
}
}
});
t1.start();
t2.start();
}
}
输出结果为
thread1java.lang.ThreadLocal@2a49099d:10
thread1java.lang.ThreadLocal@2a49099d:9
thread1java.lang.ThreadLocal@2a49099d:8
thread1java.lang.ThreadLocal@2a49099d:7
thread1java.lang.ThreadLocal@2a49099d:6
thread2java.lang.ThreadLocal@2a49099d:10
thread2java.lang.ThreadLocal@2a49099d:9
thread2java.lang.ThreadLocal@2a49099d:8
thread2java.lang.ThreadLocal@2a49099d:7
thread2java.lang.ThreadLocal@2a49099d:6
上面输出的结果可以看到两个线程用的ThreadLocal对象直线改的是同一个地址,也就是两线程操作的是同一个ThreadLocal,那为什么两线程能保证两线程能相互隔离呢?带着这个问题,我们看下面的原理介绍。
下图的实现表示强引用,虚线表示弱引用。
从上图上能看出,栈中当前线程的引用指向堆中当前线程实例。当前线程实例是有一个 Map 对象,这个Map对象里面的 key 是 ThreadLocal 对象,value 是我们想共享的值。没了解JVM,堆栈不了解也没事。简单来说,就是每个线程都有一个线程对象,每个线程对象里面持有一个 Map 对象,该 Map 对象的 key 就是 ThreadLocal 实例。
上面代码共用一个 ThreadLocal 对象,但是为什么还能线程安全呢?这就全靠 Map(ThreadLocalMap,是ThreadLocal的内部类)。因为每个线程都持有一个 CurrentThread 以及 ThreadLocalMap。就算 key 相等,但是不同线程操作的ThreadLocalMap 是不同的,这就能保证当前线程只操作他自己的ThreadLocalMap。ThreadLocalMap 的操作就像下面这段代码,map1 和 map2 是相对独立的。
Map map1 = new HashMap();
Map map2 = new HashMap();
ThreadLocal th = new ThreadLocal();
map1.put(th, 1);
map2.put(th, 2);
上图看不懂没事,等看完源码来看着图就豁然开朗了。
1)弱引用原理
弱引用的专业解释是指不能确保其引用的对象不会被垃圾回收器回收的引用。一个对象若只被弱引用所引用,则被认为是不可访问(或弱可访问)的,并因此可能在任何时刻被回收。
ThreadLocalMap里面的 Entry 继承了 WeakReference 类。如果 ThreadLocal 和 ThreadLocalMap 中的 key 是强引用,当 ThreadLocal 没有除它和 key 之间引用外的任何强引用时,我们明明不再需要 ThreadLocal 实例,但是 ThreadLocal 实例确一直保存在线程中,线程一直不被销毁,ThreadLocal 一直被创建,就会出现内存泄漏,这是我们不想看到的。所以采用虚引用,就能很好的在 ThreadLocal 没有其他强引用的情况下,被GC回收。
2)内存泄漏
什么是内存泄漏?
内存泄漏(Memory Leak)是指程序中己动态分配的堆内存由于某种原因程序未释放或无法释放,造成系统内存的浪费,导致程序运行速度减慢甚至系统崩溃等严重后果。
为什么要用弱引用,当线程回收的时候就会被回收呀?
弱引用的引入就是为了防止内存泄漏,介绍弱引用原理的时候也介绍了线程一直不回收的极端情况,但是没有详细解释,在这里解释一下。
内存泄漏存在于
1、在线程池中,线程之后被回收到池,并没有销毁,如果线程中存在创建 ThreadLocal 对象,如果 ThreadLocal 和 ThreadLocalMap 的 key 之间使用的是强引用。 那么 ThreadLocal 就不会被回收,当程序长时间运行后,内存中的ThreadLocalMap 中的 Entry[] table 就会被不断增大,导致程序运行崩溃,内存泄漏。这种问题,ThreadLocal 已经帮我们规避了。
2、定义了一个 static ThreadLocal 对象,初始化完没有再使用,这种也属于内存浪费,内存泄漏。这类问题,就看开发人员是否遵循了代码书写标准。
可能上面的原理,很抽象,现在从源码的角度介绍。
Thread 中持有 ThreadLocalMap 对象。
ThreadLocalMap 里面使用 Entry[] table 进行存储,其中 Entry 中的 key 是 ThreadLocal 对象。
static class ThreadLocalMap {
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
private static final int INITIAL_CAPACITY = 16;// 初始化容量
private Entry[] table; // 和HashMap 一样使用的是数组进行存储
private int size = 0;//table 中 entry 的数量
private int threshold; // 初始化0,默认是 2/3 容量
......
}
ThreadLocal 的创建如下,就是简单的调用默认的构造函数。并没有进行初始化
ThreadLocal th = new ThreadLocal();
ThreadLocal 默认构造函数
public ThreadLocal() {
}
ThreadLocal 的 set 操作:
th.set(1);
set 操作源码:
public void set(T value) {
Thread t = Thread.currentThread(); // 获取当前线程,直接调用的是 native 方法
ThreadLocalMap map = getMap(t); //获取当前ThreadLocalMap
if (map != null)
map.set(this, value);
else
createMap(t, value);// 创建一个 ThreadLocalMap 对象实例,
}
set 主要操作就是
1、获取当前线程
2、获取线程中持有的 ThreadLocalMap 对象。
3、(1)如果 map 存在,直接调用 ThreadLocalMap 的 set 方法进行 数据的存储
(2)如果 map 不存在,则创建Map(这里的创建Map其实包括了 key value 的存储)。
1) getMap()
只是简单的获取了当前线程持有的 ThreadLocalMap 对象
ThreadLocalMap getMap(Thread t) {
// 获取当前线程的threadLocalMap实例
return t.threadLocals;
}
2)ThreadLocal.set()
private void set(ThreadLocal<?> key, Object value) {
Entry[] tab = table;
int len = tab.length;
//获取直接散列槽位置
int i = key.threadLocalHashCode & (len-1);
//往后找
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();
//如果找到了,就覆盖返回
if (k == key) {
e.value = value;
return;
}
//ThreadLocal为空时,去往后找有没有合适的,没有就采取策略
if (k == null) {// 这里就防止内存泄漏了
//清理卡槽,将entry替换key相同的entry,或者放到下一个空卡槽位置
replaceStaleEntry(key, value, i);
return;
}
}
// i位置没有被占用
tab[i] = new Entry(key, value);
int sz = ++size;
//如果table不能清除卡槽了,并且当前大小大于阈值,删除过时的条目。重新包装和/或重新调整表格的大小。
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}
ThreadLocal.set 主要操作流程:
a) 根据hash值获取直接散列槽
b)从直接散列槽的位置往后找
1)如果找到 key 相同的,就替换。
2)如果1不成立并且找到 key 为 null 的,就清理卡槽,并采取策略放置
3)如果1、2不成立并且当前卡槽为 null, 这将 Entry 放在空卡槽中,检测是否超过阈值。超过阈值就扩容。
3)createMap(key, value)
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
// 默认初始容量16
table = new Entry[INITIAL_CAPACITY];
// 获取 key 对应的 index
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
// 创建 entry 保存在指定 index 中
table[i] = new Entry(firstKey, firstValue);
// 刷新 table 中 entry size 大小
size = 1;
// 设置阈值,阈值为容量的2/3
setThreshold(INITIAL_CAPACITY);
}
private void setThreshold(int len) {
threshold = len * 2 / 3;
}
清理卡槽,并将 key value 放入指定位置
private void replaceStaleEntry(ThreadLocal<?> key, Object value,
int staleSlot) {
Entry[] tab = table;
int len = tab.length;
Entry e;
//备份以检查当前运行中的先前旧卡槽。
//我们一次清理整个运行以避免由于垃圾收集器释放串中的refs(即收集器运行时)不断的增量重复。
//slotToExpunge:要清除的卡槽
int slotToExpunge = staleSlot;
//往前找,找到hashc冲突的最前面一个key为空的entry
for (int i = prevIndex(staleSlot, len);
(e = tab[i]) != null;
i = prevIndex(i, len))
if (e.get() == null)
slotToExpunge = i;
// Find either the key or trailing null slot of run, whichever
// occurs first
//找到run的key或trailing null slot,以先发生者为准
//往后看,
for (int i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
//如果我们找到键,那么我们需要将它与传入的卡槽位置交换以维护哈希表顺序。
//传入的卡槽位置或任何其他陈旧的插槽遇到它上面,然后可以发送到expungeStaleEntry删除或重新运行运行中的所有其他条目。
//找到当前ThreadLocal对象相同的entry
if (k == key) {
//找到相同的覆盖value
e.value = value;
//将key为空的槽和当前槽对调
tab[i] = tab[staleSlot];
tab[staleSlot] = e;
// 如果传入的卡槽到它前面空的卡槽直接没有其他key为空的卡槽时,因为上面已经改变了旧卡槽对象的位置了,更新旧卡槽位置
/********************** 操作1 ********************/
if (slotToExpunge == staleSlot)
slotToExpunge = i;
//整理并清理一些槽
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
return;
}
//如果我们在向后扫描时没有找到stale entry,则扫描key时看到的第一个stale entry是第一个仍在运行中出现的条目。
// 这里保证了 1 操作的正确性
if (k == null && slotToExpunge == staleSlot)
slotToExpunge = i;
}
//如果没有找到相同的key,就把他放在旧卡槽中
tab[staleSlot].value = null;
tab[staleSlot] = new Entry(key, value);
// If there are any other stale entries in run, expunge them
//清理卡槽
if (slotToExpunge != staleSlot)
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}
了解整理卡槽之前,我们先简单了解一下卡槽的构造。由于阈值为容量的2/3,并且实用的hash算法采用了离散原理,保证了两个空卡槽之间的非空卡槽中保存的,都是hash冲突的数据。
基于上面的前提,我们来看整理卡槽的基本操作:
1) 在当前位置往前找,在hash冲突这段卡槽中,找到第一个 key 为空的 entry,并记录需要清理的位置。
2) 在当面位置往后找,找到 key 相同的卡槽,就将覆盖key相同的卡槽,并将该卡槽和第一步找出的 key 为空的卡槽(需要清理的卡槽)对调。此时如果需要清理的位置和传入位置相同时(表示传入位置之前没有 key 为空的卡槽),需要清理的卡槽就变成了,当前对调的卡槽位置,并整理卡槽。
这里就有一个疑问,如何保证了需要清理的卡槽和当前卡槽之间没有 key 为空的卡槽呢?if (k == null && slotToExpunge == staleSlot)slotToExpunge = i; 这段代码,保证了在slotToExpunge == staleSlot情况下,需要清理的卡槽和当前卡槽中不存在 key 为空的卡槽。
3) 如果在第二步中,没有找到 key 相同的 entry,则直接在 hash 冲突的段后加一个entry,并整理卡槽。
清理 key 为空的卡槽
private int expungeStaleEntry(int staleSlot) {
Entry[] tab = table;
int len = tab.length;
// expunge entry at staleSlot
//删除staleSlot位置上的entry
tab[staleSlot].value = null;
tab[staleSlot] = null;
// 刷新 table 中 entry 的数量
size--;
// 接下来就是操作当前位置到下一个空位置直接冲突的元素
//重新遍历直到我们遇到null
Entry e;
int i;
//往后遍历
for (i = nextIndex(staleSlot, len);
// 获取下一个卡槽,如果下一个卡槽为空则退出循环
(e = tab[i]) != null;
i = nextIndex(i, len)) {
// 获取当前 entry 的 key 值
ThreadLocal<?> k = e.get();
//如果碰到 key 为空的,则清空
if (k == null) {
e.value = null;
tab[i] = null;
// 刷新 table 中 entry size
size--;
} else {
//获取位置并整理
int h = k.threadLocalHashCode & (len - 1);
//后来 hash 得出的位置和当前位置不同的时候
//也就是这个key在加入的时候,和别的key冲突了
if (h != i) {
//当前清空
tab[i] = null;
//如果需要的h位置不为空,则找下一个
while (tab[h] != null)
h = nextIndex(h, len);
//将h位置赋值
tab[h] = e;
}
}
}
return i;
}
整理 key 为空卡槽操作为:
1) 将当前需要清除的卡槽 value 和 entry 都赋值成 null。
2) 从当前卡槽往后遍历,如果找到 key 为空的卡槽,则将 value 和 entry 都赋值成 null。如果找到的 key 不为空,根据 key 的 hash 值,计算出该 key 的位置,如果计算出的位置和当前位置不同,则代表了这个 key 是因为 hash 冲突,才被排到这里的。从 hash 一致的第一个元素开始往后遍历,找到第一个 entry 为空的位置,就放入。
上面操作就保证了 hash 一致的一段,key 值都不为空,并且所有元素都往头上靠。保证了数据的整齐。
整理卡槽
/**尝试扫描一些寻找陈旧条目的单元格。 添加新元素或删除另一个旧元素时会调用此方法。 它执行对数扫描,作为无扫描(快速但保留垃圾)和与元素数量成比例的多次扫描之间的平衡,这将找到所有垃圾,但会导致一些插入花费O(n)时间。**/
private boolean cleanSomeSlots(int i, int n) {
boolean removed = false;
Entry[] tab = table;
int len = tab.length;
do {
//获取下一个
i = nextIndex(i, len);
Entry e = tab[i];
//如果key为空
if (e != null && e.get() == null) {
n = len;
removed = true;
//继续删除过时的条目
i = expungeStaleEntry(i);
}
} while ( (n >>>= 1) != 0);
return removed;
}
扩容预操作
private void rehash() {
// 整理整表数据
expungeStaleEntries();
// Use lower threshold for doubling to avoid hysteresis
//使用较低的阈值加倍以避免滞后
if (size >= threshold - threshold / 4)
//两倍扩容
resize();
}
扩容之前,先会整理整表数据,将 key 为空的数据删除,并将 hash 冲突的数据连续排列。只有当整理完之后的 size 还大于等于 3/4 时,才采取两倍扩容。
两倍扩容
private void resize() {
Entry[] oldTab = table;
int oldLen = oldTab.length;
int newLen = oldLen * 2;
Entry[] newTab = new Entry[newLen];
int count = 0;
for (int j = 0; j < oldLen; ++j) {
Entry e = oldTab[j];
if (e != null) {
ThreadLocal<?> k = e.get();
if (k == null) {
e.value = null; // Help the GC
} else {
int h = k.threadLocalHashCode & (newLen - 1);
while (newTab[h] != null)
h = nextIndex(h, newLen);
newTab[h] = e;
count++;
}
}
}
setThreshold(newLen);
size = count;
table = newTab;
}
扩容操作很简单,就是把旧 table 里面的数据一个个放入新table,然后把新 table 覆盖旧table。
public void remove() {
ThreadLocalMap m = getMap(Thread.currentThread());
if (m != null)
m.remove(this);
}
private void remove(ThreadLocal<?> key) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);
// 从 key 对应的 table 上的位置,开始往后找
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
// 如果找到指定 key 对应的 entry,
if (e.get() == key) {
// 清空 entry 对应的引用(key = null),帮助Gc
e.clear();
// 清楚当前卡槽,并整理当前卡槽到下一个空卡槽直接的元素
expungeStaleEntry(i);
return;
}
}
}
删除操作流程:
1)从于当前要删除的 key hash 一致的第一个元素开始,往后找。
2)找到 key 和当前要删除的 key 相同的 entry 时,清空 entry 对应的引用,即将 entry 中的 key 赋值成 null,帮助GC。并整理卡槽。
private static AtomicInteger = new AtomicInteger();
// 连续生成的哈希码之间的差异 - 将隐式顺序线程局部ID转换为近似最优扩展的乘法哈希值,用于2的幂的表。
private static final int HASH_INCREMENT = 0x61c88647;
private static int nextHashCode() {
return nextHashCode.getAndAdd(HASH_INCREMENT);
}
这个 HASH_INCREMENT 值保证了均匀散列分布。
public T get() {
// 获取当前线程
Thread t = Thread.currentThread();
// 获取当前线程持有的ThreadLocalMap实例, threadLocalMap中的键对应当前ThreadLocal实例,值对应set进去的value
ThreadLocalMap map = getMap(t);
// 如果线程中的 threadLocalMap 不为空
if (map != null) {
// 获取当前threadLocal实例对应的map中的entry元素
ThreadLocalMap.Entry e = map.getEntry(this);
// 如果entry不为空,则返回entry中的value
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
// map为空或者entry为空时
return setInitialValue();
}
private Entry getEntry(ThreadLocal<?> key) {
// 获取key在table上的index(这里获取的index是在不冲突的情况下,也就是不冲突情况下,key应该在table的index位置上)
// 获取key值对应的hashcode,&table.length-1(就是key.threadLocalHashCode%table length)
int i = key.threadLocalHashCode & (table.length - 1);
// 获取table[i]元素
Entry e = table[i];
// 如果i上的entry不为空,并且该entry的key和传入key相等,就返回当前entry(这种情况就是i这个位置上不存在冲突)
if (e != null && e.get() == key)
return e;
//如果当前 entry, 或者当前 entry 对应的 key 和传入key不同时
else
//在直接散列槽中找不到key时使用的getEntry方法的版本。
//如果e==null。这个entry被remove了,否则往后查找
return getEntryAfterMiss(key, i, e);
}
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
// 获取 threadLocalMap 的数据结构
Entry[] tab = table;
// 获取table大小
int len = tab.length;
// 如果 entry 不为空,向后遍历entry
while (e != null) {
// 获取当前entry对应的key值
ThreadLocal<?> k = e.get();
// 判断key值和传入的key值是否相等,若果相等直接返回当前entry
if (k == key)
return e;
//如果当前entry对应的key值为空时,进行 expungeStaleEntry 操作
if (k == null)
// 删除当前位置上的 entry ,将当前位置到下一个空卡槽中的数据,重新获取index,重新整理
expungeStaleEntry(i);
else
// 如果 key 不为空,获取下一个卡槽
i = nextIndex(i, len);
e = tab[i];
}
return null;
}