BlockingQueue とスレッド(1)

BlockingQueue と読込みスレッドの整理-1


import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import org.apache.log4j.Logger;
/**
 * キュー読込み抽象クラス.
 */

public abstract class QueCatcher<E> implements Runnable{
   private boolean _active;
   private BlockingQueue<E> _que;
   private int readMax;
   private long interruptCycle;
   int count=0;

   public QueCatcher(){
      this.readMax = 0;
      this.interruptCycle = 0;
   }
   public QueCatcher(int readMax){
      this.readMax = readMax;
      this.interruptCycle = 0;
   }
   public QueCatcher(int readMax,long interruptCycle){
      this.readMax = readMax;
      this.interruptCycle = interruptCycle;
   }
   public QueCatcher(BlockingQueue<E> bque){
      this._que = bque;
      this.readMax = 0;
      this.interruptCycle = 0;
   }
   public QueCatcher(BlockingQueue<E> bque,int readMax){
      this._que = bque;
      this.readMax = readMax;
      this.interruptCycle = 0;
   }
   public QueCatcher(BlockingQueue<E> bque,int readMax,long interruptCycle){
      this._que = bque;
      this.readMax = readMax;
      this.interruptCycle = interruptCycle;
   }

   /**
    * キュー読込み抽象メソッド.
    */

   public abstract QueCatchNext read(E e);

   /**
    * 割込み処理抽象メソッド.
    */

   public abstract QueCatchNext interrupt(boolean isInterruptedExcept);

   /*
    * @see java.lang.Runnable#run()
    */

   @Override
   public void run(){

      Logger logger = Logger.getLogger(this.getClass());
      this._active = true;
      this.readStart();
      Timer timer = new Timer();
      if (this.interruptCycle > 0){
         timer.scheduleAtFixedRate(new _Handle(),this.interruptCycle,this.interruptCycle);
      }
      while(this._active){
         try{
         QueCatchNext next = this.read(this._que.take());
         this.count++;
         switch(next){
            case CONTINUE:
               if (this.readMax > 0 && this.count >= this.readMax){
                  this.count = 0;
                  this.callInterrupt(false);
               }
               break;
            case INTERRUPT_AND_CONTINUE:
               this.count = 0;
               this.callInterrupt(false);
               break;
            case INTERRUPT_AND_END:
               this._active = false;
               this.callInterrupt(false);
               break;
            case END:
               this._active = false;
               break;
            default:break;
         }
         }catch(InterruptedException e){
            logger.trace("QueCatcher InterruptedException : "+e.getMessage(),e);
            this.count = 0;
            QueCatchNext next = this.callInterrupt(true);
            switch(next){
               case INTERRUPT_AND_END:
               case END:

                  this._active = false;
                  break;
               default:break;
            }
         }catch(Exception e){
            logger.error("QueCatcher Exception : "+e.getMessage(),e);
         }
      }
      timer.cancel();
      this.readEnd();
   }
   class _Handle extends TimerTask{
      @Override
      public void run(){
         if (QueCatcher.this.count > 0){
            QueCatcher.this.count = 0;
            QueCatcher.this.callInterrupt(false);
         }
      }
   }
   final synchronized QueCatchNext callInterrupt(boolean isInterrupted){
      return this.interrupt(isInterrupted);
   }

   /**
    * キュー関係付け
    */

   public final void setBlockingQueue(BlockingQueue<E> bque){
      this._que = bque;
   }
   public final int getReadMax(){
      return this.readMax;
   }
   public void readStart(){
   }
   public void readEnd(){
   }
}
----------------------------------
/**
 * Instruction returned to the queue-reading loop that tells it what to do next.
 */
public enum QueCatchNext {

   /** Stop reading the queue. */
   END,

   /** Read the next queue element. */
   CONTINUE,

   /** Run interrupt(), then stop. */
   INTERRUPT_AND_END,

   /** Run interrupt(), then read the next queue element. */
   INTERRUPT_AND_CONTINUE
}