1 package cn.sp.test; 2 3 import java.util.concurrent.Semaphore; 4 5 /** 6 * 有界的缓存 7 * @author pc 8 * 9 * @param10 */11 public class BoundedBuffer {12 private final Semaphore availableItems;//可用项 信号量 13 private final Semaphore availableSpaces;//可用空间 信号量 14 private final E[] items;15 private int putPosition = 0,takePosition = 0;16 17 @SuppressWarnings("unchecked")18 public BoundedBuffer(int capacity){19 availableItems = new Semaphore(0);20 availableSpaces = new Semaphore(capacity);21 items = (E[]) new Object[capacity];22 }23 24 public boolean isEmpty(){ //可用许可为025 return availableItems.availablePermits() == 0;26 }27 28 public boolean isFull(){ //满时可用空间为029 return availableSpaces.availablePermits() == 0 ;30 }31 32 public void put(E x) throws InterruptedException{33 availableSpaces.acquire();34 doInsert(x);35 availableItems.release();36 }37 38 public E take() throws InterruptedException{39 availableItems.acquire();//从 availableItems 获取 一个许可40 E e = doExtract();41 availableSpaces.release();//返回 一个许可 到 availableSpaces42 return e;43 }44 45 private synchronized E doExtract() {46 int i = takePosition;47 E x = items[i];48 //取出后 调用垃圾收集器49 items[i] = null;50 takePosition = (++i == items.length)? 0 : i;51 return x;52 }53 54 private synchronized void doInsert(E x) { //同步方法55 int i = putPosition;56 items[i] = x;57 //如果满了重置为058 putPosition = (++i == items.length) ? 0 :i;59 60 }61 }
测试类:
1 package cn.sp.test; 2 3 import org.junit.Assert; 4 5 public class Test { 6 7 public void test01(){ 8 BoundedBufferboundedBuffer = new BoundedBuffer (10); 9 Assert.assertTrue(boundedBuffer.isEmpty());10 Assert.assertFalse(boundedBuffer.isFull());11 }12 @org.junit.Test13 public void test02() throws InterruptedException{14 BoundedBuffer boundedBuffer = new BoundedBuffer (10);15 16 for(int i=0;i<10;i++){17 boundedBuffer.put(i);18 }19 Assert.assertTrue(boundedBuffer.isFull());20 Assert.assertFalse(boundedBuffer.isEmpty());21 System.out.println(boundedBuffer.isFull());22 }23 }