考えなおしたBlockingQueueの読込み
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import org.apache.log4j.Logger;
public abstract class QueCatcher<E> implements Runnable{
private boolean _active;
private BlockingQueue<E> _que;
private int readMax;
private long interruptCycle;
int count=0;
public enum QueCatchNext{
END,
CONTINUE,
INTERRUPT_AND_END,
INTERRUPT_AND_CONTINUE;
}
// コンストラクタでとりあえず、BlockingQueueを渡す
public QueCatcher(BlockingQueue<E> que){
this._que = que;
this.readMax = 8;
this.interruptCycle = 0;
}
public QueCatcher(BlockingQueue<E> que,int readMax){
this._que = que;
this.readMax = readMax;
this.interruptCycle = 0;
}
public QueCatcher(BlockingQueue<E> que,int readMax,long interruptCycle){
this._que = que;
this.readMax = readMax;
this.interruptCycle = interruptCycle;
}
public abstract QueCatchNext read(E e);
public abstract QueCatchNext interrupt(boolean isInterruptedExcept);
/*
* @see java.lang.Runnable#run()
*/
@Override
public void run(){
Logger logger = Logger.getLogger(this.getClass());
this._active = true;
this.readStart();
Timer timer = new Timer();
if (this.interruptCycle > 0){
timer.scheduleAtFixedRate(new _Handle(),this.interruptCycle,this.interruptCycle);
}
while(this._active){
try{
QueCatchNext next = this.read(this._que.take());
this.count++;
switch(next){
case CONTINUE:
if (this.readMax > 0 && this.count >= this.readMax){
this.count = 0;
this.callInterrupt(false);
}
break;
case INTERRUPT_AND_CONTINUE:
this.count = 0;
this.callInterrupt(false);
break;
case INTERRUPT_AND_END:
this._active = false;
this.callInterrupt(false);
break;
case END:
this._active = false;
break;
default:break;
}
}catch(InterruptedException e){
logger.trace("QueCatcher InterruptedException : "+e.getMessage(),e);
this.count = 0;
QueCatchNext next = this.callInterrupt(true);
switch(next){
case INTERRUPT_AND_END:
case END:
this._active = false;
break;
default:break;
}
}catch(Exception e){
logger.error("QueCatcher Exception : "+e.getMessage(),e);
}
}
timer.cancel();
this.readEnd();
}
class _Handle extends TimerTask{
@Override
public void run(){
if (QueCatcher.this.count > 0){
QueCatcher.this.count = 0;
QueCatcher.this.callInterrupt(false);
}
}
}
final synchronized QueCatchNext callInterrupt(boolean isInterrupted){
return this.interrupt(isInterrupted);
}
public final int getReadMax(){
return this.readMax;
}
// 必要に応じて、オーバーライド
public void readStart(){
}
public void readEnd(){
}
}