Semaphore通过计数器来控制对一个共享资源的访问,它的工作机制是:当一个线程想要访问一个共享资源时,需要向Semaphore申请访问权限,如果Semaphore的计数器大于0,则线程获得共享资源的访问权限,此时Semaphore的计数器减一。当该线程使用完共享资源后,需要释放访问权限,此时Semaphore的计数器加一。如果一个线程在申请访问权限时Semaphore的计数器为0,则当前线程就会自动挂起,直到另一个线程释放一个访问权限后,当前线程获取访问权限并恢复运行。
Semaphore的用法比较简单,下面看一下它的构造方法和常用方法。
Semaphore有两个构造函数,第一个指定了计数器的个数,如果设置为1的话,则只有1个线程可以同时获取到共享资源。第二个重载的第二个参数如果设置为true,则先申请访问权限的线程会保证先得到访问权限。
Semaphore(int num) Semaphore(int num, boolean how)
void acquire( ) throws InterruptedException void acquire(int num) throws InterruptedException
acquire方法默认申请1个访问权限,也可以通过第二个重载来一次申请多个访问权限。
void release( ) void release(int num)
同acquire一样,release方法默认释放一个访问权限,也可以通过第二个重载版本来一次释放多个访问权限。
下面来看一个例子,假设有一个全局范围内的数字作为共享资源,有三个线程需要访问这个数字来做一个处理,每次处理的时间比较长,并且并发运行的话可能会影响最终的处理结果,所以要通过Semaphore来阻止并发。下面这个类作为共享资源
class Shared { public static int resources = 0; }
下面这个类实现了Runable接口,它表示一个需要访问共享资源的线程
class MyThread implements Runnable { String name; Semaphore semaphore; public MyThread(String name, Semaphore semaphore) { this.name = name; this.semaphore = semaphore; new Thread(this).start(); } @Override public void run() { System.out.println(name + " want a permit"); try { semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(name + " get a permit"); for (int i = 0; i < 5; i++) { Shared.resources++; System.out.println("shared resources now is " + Shared.resources + ", by " + name); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(name + "release a permit"); semaphore.release(); } }
这个线程先通过semaphore.acquire()
获取到访问权限,然后长时间占有共享资源,最终通过semaphore.release();
释放访问权限。
下面是main方法
public static void main(String[] args) { Semaphore semaphore = new Semaphore(1); new MyThread("thread a", semaphore); new MyThread("thread b", semaphore); new MyThread("thread c", semaphore); }
它首先创建了一个计数器为1的Semaphore,然后实例化了三个MyThread对象,该对象实例化后会自动运行线程。从输出结果可以看出,Semaphore有效地阻止了访问公共资源的代码的并发。下面是代码的输出
thread b want a permit thread a want a permit thread c want a permit thread b get a permit shared resources now is 1, by thread b shared resources now is 2, by thread b shared resources now is 3, by thread b shared resources now is 4, by thread b shared resources now is 5, by thread b thread b release a permit thread a get a permit shared resources now is 6, by thread a shared resources now is 7, by thread a shared resources now is 8, by thread a shared resources now is 9, by thread a shared resources now is 10, by thread a thread a release a permit thread c get a permit shared resources now is 11, by thread c shared resources now is 12, by thread c shared resources now is 13, by thread c shared resources now is 14, by thread c shared resources now is 15, by thread c thread c release a permit
除了并发控制以外,Semaphore还可以用来做线程间的通信。以生产者和消费者为例,假设生产者持有消费者的一个访问权限,导致消费者无法进行消费,当生产者完成生产后,释放这个访问权限,消费者就可以消费了,同时,消费者还可以持有一个生产者的访问权限,在消费完之后再释放,从而通知生产者继续生产。这有点类似wait和notify。下面来看如何通过Semaphore来实现这个场景。
首先来看一个队列
public class Queue { private Semaphore semaphoreConsumer = new Semaphore(0); private Semaphore semaphoreProducer = new Semaphore(1); private int value; public void put(int value) throws InterruptedException { semaphoreProducer.acquire(); this.value = value; System.out.println("---------- value is put " + this.value); semaphoreConsumer.release(); } public void get() throws InterruptedException { semaphoreConsumer.acquire(); System.out.println("********** value is get " + value); semaphoreProducer.release(); } }
这个类中包含两个Semaphore对象,一个用于控制生产者的生产,另一个用于控制消费者的消费,用于控制消费的Semaphore默认的计数器是0,表示消费者默认是不能消费的,只有等生产者release一个访问权限后,计数器会加一,此时消费者才可以拿到访问权限并开始消费。下面是生产者和消费者,他们内部只管不停地生产或消费即可
public class Producer implements Runnable { Queue queue; public Producer(Queue queue) { this.queue = queue; new Thread(this).start(); } @Override public void run() { for (int i = 0; i < 20; i++) { try { queue.put(i + 1); } catch (InterruptedException e) { e.printStackTrace(); } } } }
public class Queue { private Semaphore semaphoreConsumer = new Semaphore(0); private Semaphore semaphoreProducer = new Semaphore(1); private int value; public void put(int value) throws InterruptedException { semaphoreProducer.acquire(); this.value = value; System.out.println("---------- value is put " + this.value); semaphoreConsumer.release(); } public void get() throws InterruptedException { semaphoreConsumer.acquire(); System.out.println("********** value is get " + value); semaphoreProducer.release(); } }
下面是main方法
public static void main(String[] args) { Queue queue = new Queue(); new Consumer(queue); new Producer(queue); }
下面是代码的输出
---------- value is put 1 ********** value is get 1 ---------- value is put 2 ********** value is get 2 ---------- value is put 3 ********** value is get 3 ......