使ってみると面白いQue

キュー操作というのは実際にコーディングして動かすと面白い。

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
 * キュー受信スレッドを定義した ArrayBlockingQueue を持つ抽象クラス
 * コンストラクタでキュー受信スレッドのインスタンスを指定し、
 * ArrayBlockingQueue の生成と受信スレッド起動までを行う。
 */
public abstract class AppliedArrayBlockingQueue implements AppliedQue{
   /** キュー. 継承先で BlockingQueue の操作を行うため */
   public BlockingQueue<Message> messageQue;
   /**
    * コンストラクタ.
    * @param quesize キュー容量固定サイズ
    * @param readers キュー受信するスレッドインスタンス
    * @param threadnames キュー受信するスレッド名を指定する配列で、並びは readersと同じ.
    * threadnames を指定させる理由は、障害対応でスレッドダンプを解析する時の補助のため。
    */
   public AppliedArrayBlockingQueue(int quesize,QueReader readers,String threadnames){
      if (readers.length != threadnames.length){
         throw new RuntimeException("QueReader.length and String threadnames.length not eqaule");
      }
      this.messageQue = new ArrayBlockingQueue<Message>(quesize);
      // 受信スレッド起動
      for(int i=0;i < readers.length;i++){
         readers[i].init(this.messageQue);
         Thread th = new Thread(readers[i]);
         th.setName(threadnames[i]);
         th.start();
      }
   }
}

でも、使いこなして面白さを理解してくれる人は少ない
==============


import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
 * キュー読込み抽象クラス.
 * 
 * Runnable#run() を既に実装している。
 * run()の中の実行でコンストラクタで指定したキューの読出しを実行して
 * 本クラス定義の public abstract boolean read(Message message)を実行する。
 * →実装側で、読込んだ Message を受取り目的の処理を実行後、キューの受信を継続するかを booleanで
 * 返す必要がある。
 * 
 */
public abstract class QueReader implements Runnable{
   private boolean _active;
   private BlockingQueue<Message> _que;
   private long timer;
   private TimeUnit unit;
   
   /**
    * コンストラクタ.
    * <pre>
    * デフォルトコンストラクタで生成した場合、キュー読込みのタイムアウト制御は行わない。
    * 読込みのタイムアウト制御を行う場合は、QueReader(long timer,TimeUnit unit)で生成すること。
    * </pre>
    */
   public QueReader(){
      this.timer = 0;
   }
   /**
    * コンストラクタ(読込みタイマー付き).
    * 
    * キュー読込みが、BlockingQueue#poll(long,TimeUnit) で実行されることが約束される。
    * タイムアウト時は、 public abstract boolean read(Message message) の message に null が渡される。
    * 
    * @param timer 読込み処理を中止するまでの待機時間。単位は unit、0 を指定した場合、タイムアウトなし
    * @param unit TimeUnit
    */
   public QueReader(long timer,TimeUnit unit){
      this.timer = timer;
      this.unit = unit;
   }
   
   /**
    * キュー実装連携初期化.
    * 
    * AppliedQue インターフェースを持つ以下の抽象クラスが、キューの実装と共に連携するようにするために
    * protected メソッドである。
    *    AppliedArrayBlockingQueue   : ArrayBlockingQueue    に受信スレッドを紐付けさせるクラス
    *    AppliedLinkedBlockingQueue   : LinkedBlockingQueue   に受信スレッドを紐付けさせるクラス
    *    AppliedPriorityBlockingQueue : PriorityBlockingQueue に受信スレッドを紐付けさせるクラス* 
    * 
    * @param que BlockingQueue<Message>
    */
   protected void init(BlockingQueue<Message> que){
      this._que = que;
   }

   /**
    * キュー読込み処理.
    * 
    * キュー読込みを実行しているスレッドの中で、発生するException は、キャッチするが何も処理を
  * しないので注意すること。
    * 
    * @param message 読込まれたキューデータ
    * @return true=キュー読込みを継続する。キュー読込みを継続させずにスレッドを終了させる。
    */
   public abstract boolean read(Message message);

   /**
    * 停止を約束.
    * 
    * ブロッキングキューであるので即停止ではなく、読込み後の継続をしないことを約束する。
    */
   public void stopApply(){
      this._active = false;
   }
   /**
    * 継続性を問合せ
    * @return true=キュー読込みを継続する。false=キュー読込み後スレッドを終了させる
    */
   public boolean isActive(){
      return this._active;
   }
   /* (非 Javadoc)
    * @see java.lang.Runnable#run()
    */
   public void run(){
      this._active = true;
      if (this.timer==0){
         while(this._active){
            try{
            this._active = this.read(this._que.take());
            }catch(Exception e){
               //
            }
         }
      }else{
         while(this._active){
            try{
            this._active = this.read(this._que.poll(this.timer,this.unit));
            }catch(Exception e){
                //
            }
         }
      }
   }
}