Java企业教程系列
使用Java8的Lambda简化ReadWriteLock
Java 8的lambda表达式能够显著提高代码质量和可读性。这里我们看看如何使得ReadWriteLock简化. 假设我们有一个类为Buffer,能够保存队列中最后几个消息,能够计数和去除老的消息,原先Java实现如下:
public class Buffer {
private final int capacity;
private final Deque<String> recent;
private int discarded;
public Buffer(int capacity) {
this.capacity = capacity;
this.recent = new ArrayDeque<>(capacity);
}
public void putItem(String item) {
while (recent.size() >= capacity) {
recent.removeFirst();
++discarded;
}
recent.addLast(item);
}
public List<String> getRecent() {
final ArrayList<String> result = new ArrayList<>();
result.addAll(recent);
return result;
}
public int getDiscardedCount() {
return discarded;
}
public int getTotal() {
return discarded + recent.size();
}
public void flush() {
discarded += recent.size();
recent.clear();
}
}
现在我们要多次使用putItem(), 但是内部最新的队列有其最大容量,它能够记得为防止内存泄漏去除了多少个条目。这个类在单线程下工作得很好,我们并使用了非线程安全型的ArrayDeque 和非同步 的int. 当对int的读写是原子时,在不同线程中改变值彼此并不能保证相互显现, 即使我们使用线程安全的BlockingDeque和AtomicInteger,我们还是会无法逃避争夺情况,因为这两个变量并不是在一个同步块中同步处理。
一个方式是将所有方法进行同步,方法中加入synchronized, 但是这相当有限制性,而且我们的读情况大于写情况,在这样情况下,读写锁ReadWriteLock是一个吸引人的选择,它其实由两部分锁组成– 一个是供读,一个是供写. 实际上,他们都在竞争同样的锁,这个锁可以由一个写或多个读线程同时获得,这样在没有人写时,我们可以获得并发读能力,偶尔发生的写操作会堵塞所有的读,而使用同步synchronized 会总是堵塞所有其他的读或写。
ReadWriteLock不足之处是引入了太多公式化,你得显式打开一个锁,然后记得关闭它unlock() ,代码变得难于阅读:
public class Buffer {
private final int capacity;
private final Deque<String> recent;
private int discarded;
private final Lock readLock;
private final Lock writeLock;
public Buffer(int capacity) {
this.capacity = capacity;
recent = new ArrayDeque<>(capacity);
final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
readLock = rwLock.readLock();
writeLock = rwLock.writeLock();
}
public void putItem(String item) {
writeLock.lock();
try {
while (recent.size() >= capacity) {
recent.removeFirst();
++discarded;
}
recent.addLast(item);
} finally {
writeLock.unlock();
}
}
public List<String> getRecent() {
readLock.lock();
try {
final ArrayList<String> result = new ArrayList<>();
result.addAll(recent);
return result;
} finally {
readLock.unlock();
}
public int getDiscardedCount() {
readLock.lock();
try {
return discarded;
} finally {
readLock.unlock();
}
}
public int getTotal() {
readLock.lock();
try {
return discarded + recent.size();
} finally {
readLock.unlock();
}
}
public void flush() {
writeLock.lock();
try {
discarded += recent.size();
recent.clear();
} finally {
writeLock.unlock();
}
}
}
使用Java 8的Lambda表达式我们可以横切锁这样的关注(类似AOP),将读写锁专门做一个类如下:
public class FunctionalReadWriteLock {
private final Lock readLock;
private final Lock writeLock;
public FunctionalReadWriteLock() {
this(new ReentrantReadWriteLock());
}
public FunctionalReadWriteLock(ReadWriteLock lock) {
readLock = lock.readLock();
writeLock = lock.writeLock();
}
public <T> T read(Supplier<T> block) {
readLock.lock();
try {
return block.get();
} finally {
readLock.unlock();
}
}
public void read(Runnable block) {
readLock.lock();
try {
block.run();
} finally {
readLock.unlock();
}
}
public <T> T write(Supplier<T> block) {
writeLock.lock();
try {
return block.get();
} finally {
writeLock.unlock();
}
public void write(Runnable block) {
writeLock.lock();
try {
block.run();
} finally {
writeLock.unlock();
}
}
}
这个类我们实现了ReadWriteLock的包装(适配器模式),提供了一系列实用方法,原理上我们可以传递一个Runnable 或 Supplier<T> (这是一个只有 T get() 方法的接口) ,我们就能确保其运行能够被锁围绕,这样我们能在客户端使用Lambda简化调用代码:
public class Buffer {
private final int capacity;
private final Deque<String> recent;
private int discarded;
private final FunctionalReadWriteLock guard;
public Buffer(int capacity) {
this.capacity = capacity;
recent = new ArrayDeque<>(capacity);
guard = new FunctionalReadWriteLock();
}
public void putItem(String item) {
guard.write(() -> {
while (recent.size() >= capacity) {
recent.removeFirst();
++discarded;
}
recent.addLast(item);
});
}
public List<String> getRecent() {
return guard.read(() -> {
return recent.stream().collect(toList());
});
}
public int getDiscardedCount() {
return guard.read(() -> discarded);
}
public int getTotal() {
return guard.read(() -> discarded + recent.size());
}
public void flush() {
guard.write(() -> {
discarded += recent.size();
recent.clear();
});
}
}
注意到这里putItem方法中我们使用Lambda表达式作为FunctionalReadWriteLock方法write(Supplier<T> block)的参数。
Java8还提供了集合转换为集合的方法,我们使用如下:
public void flush() {
guard.write(this::unsafeFlush);
}
private void unsafeFlush() {
discarded += recent.size();
recent.clear();
}
public List<String> getRecent() {
return guard.read(this::defensiveCopyOfRecent);
}
private List<String> defensiveCopyOfRecent() {
return recent.stream().collect(toList());
}