java多线程[5]:信号量(Semaphore)

Semaphore通过计数器来控制对一个共享资源的访问,它的工作机制是:当一个线程想要访问一个共享资源时,需要向Semaphore申请访问权限,如果Semaphore的计数器大于0,则线程获得共享资源的访问权限,此时Semaphore的计数器减一。当该线程使用完共享资源后,需要释放访问权限,此时Semaphore的计数器加一。如果一个线程在申请访问权限时Semaphore的计数器为0,则当前线程就会自动挂起,直到另一个线程释放一个访问权限后,当前线程获取访问权限并恢复运行。

Semaphore的用法比较简单,下面看一下它的构造方法和常用方法。

构造方法

Semaphore有两个构造函数,第一个指定了计数器的个数,如果设置为1的话,则只有1个线程可以同时获取到共享资源。第二个重载的第二个参数如果设置为true,则先申请访问权限的线程会保证先得到访问权限。

Semaphore(int num) Semaphore(int num, boolean how)

申请权限

void acquire( ) throws InterruptedException void acquire(int num) throws InterruptedException

acquire方法默认申请1个访问权限,也可以通过第二个重载来一次申请多个访问权限。

释放权限

void release( ) void release(int num)

同acquire一样,release方法默认释放一个访问权限,也可以通过第二个重载版本来一次释放多个访问权限。

下面来看一个例子,假设有一个全局范围内的数字作为共享资源,有三个线程需要访问这个数字来做一个处理,每次处理的时间比较长,并且并发运行的话可能会影响最终的处理结果,所以要通过Semaphore来阻止并发。下面这个类作为共享资源

class Shared {     public static int resources = 0; }

下面这个类实现了Runable接口,它表示一个需要访问共享资源的线程

class MyThread implements Runnable {      String name;     Semaphore semaphore;      public MyThread(String name, Semaphore semaphore) {         this.name = name;         this.semaphore = semaphore;         new Thread(this).start();     }      @Override     public void run() {         System.out.println(name + " want a permit");         try {             semaphore.acquire();         } catch (InterruptedException e) {             e.printStackTrace();         }         System.out.println(name + " get a permit");         for (int i = 0; i < 5; i++) {             Shared.resources++;             System.out.println("shared resources now is " + Shared.resources + ", by " + name);             try {                 Thread.sleep(500);             } catch (InterruptedException e) {                 e.printStackTrace();             }         }         System.out.println(name + "release a permit");         semaphore.release();     } }

这个线程先通过semaphore.acquire()获取到访问权限,然后长时间占有共享资源,最终通过semaphore.release();释放访问权限。

下面是main方法

public static void main(String[] args) {     Semaphore semaphore = new Semaphore(1);     new MyThread("thread a", semaphore);     new MyThread("thread b", semaphore);     new MyThread("thread c", semaphore);  }

它首先创建了一个计数器为1的Semaphore,然后实例化了三个MyThread对象,该对象实例化后会自动运行线程。从输出结果可以看出,Semaphore有效地阻止了访问公共资源的代码的并发。下面是代码的输出

thread b want a permit thread a want a permit thread c want a permit thread b get a permit shared resources now is 1, by thread b shared resources now is 2, by thread b shared resources now is 3, by thread b shared resources now is 4, by thread b shared resources now is 5, by thread b thread b release a permit thread a get a permit shared resources now is 6, by thread a shared resources now is 7, by thread a shared resources now is 8, by thread a shared resources now is 9, by thread a shared resources now is 10, by thread a thread a release a permit thread c get a permit shared resources now is 11, by thread c shared resources now is 12, by thread c shared resources now is 13, by thread c shared resources now is 14, by thread c shared resources now is 15, by thread c thread c release a permit

除了并发控制以外,Semaphore还可以用来做线程间的通信。以生产者和消费者为例,假设生产者持有消费者的一个访问权限,导致消费者无法进行消费,当生产者完成生产后,释放这个访问权限,消费者就可以消费了,同时,消费者还可以持有一个生产者的访问权限,在消费完之后再释放,从而通知生产者继续生产。这有点类似wait和notify。下面来看如何通过Semaphore来实现这个场景。

首先来看一个队列

public class Queue {     private Semaphore semaphoreConsumer = new Semaphore(0);     private Semaphore semaphoreProducer = new Semaphore(1);     private int value;     public void put(int value) throws InterruptedException {         semaphoreProducer.acquire();         this.value = value;         System.out.println("----------  value is put " + this.value);         semaphoreConsumer.release();     }      public void get() throws InterruptedException {         semaphoreConsumer.acquire();         System.out.println("**********  value is get " + value);         semaphoreProducer.release();     } }

这个类中包含两个Semaphore对象,一个用于控制生产者的生产,另一个用于控制消费者的消费,用于控制消费的Semaphore默认的计数器是0,表示消费者默认是不能消费的,只有等生产者release一个访问权限后,计数器会加一,此时消费者才可以拿到访问权限并开始消费。下面是生产者和消费者,他们内部只管不停地生产或消费即可

public class Producer implements Runnable {     Queue queue;     public Producer(Queue queue) {         this.queue = queue;         new Thread(this).start();     }      @Override     public void run() {         for (int i = 0; i < 20; i++) {             try {                 queue.put(i + 1);             } catch (InterruptedException e) {                 e.printStackTrace();             }         }     } }
public class Queue {     private Semaphore semaphoreConsumer = new Semaphore(0);     private Semaphore semaphoreProducer = new Semaphore(1);     private int value;     public void put(int value) throws InterruptedException {         semaphoreProducer.acquire();         this.value = value;         System.out.println("----------  value is put " + this.value);         semaphoreConsumer.release();     }      public void get() throws InterruptedException {         semaphoreConsumer.acquire();         System.out.println("**********  value is get " + value);         semaphoreProducer.release();     } }

下面是main方法

public static void main(String[] args) {     Queue queue = new Queue();     new Consumer(queue);     new Producer(queue); }

下面是代码的输出

----------  value is put 1 **********  value is get 1 ----------  value is put 2 **********  value is get 2 ----------  value is put 3 **********  value is get 3 ......