并发编程—Semaphore

Semaphore是什么

Semaphore是JUC包下的一个工具类,官方是这样解释这个类的:

Semaphore用于限制可以访问某些资源(物理或逻辑的)的线程数目,他维护了一个许可证集合,有多少资源需要限制就维护多少许可证集合,假如这里有N个资源,那就对应于N个许可证,同一时刻也只能有N个线程访问。一个线程要获取许可证就调用acquire方法,用完了释放资源就调用release方法。

这段话从字面上看可能不太好理解,举个实际生活中的例子就很容易明白了:比如一个餐馆只能容纳N个人,如果餐馆内人已经满了,那第N+1个人来了之后,只能先在餐馆外等待,等餐馆里有人离开之后才能进去用餐。Semaphore正是起这样一个限制作用。

Semaphore和ReentrantLock的功能有点类似,但ReentrantLock是只允许一个线程去访问资源,Semaphore是允许N个线程去访问资源。

Semaphore怎么用

Semaphore只有3个操作:

  1. 初始化
  2. 获取许可
  3. 释放许可

主要方法

  • 有两个构造方法Semaphore(int permits)Semaphore(int permits, boolean fair)
    参数permits设置最大许可数,fair表示是否公平竞争获取资源。

  • acquire()acquire(int permits)方法是获取许可,无参的是获取1,也就是AQS的state-1,也可以state-permits,计算的结果小于0则会阻塞线程。

  • tryAcquire()tryAcquire(int permits)会直接返回获取的结果(boolean值),不会阻塞线程。

  • tryAcquire(long timeout, TimeUnit unit)会尝试获取一段时间,如果超时都都没有获取到则返回失败。

  • release()、release(int permits)释放许可,释放后会唤醒其他等待线程。

演示代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package com.lzumetal.multithread.semaphore;

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;


@Slf4j
public class SemaphoreTest {

private ExecutorService threadPool = Executors.newFixedThreadPool(10);


@Test
public void test() {
//信号量,只允许3个线程同时操作
Semaphore semaphore = new Semaphore(3);
List<Future> futureList = new ArrayList<>();
for (int i = 0; i < 6; i++) {
final Customer customer = new Customer(i + 1);
Future<?> future = threadPool.submit(() -> {
try {
semaphore.acquire();//会阻塞直到获取许可
log.info("{}进入了餐馆", customer);
TimeUnit.SECONDS.sleep(new Random().nextInt(5));
log.info("{}离开了餐馆", customer);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release(); //释放
}
});
futureList.add(future);
}
for (Future future : futureList) {
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}


@Getter
@Setter
@ToString
class Customer {

private Integer id;

Customer(Integer id) {
this.id = id;
}
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
15:28:48.710 [pool-1-thread-3] INFO com.lzumetal.multithread.semaphore.SemaphoreTest - SemaphoreTest.Customer(id=3)进入了餐馆
15:28:48.710 [pool-1-thread-1] INFO com.lzumetal.multithread.semaphore.SemaphoreTest - SemaphoreTest.Customer(id=1)进入了餐馆
15:28:48.710 [pool-1-thread-2] INFO com.lzumetal.multithread.semaphore.SemaphoreTest - SemaphoreTest.Customer(id=2)进入了餐馆
15:28:50.714 [pool-1-thread-3] INFO com.lzumetal.multithread.semaphore.SemaphoreTest - SemaphoreTest.Customer(id=3)离开了餐馆
15:28:50.714 [pool-1-thread-4] INFO com.lzumetal.multithread.semaphore.SemaphoreTest - SemaphoreTest.Customer(id=4)进入了餐馆
15:28:51.713 [pool-1-thread-2] INFO com.lzumetal.multithread.semaphore.SemaphoreTest - SemaphoreTest.Customer(id=2)离开了餐馆
15:28:51.713 [pool-1-thread-1] INFO com.lzumetal.multithread.semaphore.SemaphoreTest - SemaphoreTest.Customer(id=1)离开了餐馆
15:28:51.713 [pool-1-thread-5] INFO com.lzumetal.multithread.semaphore.SemaphoreTest - SemaphoreTest.Customer(id=5)进入了餐馆
15:28:51.713 [pool-1-thread-6] INFO com.lzumetal.multithread.semaphore.SemaphoreTest - SemaphoreTest.Customer(id=6)进入了餐馆
15:28:51.714 [pool-1-thread-4] INFO com.lzumetal.multithread.semaphore.SemaphoreTest - SemaphoreTest.Customer(id=4)离开了餐馆
15:28:54.713 [pool-1-thread-6] INFO com.lzumetal.multithread.semaphore.SemaphoreTest - SemaphoreTest.Customer(id=6)离开了餐馆
15:28:55.714 [pool-1-thread-5] INFO com.lzumetal.multithread.semaphore.SemaphoreTest - SemaphoreTest.Customer(id=5)离开了餐馆

应用场景

Semaphore可以用于做流量控制,特别公用资源有限的应用场景,比如数据库连接。假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程并发的读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这时我们必须控制只有十个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这种情况,我们就可以使用Semaphore来做流控。

------ 本文完 ------