キュー操作というのは実際にコーディングして動かすと面白い。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* キュー受信スレッドを定義した ArrayBlockingQueue を持つ抽象クラス
* コンストラクタでキュー受信スレッドのインスタンスを指定し、
* ArrayBlockingQueue の生成と受信スレッド起動までを行う。
*/
public abstract class AppliedArrayBlockingQueue implements AppliedQue{
/** キュー. 継承先で BlockingQueue の操作を行うため */
public BlockingQueue<Message> messageQue;
/**
* コンストラクタ.
* @param quesize キュー容量固定サイズ
* @param readers キュー受信するスレッドインスタンス
* @param threadnames キュー受信するスレッド名を指定する配列で、並びは readersと同じ.
* threadnames を指定させる理由は、障害対応でスレッドダンプを解析する時の補助のため。
*/
public AppliedArrayBlockingQueue(int quesize,QueReader readers,String threadnames){
if (readers.length != threadnames.length){
throw new RuntimeException("QueReader.length and String threadnames.length not eqaule");
}
this.messageQue = new ArrayBlockingQueue<Message>(quesize);
// 受信スレッド起動
for(int i=0;i < readers.length;i++){
readers[i].init(this.messageQue);
Thread th = new Thread(readers[i]);
th.setName(threadnames[i]);
th.start();
}
}
}
でも、使いこなして面白さを理解してくれる人は少ない
==============
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* キュー読込み抽象クラス.
*
* Runnable#run() を既に実装している。
* run()の中の実行でコンストラクタで指定したキューの読出しを実行して
* 本クラス定義の public abstract boolean read(Message message)を実行する。
* →実装側で、読込んだ Message を受取り目的の処理を実行後、キューの受信を継続するかを booleanで
* 返す必要がある。
*
*/
public abstract class QueReader implements Runnable{
private boolean _active;
private BlockingQueue<Message> _que;
private long timer;
private TimeUnit unit;
/**
* コンストラクタ.
* <pre>
* デフォルトコンストラクタで生成した場合、キュー読込みのタイムアウト制御は行わない。
* 読込みのタイムアウト制御を行う場合は、QueReader(long timer,TimeUnit unit)で生成すること。
* </pre>
*/
public QueReader(){
this.timer = 0;
}
/**
* コンストラクタ(読込みタイマー付き).
*
* キュー読込みが、BlockingQueue#poll(long,TimeUnit) で実行されることが約束される。
* タイムアウト時は、 public abstract boolean read(Message message) の message に null が渡される。
*
* @param timer 読込み処理を中止するまでの待機時間。単位は unit、0 を指定した場合、タイムアウトなし
* @param unit TimeUnit
*/
public QueReader(long timer,TimeUnit unit){
this.timer = timer;
this.unit = unit;
}
/**
* キュー実装連携初期化.
*
* AppliedQue インターフェースを持つ以下の抽象クラスが、キューの実装と共に連携するようにするために
* protected メソッドである。
* AppliedArrayBlockingQueue : ArrayBlockingQueue に受信スレッドを紐付けさせるクラス
* AppliedLinkedBlockingQueue : LinkedBlockingQueue に受信スレッドを紐付けさせるクラス
* AppliedPriorityBlockingQueue : PriorityBlockingQueue に受信スレッドを紐付けさせるクラス*
*
* @param que BlockingQueue<Message>
*/
protected void init(BlockingQueue<Message> que){
this._que = que;
}
/**
* キュー読込み処理.
*
* キュー読込みを実行しているスレッドの中で、発生するException は、キャッチするが何も処理を
* しないので注意すること。
*
* @param message 読込まれたキューデータ
* @return true=キュー読込みを継続する。キュー読込みを継続させずにスレッドを終了させる。
*/
public abstract boolean read(Message message);
/**
* 停止を約束.
*
* ブロッキングキューであるので即停止ではなく、読込み後の継続をしないことを約束する。
*/
public void stopApply(){
this._active = false;
}
/**
* 継続性を問合せ
* @return true=キュー読込みを継続する。false=キュー読込み後スレッドを終了させる
*/
public boolean isActive(){
return this._active;
}
/* (非 Javadoc)
* @see java.lang.Runnable#run()
*/
public void run(){
this._active = true;
if (this.timer==0){
while(this._active){
try{
this._active = this.read(this._que.take());
}catch(Exception e){
//
}
}
}else{
while(this._active){
try{
this._active = this.read(this._que.poll(this.timer,this.unit));
}catch(Exception e){
//
}
}
}
}
}