BlockingQueue 読込み

考えなおしたBlockingQueueの読込み

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import org.apache.log4j.Logger;

public abstract class QueCatcher<E> implements Runnable{
   private boolean _active;
   private BlockingQueue<E> _que;
   private int readMax;
   private long interruptCycle;
   int count=0;
   public enum QueCatchNext{
      END,
      CONTINUE,
      INTERRUPT_AND_END,
      INTERRUPT_AND_CONTINUE;
   }

   // コンストラクタでとりあえず、BlockingQueueを渡す
   public QueCatcher(BlockingQueue<E> que){
      this._que = que;
      this.readMax = 8;
      this.interruptCycle = 0;
   }
   public QueCatcher(BlockingQueue<E> que,int readMax){
      this._que = que;
      this.readMax = readMax;
      this.interruptCycle = 0;
   }
   public QueCatcher(BlockingQueue<E> que,int readMax,long interruptCycle){
      this._que = que;
      this.readMax = readMax;
      this.interruptCycle = interruptCycle;
   }

   public abstract QueCatchNext read(E e);

   public abstract QueCatchNext interrupt(boolean isInterruptedExcept);

   /*
    * @see java.lang.Runnable#run()
    */

   @Override
   public void run(){
      Logger logger = Logger.getLogger(this.getClass());
      this._active = true;
      this.readStart();
      Timer timer = new Timer();
      if (this.interruptCycle > 0){
         timer.scheduleAtFixedRate(new _Handle(),this.interruptCycle,this.interruptCycle);
      }
      while(this._active){
         try{
         QueCatchNext next = this.read(this._que.take());
         this.count++;
         switch(next){
            case CONTINUE:
               if (this.readMax > 0 && this.count >= this.readMax){
                  this.count = 0;
                  this.callInterrupt(false);
               }
               break;
            case INTERRUPT_AND_CONTINUE:
               this.count = 0;
               this.callInterrupt(false);
               break;
            case INTERRUPT_AND_END:
               this._active = false;
               this.callInterrupt(false);
               break;
            case END:
               this._active = false;
               break;
            default:break;
         }
         }catch(InterruptedException e){
            logger.trace("QueCatcher InterruptedException : "+e.getMessage(),e);
            this.count = 0;
            QueCatchNext next = this.callInterrupt(true);
            switch(next){
               case INTERRUPT_AND_END:
               case END:

                  this._active = false;
                  break;
               default:break;
            }
         }catch(Exception e){
            logger.error("QueCatcher Exception : "+e.getMessage(),e);
         }
      }
      timer.cancel();
      this.readEnd();
   }
   class _Handle extends TimerTask{
      @Override
      public void run(){
         if (QueCatcher.this.count > 0){
            QueCatcher.this.count = 0;
            QueCatcher.this.callInterrupt(false);
         }
      }
   }
   final synchronized QueCatchNext callInterrupt(boolean isInterrupted){
      return this.interrupt(isInterrupted);
   }

   public final int getReadMax(){
      return this.readMax;
   }
   // 必要に応じて、オーバーライド
   public void readStart(){
   }
   public void readEnd(){
   }
}