并行化

并行和并发的区别

  • 并行:并行更着重于硬件层面,在不同的cpu上同时工作。
  • 并发:并发更着重与软件层面,描述的情况是两个以上的action可能同时发生。

stream的并行化处理

调用parallelStream方法即可将stream并行化。

1
2
//list中的元素求和
result = linkedList.stream().parallel().mapToInt(integer -> integer).sum();

对应的串行化则是sequential()方法。

1
result = linkedList.stream().sequential().mapToInt(integer -> integer).sum();

那么我们check下两者的运行时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public static void main(String[] args) throws Exception {
List<Long> times = new ArrayList<>();

List<Integer> linkedList = new LinkedList<>();
for (int i = 0; i < 100000000; i++) {
linkedList.add(10);
}
int result = 0;
for (int i = 0; i < 10; i++) {
long startTime = System.currentTimeMillis();
//Parallel
result = linkedList.stream().parallel().mapToInt(integer -> integer).sum();
//Sequential
result = linkedList.stream().sequential().mapToInt(integer -> integer).sum();
System.out.println(result);
times.add(System.currentTimeMillis() - startTime);
}
System.out.println(getAverage(times));

}

static private long getAverage(List<Long> times) {
long average = times.stream().reduce((acc, element) -> acc + element).get() / times.size();
return average;
}

1
2
Parallel Result:1754
Sequential Result:249

可以看到并行并不是一定性能更好,并行能提升性能的前提在于数据量足够大,主要有四点

  • 数据大小:如上例,只有输入的数据足够大的时候才有用。
  • 源数据结构:上述例子使用的数据结构是LinkedList,之后用ArrayList看看效果。
  • 装箱:处理基本类型要比装箱类型要快。
  • 核的数量:如果只有一个cpu,那就没有并行的意义。换句话说核越多,性能提升幅度越大。

换成ArrayList试下

1
2
3
4
5
6
7
8
9
10
11
List<Integer> linkedList = new ArrayList<>();
for (int i = 0; i < 100000000; i++) {
linkedList.add(10);
}
int result = 0;
for (int i = 0; i < 10; i++) {
long startTime = System.currentTimeMillis();
result = linkedList.stream().parallel().mapToInt(integer -> integer).sum();
times.add(System.currentTimeMillis() - startTime);
}
System.out.println(getAverage(times));

1
Result:86

可以看到性能一下子就提升了。因此数据结构对数据并行化影响很大。

因为求和原理是fork/join框架,也就是fork递归细分问题,最后通过join将数据整合,因此数据结构分解的性能至关重要。

  • T1:ArrayList、数组或IntStream,支持随机读取。
  • T2:HashSet、TreeSet等等。
  • T3:上述的LinkedList,还有长度未知的BufferedReader.lines,因为不知从哪里开始分解。

数据的并行化处理

需要将一个数组作为参数传入Arrays.parallelSetAll()方法,不过需要注意的是传入的数组会被修改,并不是创建一个新的数组。

1
2
3
4
5
6
7
8
9
10
public static void main(String[] args) throws Exception {
double[] doubles = new double[10];
for (int i = 0; i < 10; i++) {
doubles[i] = i;
}
Arrays.parallelSetAll(doubles, i -> i + 0.5);
for (double d : doubles) {
System.out.println(d);
}
}

1
2
3
4
5
6
7
8
9
10
11
Result:
0.5
1.5
2.5
3.5
4.5
5.5
6.5
7.5
8.5
9.5

并行化的陷阱

下面代码在串行和并行情况下结果不一样

1
2
3
4
5
6
7
List<Integer> integerList = new LinkedList<>();
integerList.add(1);
integerList.add(2);
integerList.add(3);
integerList.add(4);
int i = integerList.stream().parallel().reduce(5, (acc, element) -> acc * element);
System.out.println(i);

原因在于并行化了之后每个元素都执行了reduce(5, (acc, element) -> acc * element)。

原来是5 * 1 * 2 * 3 * 4,并行化后变成了5 * (5 * 1) * (5 * 2) * (5 * 3) * (5 * 4)
正确的应该是

1
int i = integerList.stream().parallel().reduce(1, (acc, element) -> acc * element) * 5;

Stream

传统的for循环其实是使用迭代器iterator来实现的,例如从列表中取出所有的偶数。

1
2
3
4
5
for (int i : integerList) {
if (i % 2 == 0) {
System.out.println(i);
}
}

用iterator来写就是,用文字来描述就是从列表中取出一个进行判断是否要输出,这是种串行化操作,叫做外部迭代。给人感觉就是都混在了一起。

1
2
3
4
5
6
7
Iterator<Integer> iterator = integerList.iterator();
while (iterator.hasNext()) {
int i = iterator.next();
if (i % 2 == 0) {
System.out.println(i);
}
}

而使用的stream是内部迭代。这时整个逻辑被分割开来,filter只负责过滤,foreach负责每个item如何处理。

1
2
3
integerList.stream()
.filter(integer -> integer % 2 == 0)
.forEach(integer -> System.out.println(integer));

  • 惰性求值:只是用来描述stream,比如filter。

  • 及早求值:希望得到返回结果,例如foreach或者count。

判断惰性和及早只需要看返回值,如果返回值是stream,那么就是惰性,否则则是及早。

大致知道一些常用的,例如map(映射成别的值),filter(过滤),collect(生成新的列表)等等。尽量以后在敲代码的时候多用Lamda表达式,一是为了练习,二是Lamda表达式确实看起来舒服很多。

使用stream排序

使用stream方法中的sorted方法可以给一个stream排序,也可以自定义,默认升序。

1
2
3
4
5
6
7
8
9
10
list.stream().sorted() 

list.stream().sorted(Comparator.reverseOrder())

list.stream().sorted(Comparator.comparing(Student::getAge))

list.stream().sorted(Comparator.comparing(Student::getAge).reversed())

//最后需要将排序完的结果转化成list
list.stream().sorted().collect(Collectors.toList());

Lamda表达式

final关键字

Lamda表达式中引用的局部变量必须是final或既成事实上的值。
换句话说Lamda表达式需要的是值而不是变量,如果一个变量被final修饰了,那么表示它只会被赋值一次,那么便是个定值,而如果我们对某个普通变量多次赋值,对Lamda表达式来说他不知道到底该用哪个值。

1
2
3
4
5
6
String str = "hello";
str = "world";
Runnable runnable = () -> {
//Variable used in lambda expression should be final or effectively final
System.out.println(str);
};

1
2
3
4
5
String str = "hello";
Runnable runnable = () -> {
//valid
System.out.println(str);
};

Predicate接口

该接口表示传入的对象是否满足某种条件,并返回boolean作为真假判断。check下源码,该接口只接受一个参数,返回一个布尔值。

1
2
3
4
5
public interface Predicate<T> {

boolean test(T t);
.......
}

例如我们希望从一个int的列表中找到所有的偶数,那么Predicate可以这么写。

1
2
3
4
5
Predicate<Integer> predicate = integer -> {
return integer % 2 == 0;
};

predicate.negate();// 过滤出所有的奇数

总结

  • Lamda表达式是一个匿名的方法,将行为像数据一样传递。
  • Lamda表达式也被称为闭包,未赋值的变量与周边环境隔离起来,进而被绑定到一个特定的值。

Volatile

原子性

原子性操作是指不能被线程调度机制中断的操作,一旦操作开始,在可能发生的上下文切换之前执行完。

java中对除了long和double以外的基本类型进行读写操作都是原子性的,因为long和double是64位的,JVM有可能将其分为两个32位的。不过新的jdk也实现了long和double的原子读写。

这里说的读写操作并不包括自加等。因为自加其实是分成了两步来走。

1
2
3
4
5
6
7
8
9
i++

0: aload_0
1: dup
2: getfield
5: iconst_1
6: iadd
7: putfield
10: return

这个操作有put和get,如果在put和get之间发生了变化,那么得到的结果可能不正确,因此不是原子性操作。当然java中有AtomicInteger、AtomicLong、AtomicBoolean等原子类。

Volatile修饰词具有如下两种属性

可见性

通俗的说就是当这个值被修改了,那么所有的读取操作都可以看到这个修改。

对一个普通的变量i来说,正常的读写操作是先将变量从主内存拷贝到工作内存(缓存)中,进行一定的操作之后,再写入主内存,但是什么时候写入是不确定的。那么由于两个线程工作在两个cpu上,如果cpu1修改了i,但是没有及时写入主内存,而cpu2直接去读取i,结果就是这个i不是最新的。

用volatile修饰了的变量,在被修改之后会强制立即写入主内存。这样能保证在读取的时候一定是最新的。并且读取操作也是直接从主内存中读取。

因此volatile保证了对于其他线程的可见性。

同样能实现可见性的还有sychronized和final关键字,sychronized的可见性是由对一个变量执行unlock操作之前,必须先把次变量同步到主内存中(store、write操作)。而final关键字的可见性是被final修饰的变量在构造器中一旦初始化完成,并且构造器没有把this的引用传递出去,那在其他线程中就能看见final变量的值。

有序性

在JVM中,编译器和处理器可能会对指令进行重排,或许是为了优化,但是如果被volatile修饰的变量则会告诉编译器不要重排指令,按照顺序执行就可以,避免了并发产生的问题。

1
2
3
4
5
int a = 1;
int b = 2;

a++;
b++;

以上代码可以重排列成如下,因为a与b之间没有依赖。如果有依赖并且重排的话则会产生并发问题。

1
2
3
4
5
int a = 1;
a++;

int b = 2;
b++;

但是在多线程中就不一定了,如下代码就可能出现并发问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
int a = 0;
bool flag = false;

public void write() {
a = 2; //1
flag = true; //2
}

public void multiply() {
if (flag) { //3
int ret = a * a;//4
}

}

如果有两个线程同时执行writemultiply方法,ret不是一定为4.如果write方法中的1和2重排序下,先赋值flag再赋值a,那么在multiply中可能读到a的值为0;

当然也可以使用synchronized和lock来保证有序,因为在某个时间段只有一个线程能执行被修饰的代码。

优先选择一定是sychronized,除非只有一个变量,否则都不应该只使用volatile,他并不能保证并发的冲突不发生。同样的也不能改变自加不是原子性的本质。

happens-before

说到重排序,插播一段happens-before原则,这个原则是为了保证线程A所执行的action对其他线程执行的action是可见的。如果没有这个原则,那么JVM就会随心所欲的修改action执行的顺序。

以下是happens-before的几种规矩

  1. 一个线程内的action

    1
    2
    3
    4
    5
    6
    7
    state 1
    state 2
    state 3 // All states happen before state n

    ...

    state n
  2. 监视锁
    在B线程获取锁之前,A线程会先将锁释放(同一个锁)。

  3. 被volatile修饰的变量
    线程A和线程B都用到了被volatile修饰的变量,那么写的操作一定发生在读之前。
  4. 线程开始规则
    B.start()一定发生在B的run方法中所有语句之前。

    1
    2
    3
    4
    5
    6
    7
    8
    Thread B = new Thread();
    B.start();

    //Thread B
    public void run() {
    //state 1
    //state 2
    }
  5. 线程join规则
    调用了join方法,则需等待子线程的所有语句执行完才能只能join之后的代码。

  6. 传递性
    如果A happens-before B,并且B happens-before C,那么A happens-before C。

Concurrent AQS

除了使用synchronized关键字,还可以显式使用lock对象来解决并发问题。

ReentrantLock

1
2
3
4
5
6
7
8
Lock lock = new ReentrantLock();
lock.lock();
try {
//add code you want to lock
count++;
} finally {
lock.unlock();
}

ReentrantLock是一个能和synchronized实现同样功能的对象,只是在实现方面更为灵活一点,比如尝试获取锁,过一段时间放弃它。因此被lock住的线程是线程安全的。

ReentrantLock分为公平模式和非公平模式,非公平模式其实就是在lock的时候多了个操作,就是尝试下能否直接获取,拿不到就乖乖排队。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
......
}
/**
* Sync object for fair locks
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;

final void lock() {
acquire(1);
}
......
}

一定要使用try和finally的组合,能确保锁被释放,以防引起阻塞。

ReentrantReadWriteLock

这个lock是以read和write成对的锁。

1
2
3
4
5
6
7
8
9
ReadWriteLock lock = new ReentrantReadWriteLock();

lock.readLock().lock();
//add code to read
lock.readLock().unlock();

lock.writeLock().lock();
//add code to write
lock.writeLock().unlock();

该对象在实例化的时候可以传个boolean值表示选择公平模式还是非公平模式。

1
2
3
4
5
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}

  • 公平模式:先到先得,晚来的排队,不管是读锁还是写锁都没有特权。
  • 非公平模式:不阻塞write锁。write锁优先级高
    默认采用的是非公平模式。

write锁是独占模式,互相干扰,而read锁则是共享模式,read锁之间互不干扰。

具体原理需要先去理解下AQS的框架才行。因为ReentrantLock中几乎所有的方法都是调用被sync对象封装起来的方法,而这个sync对象是AbstractQueuedSynchronizer(AQS)的子类。也就是说大体逻辑都在AQS中得以实现,ReentrantLock和ReentrantReadWriteLock进行了不同的定制而已。

AQS

功能

既然是sychronizer,那必须有一个flag来代表lock或者unlock。在AQS中使用state来维护。

1
2
3
4
/**
* The synchronization state.
*/
private volatile int state;

有了state,必须要有state对应的get/set方法。在AQS中是acquirerelease。不过被包装在了子类中名称不太一样,比如Lock.lock,Semaphore.acquire,CountDownLatch.await.

1
2
3
4
5
6
7
8
9
10
11
12
//ReentrantLock.java
final void lock() {
acquire(1);
}

//AbstractQueuedSynchronizer.java
public final void acquire(int arg) {
//tryAcquire的实现在ReentrantLock中
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

每个synchronizer必须支持以下三个特性:

  • 阻塞的synchronization和非阻塞的synchronization
  • 超时选项
  • 可以通过interruption来打断

将state分为两种情况:

  • exclusive mode:独占模式,在同一个时间只有一个线程可以访问(acquire和release)
  • share mode:共享模式,有多个线程可以同时访问(acquireShared和releaseShared)

设计思想

同步的问题主要围绕对锁的逻辑处理,也就是acquire和release。

获取锁的操作:

while (state不允许acquire) {

将该thread排队

_}_

如果在队列中就从队列中清除

释放锁的操作:

更新state

if(允许其他被阻塞的线程acquire) {

释放在队列中的一个或多个线程

_}_

因此它必须有以下三个模块

  • 原子性操作state
  • 阻塞和释放线程
  • 维护队列

State

在AQS中的state是一个32位int类型的变量,0表示未被持有,1表示已经被持有。并且扩展出了get/set和compareAndSet,而它们之所以能完成原子性操作主要是基于volatile关键字。

compareAndSet这个方法会传入两个参数,expect和update,如果current state和expect相等,则会直接更新update。

而AQS的实现类必须实现tryAcquire和tryRelease方法,返回为true则表示持有锁成功。

1
2
3
4
5
6
7
8
9
//AbstractQueuedSynchronizer.java 
//默认直接抛出异常,需要子类重写
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}

Block

在AQS中使用LockSupport的park和unpark方法来真正实现lock功能。park方法和unpark作为成对出现,并且park方法可以传入timeout的参数,用来实现timeout的同步锁。

1
LockSupport.parkNanos(this, nanosTimeout);

这里LockSupport只是作为工具类,具体的逻辑还是要看queue中是怎么处理的。

Queue

通过源码查看到内部维护了一个Node的内部类,并且该Node既有prev又有next的成员变量,因此这是一个双向的链表。以acquire为例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

......

private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

  • tryAcquire->先尝试获取状态,由子类自己实现
  • addWaiter->生成一个节点,并将节点放入队列中

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
    boolean interrupted = false;
    for (;;) {
    final Node p = node.predecessor();
    if (p == head && tryAcquire(arg)) {
    setHead(node);
    p.next = null; // help GC
    failed = false;
    return interrupted;
    }
    if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    interrupted = true;
    }
    } finally {
    if (failed)
    cancelAcquire(node);
    }
    }
  • 使用自旋锁,并且如果前面一个是头节点了则不断去尝试获取状态。也就是说下一个就到这个线程,那么他就会一直去尝试获取,确保能最快获取到状态

  • shouldParkAfterFailedAcquire(p, node)->如果在该线程还有其他线程在等待,那么就判断是否需要挂起操作

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
    return true;
    if (ws > 0) {
    do {
    node.prev = pred = pred.prev;
    } while (pred.waitStatus > 0);
    pred.next = node;
    } else {
    compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
    }
  • 只有当节点的线程状态为SIGNAL的时候,才需要进行挂起操作

  • ws>0表示前一个节点是canceled状态,因此就持续往前找,直到找个一个有效的节点,并将新生成的节点放在其后
  • 如果不是canceled状态,那么需要将其切为SIGNAL状态
  • 如果需要挂起那么就使用LockSupport.park将其挂起,等待unpark
    结合release来看

    1
    2
    3
    4
    5
    6
    7
    8
    9
    public final boolean release(int arg) {
    if (tryRelease(arg)) {
    Node h = head;
    if (h != null && h.waitStatus != 0)
    unparkSuccessor(h);
    return true;
    }
    return false;
    }
  • tryRelease->同样的想用子类的去释放状态

  • 如果成功释放了,那么唤醒队列中的头节点

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
    compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
    s = null;
    for (Node t = tail; t != null && t != node; t = t.prev)
    if (t.waitStatus <= 0)
    s = t;
    }
    if (s != null)
    LockSupport.unpark(s.thread);
    }
  • LockSupport.unpark(s.thread) -> 唤醒头节点的后续节点

  • 由于已经unpark了所以acquireQueued会继续执行,当、一旦头节点释放资源,后续节点则可以tryAcquire

原则上来说该队列是FIFO原则,先来先到。

比如在银行拿号,如果柜台有人了,那么你得排队

  • 前面就轮到你了,那么你尝试排个队,如果失败,那么你认定前面还在处理业务,就标记为SIGNAl,过了一会儿又看了下,o!是SIGNAL,那么我还是先休息下,也就是park操作。
  • 后面又来一个人,看到你在排队但不是SIGNAL,那么把你置为SIGNAL,然后再试一次,失败之后发现是SIGNAL,那么他就直接休息(park)了。
  • 如果看到前面的人不排了,也就是cancel状态,那么就一直往前找,直到找到一个确实在排队的人,排在他后面。如果前面一个人状态不太对但是也确实在排,那会把他标记成SIGNAL。

具体还有很多细节问题没有深入,暂时只是知道一个大概。毕竟AQS是java锁机制的一个基础,还是需要多看几遍才行。

Basic Callback

项目中需要提供api和一个被动接受的回调,C++的回调方式主要有三种:

  • 传递函数指针
  • 实现接口
  • lamda表达式

实现接口

新建一个接口类,ITokenGetCallback.hpp,添加需要其他类实现的虚函数。

1
2
3
4
5
6
7
8
9
#ifndef ITokenGetCallback_h
#define ITokenGetCallback_h

class ITokenGetCallback {
public:
virtual ~ITokenGetCallback() {}
virtual void onGetToken(const unsigned char *) = 0;
};
#endif /* ITokenGetCallback_h */

BTW:一般在构造接口的时候需要把析构函数也虚化,原因是为了确保对象在被回收之后析构函数能被调用。这就涉及到虚函数的具体实现意义了。

贴一个stackoverflow上的例子。

1
2
3
4
5
6
7
8
9
10
11
class Animal
{
public:
void eat() { std::cout << "I'm eating generic food."; }
};

class Cat : public Animal
{
public:
void eat() { std::cout << "I'm eating a rat."; }
};

1
2
3
4
Animal *animal = new Animal;
Animal *cat = new Cat;
animal->eat(); // outputs: "I'm eating generic food."
cat->eat(); // outputs: "I'm eating generic food."

即使我们给他赋值子类,但是函数的调用还是基于指针的类型,这里可以类比到析构函数。

添加一个实现该接口的类

1
2
3
void TokenGet::onGetToken(const unsigned char * token){
std::cout << "on get token" << token << std::endl;
}

剩余要做的事情就是把这个类传递到想调用的地方。可以使用set或者作为构造函数的参数传入。

1
2
3
4
5
6
//通过函数参数传递
TokenGet* callback = new TokenGet();
gClient->StartRecording(callback);

auto callback = std::shared_ptr<TokenGet>(new TokenGet());
gClient->setCallback(callback);

之后在想调用的地方调用这个call即可。接口的方式属于OOP的一种模式吧,这方面和java很类似。

lamda表达式

这种方式一般用在次数比较少的回调。
用std的function包装一个方法并设置返回值和参数。

1
using OnVolumeCallback = std::function<void(int volume)>;

然后将该函数作为参数放在你想使用的函数中。

1
2
3
4
5
void APIClient::getSpeakerVolume(OnVolumeCallback callback)
{
//do something
callback(volume);
}

在调用这个函数的地方实现这个lamda表达式

1
2
3
gClient->getSpeakerVolume([](int volume){
std::cout << "volume: " << volume << std::endl;
});

相当来说lamda表达式更简单一点。不过如果是多次回调还是应该用接口的方式,接口的方式在层次上也更分明。

lamda表达式是这样定义的

1
2
3
4
[ capture clause ] (parameters) -> return-type  
{
definition of method
}

其中parameters和return-type可以省略。关于这两者和函数具体实现没什么好多说的,第一次知道还有捕获列表这样的东西。


捕获列表就是用[]包起来的列表,可以传引用、指针、变量的拷贝,不过无法传递右值。通俗来说就是被这个捕获列表包起来的参数可以在lamda表达式中使用。

1
2
3
4
int j = 5;
auto f = []{
j = 1; //invalid,Variable 'j' cannot be implicitly captured in a lambda with no capture-default specified
};

如果不在捕获列表中添加j的话是无法访问的。可以理解为用非传统的方式把变量作为参数传递进入函数。

NIO

NIO

NIO有说是new io,也有说是non-blocking io,Whatever,nio是jdk 1.4之后引入的,目的是为了提高io速度,并且旧的io也已经被重新实现过了。记得以前还在odm的时候,经常会碰到ANR的原因是系统在执行io操作导致的,因此看来优化一下io还是有必要的。

Channel和Buffer

之所以将channel和buffer放在一起是因为这两个是提高io速度的关键。旧的io形式是面向流的,例如经常用的InputStream和OutputSteam,比如在我们读取文件的时候,每次从流中读取一个字节,说的不恰当一点就是在文件和程序之间建立一个通道,并且是一个单向的通道,每一次把文件中的东西拷贝到程序中。

最致命的就是Stream的操作是阻塞的,在调用read方法的时候我们什么都做不了,只能等待它执行完了之后返回。

因此channel和buffer诞生了,这边结合Thinking in java中对channel和buffer的描述加以理解,把channel当成矿场,而把buffer当成矿场中的坑,那么我们在读取的时候其实是先把矿场里的煤矿丢进坑里,然后去拿坑里的煤矿,写数据的时候反之即可。这样形成的读写模式是非阻塞的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public static void main(String[] args) {
try {
//把stream转化成矿场(channel)
FileChannel channel = new FileOutputStream("data.txt").getChannel();
//把我们要加入的煤矿丢进坑(buffer),之后再把坑中的煤矿丢尽矿场
channel.write(ByteBuffer.wrap("First".getBytes()));
channel.close();

//same as above
channel = new RandomAccessFile("data.txt", "rw").getChannel();
//把矿车移到最后一个坑,然后继续挖坑填矿
channel.position(channel.size());
channel.write(ByteBuffer.wrap("Second".getBytes()));
channel.close();

channel = new FileInputStream("data.txt").getChannel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.read(buffer);
buffer.flip();
while (buffer.hasRemaining()) {
System.out.print((char) buffer.get());
}
} catch (Exception e) {
e.printStackTrace()
}
}

再来看下buffer,这里用的字节buffer,当然也有FloatBuffer、 DoubleBuffer等等。
刚才说过了buffer类似于矿场中的坑,但是我们不可能把整个矿场都挖出坑来,因此我们在使用buffer的时候需要分配一个大概的数量,也就是allocate方法。通过源码在allocate方法中它其实是返回了一个HeapByteBuffer,这是个继承ByteBuffer的类。

1
2
3
4
5
6
7
public static ByteBuffer allocate(int var0) {
if (var0 < 0) {
throw new IllegalArgumentException();
} else {
return new HeapByteBuffer(var0, var0);
}
}

通过一系列的调用最后它给三个属性赋值,分别是capacity、limit、position

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Buffer(int var1, int var2, int var3, int var4) {
if (var4 < 0) {
throw new IllegalArgumentException("Negative capacity: " + var4);
} else {
this.capacity = var4;
this.limit(var3);
this.position(var2);
if (var1 >= 0) {
if (var1 > var2) {
throw new IllegalArgumentException("mark > position: (" + var1 + " > " + var2 + ")");
}

this.mark = var1;
}

}
}

  • capacity:一共有多少个坑

  • limit:最多有多少个坑

  • position:矿车在第几个坑

显然在刚开始分配的时候,capacity和limit是一样的,position是零。
当我们执行了填写数据的代码之后,矿车肯定是在数据字节大小的位置上。例如我们要读的文件存着test,那么矿车这时候是在第四个位置上。
BTW:这里从1开始计数是因为,在初始分配的时候就已经默认生成了一个坑,零号位的坑。
那么这个时候如果我们想要从坑中拿煤矿的话,至少要将矿车移动到最前面。也就是flip方法的作用。看一下源码就知道它到底干了啥。

1
2
3
4
5
6
7
8
public final Buffer flip() {
//把limit减少,提升效率
this.limit = this.position;
//将矿车移到开头
this.position = 0;
this.mark = -1;
return this;
}

buffer大部分的api都是对这三个属性操作的。例如clear方法。

1
2
3
4
5
6
public final Buffer clear() {
this.position = 0;
this.limit = this.capacity;
this.mark = -1;
return this;
}

总结

旧的io和nio主要有以下几点不同

  • io面向流,nio面向buffer
  • io是阻塞,nio非阻塞
  • nio有selector,可以理解为管理多个矿场。
    关于selector还处于一知半解的状态,希望以后在项目中能碰到。

std move

最近在查看别人搭的框架时,看到在传值的时候使用了std::move。因此学习一下这个函数的使用。
首先google查看官方解释:

In particular, std::move produces an xvalue expression that identifies its argument t. It is exactly equivalent to a static_cast to an rvalue reference type.

简单来说就是将一个值转化为右值。目的是为了提升效率,减少拷贝的数量。也就是进行深拷贝。

右值

那么右值又是什么?还是查看微软的官方解释吧。

Every C++ expression is either an lvalue or an rvalue. An lvalue refers to an object that persists beyond a single expression. You can think of an lvalue as an object that has a name. All variables, including nonmodifiable (const) variables, are lvalues. An rvalue is a temporary value that does not persist beyond the expression that uses it.

在c++中,表达式是要么左值要么右值,左值表示他对一个对象的应用远远超过了一个表达式(可以忽略),其实也可以把左值看成一个有名字的对象。所有的变量,常量都是左值。右值是一个临时变量,并不会比表达式保存的更久。

这个左和右是相对于等于号的。等于号左边必然是一个变量,右边是一个表示式或者是一个临时变量。感觉还是不太好用语言来解释这玩意。贴一段官方的sample。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Correct usage: the variable i is an lvalue.  
i = 7;

// Incorrect usage: The left operand must be an lvalue (C2106).
7 = i; // C2106
j * 4 = 7; // C2106

// Correct usage: the dereferenced pointer is an lvalue.
*p = i;

const int ci = 7;
// Incorrect usage: the variable is a non-modifiable lvalue (C3892).
ci = 9; // C3892

// Correct usage: the conditional operator returns an lvalue.
((i < 3) ? i : j) = 7;

move

还是看回move函数,比如在写交换函数的时候我们通常这么写。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void swap(string &,string &);
int main(int argc, const char * argv[]) {
string x = "abc";
string y = "wang";
swap(x, y);
cout << x << endl;
cout << y << endl;
return 0;
}
void swap(string & x,string & y)
{
string tmp = x;
x = y;
y = tmp;
}

这样的话其实在执行交换的过程中会产生三个copy的对象,会影响性能。不过有时候我们传递了某个值之后就不再不需要他了,如果留着一个拷贝的话也会占用空间,因此可以使用move函数来表示这个值以后没用了,已经传走了。

1
2
3
4
5
6
void swap(string & x,string & y)
{
string tmp = move(x);
x = move(y);
y = move(tmp);
}

这样就不会有三分拷贝,而是值的转移。不过这么写似乎看不出值已经传走了。用个vector试一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
int main(int argc, const char * argv[]) {
vector<string> v;
string str = "test";
v.push_back(str);
cout << "str :" << str << endl;
for (int i = 0; i<v.size(); i++) {
cout << "vector" << i << " :" << v[i] << endl;
}
cout << "after move" <<endl;
v.push_back(move(str));
cout << "str :" << str << endl;
for (int i = 0; i<v.size(); i++) {
cout << "vector" << i << " :" << v[i] << endl;
}
return 0;
}

1
2
3
4
5
6
7
Result:
str :test
vector0 :test
after move
str :
vector0 :test
vector1 :test

可以看到str已经为空并且作为右值存在了vector中。

总结

并不是只在传递值的时候可以用move,更关键的是当你想把某个左值转化成右值的时候需要用到这个函数。

因为查看该函数的源码就是将传入的值转为右值引用。

1
2
3
4
5
move(_Tp&& __t) _NOEXCEPT
{
typedef typename remove_reference<_Tp>::type _Up;
return static_cast<_Up&&>(__t);
}

Serializable

原因

由于对象在程序结束的时候会被销毁,因此如果希望对象能够在程序不运行的时候仍然能保存信息,那么就需要用到对象的序列化操作,简单来说就是将对象转换成字节序列,等需要的时候再反序列化即可读取其中的信息。又或者在Android开发中通过intent传递对象的时候,需要将对象序列化然后才能传递。

使用方法

1
2
3
4
5
6
7
8
//写入文件
ObjectOutputStream objectOutputStream = new ObjectOutputStream(new FileOutputStream("test.obj"));
objectOutputStream.writeObject(new Data(1));
objectOutputStream.close();
//读取文件
ObjectInputStream objectInputStream = new ObjectInputStream(new FileInputStream("test.obj"));
Data data = (Data) objectInputStream.readObject();
System.out.println(data);

transient关键字

如果某个类的属性不希望被序列化则可以加上transient关键字。

1
transient  private String str;

当在反序列化读取的时候就会为null或者是类型的默认值。

序列化中的static

如果在序列化之后修改了static类型的变量,那么打印出来会如何呢

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class Data implements Serializable {
private int n;
transient public static int i = 5;

public Data(int n) {
this.n = n;
}
@Override
public String toString() {
return "Data{" +
"n=" + n +
'}';
}
}
public static void main(String[] args) {
try {
ObjectOutputStream objectOutputStream = new ObjectOutputStream(new FileOutputStream("test.obj"));
objectOutputStream.writeObject(new Data(1));
objectOutputStream.close();

Data.i = 10;
ObjectInputStream objectInputStream = new ObjectInputStream(new FileInputStream("test.obj"));
Data data = (Data) objectInputStream.readObject();
System.out.println(data);

} catch (Exception e) {
e.printStackTrace();
}
}

结果是10,原因是static是针对类的属性,而不是针对对象的属性。因为静态变量可以直接使用,所以序列化并不保存静态变量。

Externalizable

Externalizable是一个继承Serializable的接口,并增加了两个方法

1
2
3
4
public interface Externalizable extends Serializable {
void writeExternal(ObjectOutput var1) throws IOException;
void readExternal(ObjectInput var1) throws IOException, ClassNotFoundException;
}

这两个放在分别会在write和read的时候调用,我们可以实现这个接口,如果有其他操作可以放在这两个方法中,并且他会readExternal之前调用该类的无参构造方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class Data implements Externalizable {
private int n;

public Data(int n) {
this.n = n;
}

public Data() {
System.out.println("no pm constructor");
}

@Override
public String toString() {
return "Data{" +
"n=" + n +
'}';
}

@Override
public void writeExternal(ObjectOutput objectOutput) throws IOException {
System.out.println("writeExternal()");
}

@Override
public void readExternal(ObjectInput objectInput) throws IOException, ClassNotFoundException {
System.out.println("readExternal()");
}
}
public static void main(String[] args) {
try {
ObjectOutputStream objectOutputStream = new ObjectOutputStream(new FileOutputStream("test.obj"));
objectOutputStream.writeObject(new Data(1));
objectOutputStream.close();

ObjectInputStream objectInputStream = new ObjectInputStream(new FileInputStream("test.obj"));
Data data = (Data) objectInputStream.readObject();
System.out.println(data);

} catch (Exception e) {
e.printStackTrace();
}
}

1
2
3
4
5
Result:
writeExternal()
no pm constructor
readExternal()
Data{n=0}

奇怪的是打印出的n居然是零,但是我们写进去的是一个n=1的对象。原因是Externalizable调用了无参的构造方法重新赋值了一遍。

这也是Serializable和Externalizable最大的区别:Serializable是完全根据二级制文件来构造对象,但是Externalizable是根据构造方法和readExternal方法来构造的。

因此正确的做法需要在writeExternal和readExternal的时候写入和赋值。这样的好处是更容易控制变量的存亡。

1
2
3
4
5
6
7
8
9
10
11
@Override
public void writeExternal(ObjectOutput objectOutput) throws IOException {
System.out.println("writeExternal()");
objectOutput.writeInt(n);
}

@Override
public void readExternal(ObjectInput objectInput) throws IOException, ClassNotFoundException {
System.out.println("readExternal()");
n = objectInput.readInt();
}

Externalizable的替代方法

如果觉得Externalizable这样太麻烦的话,我们依然可以使用Serializable,不过需要添加两个方法。并遵循它们的特征签名

1
2
3
4
5
6
7
private void writeObject(ObjectOutputStream stream) {
System.out.println(Thread.currentThread().getStackTrace()[1].getMethodName());
}

private void readObject(ObjectInputStream stream) {
System.out.println(Thread.currentThread().getStackTrace()[1].getMethodName());
}

查看下源码就知道如果我们实现了这两个方法(虽然只是添加,就当他是实现吧),那么就不会走正常的序列化流程,而转为使用我们自己实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void invokeWriteObject(Object var1, ObjectOutputStream var2) throws IOException, UnsupportedOperationException {
this.requireInitialized();
if (this.writeObjectMethod != null) {
try {
this.writeObjectMethod.invoke(var1, var2);
} catch (InvocationTargetException var5) {
Throwable var4 = var5.getTargetException();
if (var4 instanceof IOException) {
throw (IOException)var4;
}

throwMiscException(var4);
} catch (IllegalAccessException var6) {
throw new InternalError(var6);
}

} else {
throw new UnsupportedOperationException();
}
}

如果我们自己实现的话实现方式和Externalizable一样

1
2
3
4
5
6
7
private void writeObject(ObjectOutputStream stream) throws IOException {
stream.writeInt(n);
}

private void readObject(ObjectInputStream stream) throws IOException {
n = stream.readInt();
}

序列化个人认为没有特别深入的必要,只要知道使用方法即可。

Constructor

构造函数

最近在自定义布局的时候发现kotlin的构造函数和java差的很多,因此重新学习记录下,先从简单的开始,模仿java中的写法

1
2
3
4
5
class User {
constructor(i: Int)
constructor(i: Int, j: Int)
constructor(name: String, i: Int, j: Int)
}

1
2
3
4
5
6
7
8
9
10
11
12
public final class User {
public User(int i) {
}

public User(int i, int j) {
}

public User(@NotNull String name, int i, int j) {
Intrinsics.checkParameterIsNotNull(name, "name");
super();
}
}

可以看到这种写法和java差不多。kotlin还允许在class定义的时候直接写构造函数,这种构造函数叫主构造函数。

1
2
3
class User constructor/*可省略*/(i: Int) {

}
1
2
3
4
public final class User {
public User(int i) {
}
}

像这种直接在类名旁边的叫primary constructor,而类似于java那种写法的构造函数叫Secondary Constructors。主构造函数只能有一个,次构造函数可以有多个。但是由于主构造函数不能加代码,因此kotlin提供了init关键字

1
2
3
4
5
class User constructor/*可省略*/(i: Int) {
init {
println("primary constructor i = $i")
}
}

1
2
3
4
5
6
public final class User {
public User(int i) {
String var2 = "primary constructor i = " + i;
System.out.println(var2);
}
}

那如果想同时有主构造函数和次构造函数怎么办呢,按照官方api说的是需要或直接或间接的委托于主构造函数,一开始没看懂,后来看了转成的java源码,原来他说的委托就是在次构造函数中调用主构造函数

1
2
3
4
5
6
7
8
class User(name: String) {
init {
println("primary")
}
constructor() : this(String())
constructor(i: Int) : this(String())
constructor(j: Int, string: String) : this(string)
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public final class User {
public User(@NotNull String name) {
Intrinsics.checkParameterIsNotNull(name, "name");
super();
String var2 = "primary";
System.out.println(var2);
}

public User() {
this(new String());
}

public User(int i) {
this(new String());
}

public User(int j, @NotNull String string) {
Intrinsics.checkParameterIsNotNull(string, "string");
this(string);
}
}

从这边可以看到init代码块中的代码是在主构造函数中的,不过由于次构造函数都会调用主构造函数,所以也无需纠结这个init代码块是否会被调用。