Java 多线程整理

线程的概念

  1. 一个程序可能包含多个并发运行的任务,线程是指一个任务从头到尾的执行流。
  2. 多个线程共享CPU时间成为时间共享,而操作系统负责调度及分配资源给它们。
  3. 多线程可以使程序反应更快,交互性更强,执行效率更高。

创建任务和线程

  1. 首选需要创建任务,为任务定义一个类。任务类必须实现Runnable接口它只包含了一个run方法,这个方法用来告诉系统线程如何运行。
  2. 任务必须在线程中进行,故需要利用Thread创建任务的线程。
  3. 调用start() 方法告诉Java虚拟机该线程准备运行。
  4. Java虚拟机通过调用任务的run()方法执行任务,Java虚拟机会自动调用该方法,无需特意调用。直接调用run()只是在同一个线程中执行该方法,而没有新线程被启动。

例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package thread;
public class MultiThread {
public static void main(String[] args) {
//Create Tasks
Runnable printA=new printchar('A',100);
Runnable printB=new printchar('B',100);
Runnable print100=new printnum(100);
//Create Thread
Thread thread1=new Thread(printA);
Thread thread2=new Thread(printB);
Thread thread3=new Thread(print100);
//Run Thread
thread1.start();
thread2.start();
thread3.start();
}
}
class printchar implements Runnable {
private char chartoprint;
private int times;
public printchar(char chartoprint, int times) {
this.chartoprint = chartoprint;
this.times = times;
}
@Override
public void run() {
// TODO Auto-generated method stub
for (int i = 0; i < times; i++) {
System.out.print(" " + chartoprint);
}
}
}
class printnum implements Runnable {
private int num;
public printnum(int num) {
this.num = num;
}
@Override
public void run() {
// TODO Auto-generated method stub
for (int i = 0; i < num; i++) {
System.out.print(" " + i);
}
}
}

Thread类

Thread类包含的方法:

  • Thread() 创建一个空的线程
  • Thread(Task: Runnable) 为指定任务创建一个线程
  • start() 启动线程使得方法run() 被JVM调用
  • isAlive() 测试线程当前是否正在运行
  • setPriority(p:int) 设置线程的优先级p(范围从1到10)
  • join() 等待当前线程的结束,用于插入其他线程执行的过程中
  • sleep(millis: long) 使当前线程睡眠指定的数
  • yield() 使当前线程暂停并允许执行其他线程
  • interrupt() 中断线程
  1. Tread类还包含方法stop(), suspend(), resume() 但是由于这些方法具有内在的不安全因素,故被停用。可以通过给Thread赋值null表明停止以代替stop()。

  2. sleep方法可能跑出一个InterruptedException,这是一个必检异常。当一个休眠线程的interrupt()方法被调用时,就会发生这样的异常。必须将他放在try-catch块中。如果在一个循环中调用了sleep方法,应该将其放入块内,如果循环在块外,即使线程中断,也可能继续执行。

    例:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    //Right
    public void run(){
    try{
    while(...){
    ...
    Thread.sleep(1000);
    }
    }
    catch(InterruptedException ex){
    ex.printStackTrace();
    }
    }
    //Wrong
    public void run(){
    while(...){
    try{
    ...
    Thread.sleep(1000);
    }
    catch(InterruptedException ex){
    ex.printStackTrace();
    }
    }
    }
  3. 如果总有一个优先级较高的线程在运行,或者有一个相同优先级的线程不退出,那么这个线程可能永远没有运行的机会。这种情况成为资源竞争或缺乏状态。为避免竞争现象,高优先级的线程必须定时地调用sleep方法或者yield方法,来给其他线程运行的机会。

线程池

由于为每个任务创建一个线程,对大量任务而言是不够高效的。为每个任务开始一个新线程可能会限制流量并且造成性能降低。线程池是管理并发执行任务个数的理想方法。

Executor执行线程,而ExecutorService管理线程

  1. Executors包含方法:
    • newFixedThreadPool(numberOfThreads: int) 创建一个线程池,该线程池可并发执行的线程数量固定不变。当前任务结束后,它可以被重用以执行另一个任务。
    • newCachedThreadPool() 创建一个线程池,可以按需创建新线程,但当前面创建的线程可用时,重用它们。
  2. ExecutorService包含方法:
    • shutdown() 关闭执行器,但允许完成执行器中的任务。一旦关闭,不再接受新的任务。
    • shutdownNow() 即使当前线程池中还有未完成的线程,也立即关闭。返回未完成任务的清单。
    • isShutdown() 已经关闭则返回true。
    • isTerminated() 如果线程中所有任务都被终止,则返回true。
  3. Executor包含方法:
    • execute(Runnable object) 运行任务

例:

1
2
3
4
5
6
7
8
9
10
11
12
13
package thread;
import java.util.concurrent.*;
public class MultiThread {
public static void main(String[] args) {
ExecutorService executor=Executors.newFixedThreadPool(1);
executor.execute(new printchar('A',100));
executor.execute(new printchar('B',100));
executor.execute(new printnum(100));
executor.shutdown();
}
}
  • 上例中三个任务将交替执行
  • 假如newFixedThreadPool(1) 则三个任务顺次执行
  • 假如newCachedThreadPool() 则三个任务并行(交替执行)

线程同步

任务之间以冲突的方式访问一个公共资源,这是多线程程序的普遍问题,称为竞争状态。 如果一个类的对象在多线程程序中没有导致竞争状态,则称这样的类为线程安全的。

  1. synchronized关键字

    public synchronized void method(…)

    一个执同步方法在执行之前需要加锁。对于实例方法,需给调用该方法的对象加锁;对于静态方法,要给这个类加锁。如果一个线程调用一个对象上的同步实例(静态)方法,首先给该对象(类)加锁,然后执行该方法,最后解锁。

  2. 同步语句

    以下两种方法效果相同:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    public synchronized void xMethod(){
    //Method Body
    }
    public void xMethod(){
    synchronized(this){
    //Method Body
    }
    }

    同步语句允许设置同步方法中的部分代码,而不必是整个方法。这大大增强了程序的并发能力。

利用加锁同步

除了同步方法隐式加锁之外,还可以显示地加锁。

  1. java.util.concurrent.locks.Lock
    • lock() 加锁
    • unlock() 释放锁
    • newCondition() 返回绑定到Lock实例的新的Condition实例
  2. java.util.concurrent.locks.ReentrantLock
    • ReentrantLock() 等价于ReentrantLock(false),假公平策略,将锁给任意一个正在等待的线程。
    • ReentrantLock(fair:boolean) 设定fair:true后为真的公平策略,将锁会分给等待时间最长的线程。

使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
private static Lock lock=new ReentrantLock(); //Create a lock
public void Method(...){
lock.lock(); //Acquire the lock
....
try(){
...
}catch(){
...
}finally{
lock.unlock();
}
}

线程间的协作

使用条件(Condition)可以实现线程间的通信。条件是通过Lock对象的newCondition()方法创建的对象。创建条件之后,可以使用await(),signal(),signalAll()方法来实现线程间的相互通信。

一旦线程调用条件上的await(),线程就进入等待状态,等待恢复的信号。如果忘记对状态调用signal()或者signalAll()那么线程会永远等待下去。

条件由Lock对象创建,为了调用任意的方法,必须先拥有锁lock.lock(),如果没有获取锁就调用这些方法,会抛出异常。

Condition包含的方法

  • await() 当前线程等待直到发生某个条件
  • signal() 唤醒一个等待线程
  • signalAll(): Condition 唤醒所有等待线程

例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
public class ThreadCooperation {
private static Account account = new Account();
public static void main(String[] args) {
// Create a thread pool with two threads
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.execute(new DepositTask());
executor.execute(new WithdrawTask());
executor.shutdown();
System.out.println("Thread 1\t\tThread 2\t\tBalance");
}
// A task for adding an amount to the account
public static class DepositTask implements Runnable {
public void run() {
try { // Purposely delay it to let the withdraw method proceed
while (true) {
account.deposit((int)(Math.random() * 10) + 1);
Thread.sleep(1000);
}
}
catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
// A task for subtracting an amount from the account
public static class WithdrawTask implements Runnable {
public void run() {
while (true) {
account.withdraw((int)(Math.random() * 10) + 1);
}
}
}
// An inner class for account
private static class Account {
// Create a new lock
private static Lock lock = new ReentrantLock();
// Create a condition
private static Condition newDeposit = lock.newCondition();
private int balance = 0;
public int getBalance() {
return balance;
}
public void withdraw(int amount) {
lock.lock(); // Acquire the lock
try {
while (balance < amount) {
System.out.println("\t\t\tWait for a deposit");
newDeposit.await();
}
balance -= amount;
System.out.println("\t\t\tWithdraw " + amount +
"\t\t" + getBalance());
}
catch (InterruptedException ex) {
ex.printStackTrace();
}
finally {
lock.unlock(); // Release the lock
}
}
public void deposit(int amount) {
lock.lock(); // Acquire the lock
try {
balance += amount;
System.out.println("Deposit " + amount +
"\t\t\t\t\t" + getBalance());
// Signal thread waiting on the condition
newDeposit.signalAll();
}
finally {
lock.unlock(); // Release the lock
}
}
}
}

阻塞队列(Blocking Queue)

该接口继承于Queue接口,分为ArrayBlockingQueue, LinkedBlockingQueue, PriorityBlockingQueue。假如创建不受限的Linked,Priority,put方法永远不会阻塞。而Array通过数组实现阻塞队列,必须制定一个容量或者可选的公平性,在试图向满队列中插入或者空队列删除时会导致阻塞。

例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import java.util.concurrent.*;
public class ConsumerProducerUsingBlockingQueue {
private static ArrayBlockingQueue<Integer> buffer =
new ArrayBlockingQueue<Integer>(2);
public static void main(String[] args) {
// Create a thread pool with two threads
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.execute(new ProducerTask());
executor.execute(new ConsumerTask());
executor.shutdown();
}
// A task for adding an int to the buffer
private static class ProducerTask implements Runnable {
public void run() {
try {
int i = 1;
while (true) {
System.out.println("Producer writes " + i);
buffer.put(i++); // Add any value to the buffer, say, 1
// Put the thread into sleep
Thread.sleep((int)(Math.random() * 10000));
}
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
// A task for reading and deleting an int from the buffer
private static class ConsumerTask implements Runnable {
public void run() {
try {
while (true) {
System.out.println("\t\t\tConsumer reads " + buffer.take());
// Put the thread into sleep
Thread.sleep((int)(Math.random() * 10000));
}
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
}

信号量

总是将release()方法放到finally子句中可以确保即使发生异常也能最终释放许可。

只有一个许可的信号量可以模拟一个相互排斥的锁。

java.util.concurrent.Semaphore

  • Semaphore(numberOfPermits:int) 创建一个带指定书目许可的信号量,公平策略为false
  • Semaphore(numberOfPermits:int, fair: boolean) 同上述的区别为可设置fair
  • acquire() void 获取信号量的许可,如果无许可可用,线程将被锁住直到有许可可用
  • release() void 释放一个许可给该信号量

例(新的Account内部类):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private static class Account {
// Create a semaphore
private static Semaphore semaphore = new Semaphore(1);
private int balance = 0;
public int getBalance() {
return balance;
}
public void deposit(int amount) {
try {
semaphore.acquire(); // Acquire a permit
int newBalance = balance + amount;
// This delay is deliberately added to magnify the data-corruption
// problem and make it easy to see
Thread.sleep(5);
balance = newBalance;
} catch (InterruptedException ex) {
} finally {
semaphore.release(); // Release a permit
}
}
}

避免死锁

当两个或多个线程需要在几个共享对象上获取锁时,可能会导致死锁。每个线程都在等待另外一个线程释放它所需要的锁。

此时,可以通过资源排序技术避免死锁的发生。给每个需要锁的对象都指定一个顺序,确保每个线程都按这个顺序来获取锁。

线程的状态

任务在线程执行的过程中,可以是如下五个状态之一:新建、就绪、运行、阻塞或结束

  1. 新建一个线程,进入新建状态(New)
  2. 调用线程的start()方法启动线程后,进入就绪状态(Ready)。是可运行的,但是可能还没有开始运行,操作系统必须分配CPU时间。
  3. 当开始运行时,进去运行状态(Running),当发生以下两件事会进入就绪状态
    • 给定的CPU时间用完
    • 调用了线程的yield()方法
  4. 有一下几个方法来使得线程进入阻塞状态(Blocked):
    • join() 等待目标程序的结束进入就绪状态
    • sleep() 等到睡眠时间到进入就绪装填
    • wait() 等待时间到进入就绪状态
  5. 如果一个线程执行完了它的run()方法, 这个线程就结束了。
  6. isAlive() 判断线程的状态,如果是就绪、阻塞或运行状态,返回true。若是新建没启动或者结束,返回false。