首页 > 编程知识 正文

信号 信号量,semaphore实现原理

时间:2023-05-05 12:55:08 阅读:50841 作者:1753

目录

1 )信号量模型

2 )信号量使用方法

3 )源代码分析

3.1级结构

3.2 acquire

3.3版本

4 )快速实现限流器

Semaphore现在被翻译成“信号量”,但以前也有被翻译成“信号灯”的时候。 就像现实中的红绿灯一样,所以车辆能否通行取决于是否是绿灯。 同样,在编程世界中,线程是否可以执行也取决于信号量是否允许。

信号量是著名的计算机科学家狄杰斯特拉(Dijkstra )在1965年提出的,在接下来的15年里,信号量一直是同时编程领域的终结者,直到1980年流水线被提出,我们才得到第二个选择目前,大多数支持并发编程的语言都支持信号量机制,因此需要掌握信号量

1 )信号量模型的信号量模型还很简单,可以很容易概括为一个计数器,一个等待队列,三个方法。 在信号模型中,计数器和等待队列对外部透明,因此只能通过信号模型提供的init (、down、up )这三种方法访问。 可以结合下图进行形象化理解。

这三种方法的详细含义如下。

init ) :设定计数器的初始值。 down () :从计数器的值中减去1; 如果计数器的值此时小于0,则当前线程将被阻止。 否则,当前线程可以继续运行。 up () :将计数器的值加1; 如果计数器的值此时小于或等于0,则唤醒等待队列中的线程,并将其从等待队列中删除。 这里提到的init ()、down ()、up () )三种方法都是原子性的,并且这种原子性由信号模型的实现方来保证。 在Java SDK中,信号模型由java.util.concurrent.Semaphore实现,类Semaphore确保这三个方法都是原子操作。

如果您觉得上面的说明有点绕圈子,请参考下面的编码信号量模型。

类semaphore {//计数器int count; //队列队列队列; //初始化操作semaphore(intc ) { this.count=c; } //void down () ) this.count----; if({ this.count0)//在队列中插入当前线程//阻止当前线程) } } void up ) () this.count; if(this.count=0) (/从等待队列中删除某个线程t ) /唤醒线程t ) )如果在此插入另一个线程,它将是信号福特模型中down、up )操作历史上最旧的另外,也有人喜欢用semWait (和semSignal )称呼它们。 称呼不同,但意思相同。 在Java SDK和建筑公司中,down () )和up ()是acquire () )、release ) )。

2 )在信号量如何累积的示例中,count=1操作是临界区域,只允许一个线程执行,即保证排他。 在那种情况下,用信号量怎么控制?

其实很简单。 就像使用独占锁一样,只需在进入临界区域之前执行down (操作,在退出临界区域之前执行up ) )操作即可。 以下是Java代码的示例: acquire ) )是信号内的下降),release ) )是信号内的上升)操作。

静态输入计数; //初始化信号量staticfinalsemaphores=new semaphore (1; //信号量保证排他静态Vo

id addOne() { s.acquire(); try { count+=1; } finally { s.release(); }}

下面我们再来分析一下,信号量是如何保证互斥的。假设两个线程T1和T2同时访问addOne()方法,当它们同时调用acquire()的时候,由于acquire()是一个原子操作,所以只能有一个线程(假设T1)把信号量里的计数器减为0,另外一个线程(T2)则是将计数器减为-1。对于线程T1,信号量里面的计数器的值是0,大于等于0,所以线程T1会继续执行;对于线程T2,信号量里面的计数器的值是-1,小于0,按照信号量模型里对down()操作的描述,线程T2将被阻塞。所以此时只有线程T1会进入临界区执行count+=1;。

当线程T1执行release()操作,也就是up()操作的时候,信号量里计数器的值是-1,加1之后的值是0,小于等于0,按照信号量模型里对up()操作的描述,此时等待队列中的T2将会被唤醒。于是T2在T1执行完临界区代码之后才获得了进入临界区执行的机会,从而保证了互斥性。

 

3)源码分析

 信号量 Semaphore 用于控制资源能够被并发访问的线程数量,以保证多个线程能够合理的使用特定资源,比如数据库连接等。

  Semaphore 在构造时设置一个许可数量,这个许可数量用 AQS.state来记录。

  acquire() 方法就是获取许可,只有获取到许可才可以继续执行访问共享资源,获取到许可之后 AQS.state 减1,以记录当前可用的许可数量;如果获取不到许可,线程就阻塞等待其他线程归还许可。  release() 方法将许可归还,AQS.state 加1;归还之后,唤醒 AQS 队列中阻塞的线程获取许可。 3.1 类结构

  Semaphore 同样是由 AQS 实现的,用内部类 Sync 来管理锁,Sync 有两个实现,分别为 NonfairSync(非公平锁)和 FairSync(公平锁)。

  这个类结构有没有似曾相识的感觉,重入锁 ReentrantLock 也是同样的类结构,Semaphore 的源码跟 ReentrantLock 有很多相似但又比 ReentrantLock简单。

public class Semaphore implements java.io.Serializable { private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer {} static final class NonfairSync extends Sync {} static final class FairSync extends Sync {}}

看下构造方法,设置许可数 permits 其实就是将 AQS.state 设置为 permits:

public Semaphore(int permits) { sync = new NonfairSync(permits);}NonfairSync(int permits) { super(permits);}Sync(int permits) { setState(permits);} 3.2 acquire

  acquire() 方法就是获取许可,获取到许可就可以继续执行访问共享资源,获取不到就阻塞等待其他线程归还许可。

AQS.state 用来记录可用的许可数量,每获取一个许可 state 减1。

** * 获取许可的方法其实就是获取锁的方法 */public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1);}public final void acquireSharedInterruptibly(int arg) throws InterruptedException { // 响应打断 if (Thread.interrupted()) throw new InterruptedException(); // 真正获取锁的方法,由Semaphore.NonfairSync实现 if (tryAcquireShared(arg) < 0) // 获取锁失败,当前线程阻塞并进入AQS同步队列 doAcquireSharedInterruptibly(arg); }/** * Semaphore.NonfairSync实现的获取锁的方法 */protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires);}/** * 每获取一个许可,将state-1,state表示剩余的许可数 * 如果许可已经用完,返回remaining<0,表示获取不到锁/许可,线程阻塞 * 如果还有许可,返回remaining>=0,表示获取到锁/许可,线程继续执行 */final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); // 每获取一个许可,将state-1,state表示剩余的许可数 int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; }} 3.3 release

  release() 方法归还许可,其实就是将 AQS.state 加1。归还成功,唤醒 AQS 队列中等锁的线程,从被阻塞的位置开始执行。

/** * 释放许可调用释放锁的方法 */public void release() { sync.releaseShared(1);}/** * 释放锁,完全成功,依次唤醒AQS队列中等待共享锁的线程 */public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { // 释放锁,由Semaphore.Sync实现 doReleaseShared(); // 释放锁成功,唤醒AQS队列中等锁的线程 return true; } return false;}/** * 每归还一个许可将state加1 */protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases;// 每归还一个许可将state加1 if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; }}

4)快速实现一个限流器

上面的例子,我们用信号量实现了一个最简单的互斥锁功能。估计你会觉得奇怪,既然有Java SDK里面提供了Lock,为啥还要提供一个Semaphore ?其实实现一个互斥锁,仅仅是 Semaphore的部分功能,【Semaphore还有一个功能是Lock不容易实现的,那就是:Semaphore可以允许多个线程访问一个临界区】

现实中还有这种需求?有的。比较常见的需求就是我们工作中遇到的各种池化资源,例如连接池、对象池、线程池等等。其中,你可能最熟悉数据库连接池,在同一时刻,一定是允许多个线程同时使用连接池的,当然,每个连接在被释放前,是不允许其他线程使用的。

其实前不久,我在工作中也遇到了一个对象池的需求。所谓对象池呢,指的是一次性创建出N个对象,之后所有的线程重复利用这N个对象,当然对象在被释放前,也是不允许其他线程使用的。对象池,可以用List保存实例对象,这个很简单。但关键是限流器的设计,这里的限流,指的是不允许多于N个线程同时进入临界区。那如何快速实现一个这样的限流器呢?这种场景,我立刻就想到了信号量的解决方案。

信号量的计数器,在上面的例子中,我们设置成了1,这个1表示只允许一个线程进入临界区,但如果我们把计数器的值设置成对象池里对象的个数N,就能完美解决对象池的限流问题了。下面就是对象池的示例代码。

class ObjPool { final List pool; // 用信号量实现限流器 final Semaphore sem; // 构造函数 ObjPool(int size, T t){ pool = new Vector(){}; for(int i=0; i func) { T t = null; sem.acquire(); try { t = pool.remove(0); return func.apply(t); } finally { pool.add(t); sem.release(); } }}// 创建对象池ObjPool pool = new ObjPool(10, 2);// 通过对象池获取t,之后执行 pool.exec(t -> { System.out.println(t); return t.toString();});

我们用一个List来保存对象实例,用Semaphore实现限流器。关键的代码是ObjPool里面的exec()方法,这个方法里面实现了限流的功能。在这个方法里面,我们首先调用acquire()方法(与之匹配的是在finally里面调用release()方法),假设对象池的大小是10,信号量的计数器初始化为10,那么前10个线程调用acquire()方法,都能继续执行,相当于通过了信号灯,而其他线程则会阻塞在acquire()方法上。对于通过信号灯的线程,我们为每个线程分配了一个对象 t(这个分配工作是通过pool.remove(0)实现的),分配完之后会执行一个回调函数func,而函数的参数正是前面分配的对象 t ;执行完回调函数之后,它们就会释放对象(这个释放工作是通过pool.add(t)实现的),同时调用release()方法来更新信号量的计数器。如果此时信号量里计数器的值小于等于0,那么说明有线程在等待,此时会自动唤醒等待的线程。

简言之,使用信号量,我们可以轻松地实现一个限流器,使用起来还是非常简单的。

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。