BlockingQueue とスレッド(2)

BlockingQueue と読込みスレッドを動作させるのに、以下のシングルトンを用意した方が良い。

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;

/**
 * ブロッキングキュースレッド コントローラ(Singleton).
 */

public final class QueContoroller{
   private static QueContoroller inst;
   private Map<String,Thread>           thMap;  // key=Thread-Name  value=Thread
   private Map<String,List<String>>     qMap;   // key=qName        value=List Thread-Name
   private Map<String,BlockingQueue>    bMap;   // key=qName        value=BlockingQueue
   private Map<String,QueEndProvider>   eMap;   // key=qName        value=endProvider
   private QueContoroller(){
      this.thMap = new HashMap<String,Thread>();
      this.qMap = new HashMap<String,List<String>>();
      this.bMap = new HashMap<String,BlockingQueue>();
      this.eMap = new HashMap<String,QueEndProvider>();
   }
   /**
    * インスタンス取得
    * @return Singletonインスタンス
    */

   public final static synchronized QueContoroller getInstance(){
      if (inst==null) inst = new QueContoroller();
      return inst;
   }
   /**
    * キュースレッド開始
    * @param bque ブロッキングキュー
    * @param qName キュー名称、{キュー名}+ "-"+n の
    * @param endProvider キュー読込み終了させるためのキューデータファクトリ
    * @param qcatchers QueCatcher...キュー読込みスレッドインスタンス
    */

   @SuppressWarnings("unchecked")
   public void start(BlockingQueue bque
                        ,String qName
                        ,QueEndProvider endProvider
                        ,QueCatcher...qcatchers){

      if (this.qMap.get(qName) != null){
         throw new RuntimeException("already Thread que : "+qName);
      }
      this.bMap.put(qName,bque);
      this.eMap.put(qName,endProvider);
      this.qMap.put(qName,new ArrayList<String>());
      List<String> list = this.qMap.get(qName);
      String threadnames = new String[qcatchers.length];
      for(int i=1;i <= threadnames.length;i++){
         threadnames[i-1] = qName+"-"+i;
         list.add(threadnames[i-1]);
      }
      this.qMap.put(qName,list);
      int i=0;
      for(QueCatcher qcatcher : qcatchers){
         qcatcher.setBlockingQueue(bque);
         this.thMap.put(threadnames[i],new Thread(qcatcher));
         Thread th = this.thMap.get(threadnames[i]);
         th.setName(threadnames[i]);
         th.start();
         i++;
      }
   }
   /**
    * キュースレッド開始
    * @param bque ブロッキングキュー
    * @param qName キュー名称、{キュー名}+ "-"+n の
    * @param endProvider キュー読込み終了させるためのキューデータファクトリ
    * @param qcatchers キュー読込みスレッドインスタンスCollection<? extends QueCatcher>
    */

   @SuppressWarnings("unchecked")
   public void start(BlockingQueue bque
                        ,String qName
                        ,QueEndProvider endProvider
                        ,Collection<? extends QueCatcher> qcatchers){

      if (this.qMap.get(qName) != null){
         throw new RuntimeException("already Thread que : "+qName);
      }
      this.bMap.put(qName,bque);
      this.eMap.put(qName,endProvider);
      this.qMap.put(qName,new ArrayList<String>());
      List<String> list = this.qMap.get(qName);
      String
 threadnames = new String[qcatchers.size()];
      for(int i=1;i <= threadnames.length;i++){
         threadnames[i-1] = qName+"-"+i;
         list.add(threadnames[i-1]);
      }
      this.qMap.put(qName,list);
      int i=0;
      for(QueCatcher qcatcher : qcatchers){
         qcatcher.setBlockingQueue(bque);
         this.thMap.put(threadnames[i],new Thread(qcatcher));
         Thread th = this.thMap.get(threadnames[i]);
         th.setName(threadnames[i]);
         th.start();
         i++;
      }
   }
   /**
    * キューデータ蓄積サイズ
    * @param qName キュー名
    * @return 現在のキューデータ蓄積サイズ
    */

   public int size(String qName){
      BlockingQueue b = this.bMap.get(qName);
      return b==null ? 0 : b.size();
   }
   /**
    * キュースレッドActive状況
    * @param qName キュー名
    * @return Map<String,Boolean> key=キュー名、 value:true=Active
    */

   public Map<String,Boolean> queryActiveMap(String qName){
      Map<String,Boolean> map = new HashMap<String,Boolean>();
      List<String> list = this.qMap.get(qName);
      if (list==null) return map;
      for(String threadname : this.qMap.get(qName)){
         map.put(threadname,this.thMap.get(threadname).isAlive());
      }
      return map;
   }
   /**
    * キュースレッド名配列取得
    * @param qName キュー名
    * @return スレッド名配列
    */

   public String queryThreadNames(String qName){
      if (!this.qMap.containsKey(qName)) return new String[0];
      List<String> list = this.qMap.get(qName);
      String
 rtn = new String[list.size()];
      int i=0;
      for(String s : list){
         rtn[i] = s;
         i++;
      }
      return rtn;
   }
   /**
    * キュースレッドActive数取得
    * @param qName キュー名
    * @return Active数
    */

   public int queryActives(String qName){
      List<String> list = this.qMap.get(qName);
      if (list==null) return 0;
      int count=0;
      for(String threadname : this.qMap.get(qName)){
         if (this.thMap.get(threadname).isAlive()) count++;
      }
      return count;
   }
   /**
    * キュー終了待ち
    * @param qName キュー名
    * @param sec 待ち合わせ制限時間
    * @return Activeスレッド数
    */

   public int waitQueEnd(String qName,int sec){
      int rtn = 0;
      if (sec < 1){
         while(this.queryActives(qName) > 0){
            try{Thread.sleep(1000);}catch(InterruptedException e){}
         }
      }else{
         for(int i=0;i < sec;i++){
            rtn = this.queryActives(qName);
            if (rtn==0) break;
            try{Thread.sleep(1000);}catch(InterruptedException e){}
         }
      }
      return rtn;
   }
   /**
    * キュースレッド割込み
    * @param threadName スレッド名
    */

   public void interrupt(String threadName){
      if (!this.thMap.containsKey(threadName)){
         throw new RuntimeException("Not found Thread : "+threadName);
      }
      Thread th = this.thMap.get(threadName);
      if (th==null) return;
      if (!th.isAlive()){
         throw new RuntimeException("Thread is No Alive : "+threadName);
      }
      th.interrupt();
   }
   /**
    * 終了データ送信
    * @param qName キュー名
    */

   @SuppressWarnings("unchecked")
   public void end(String qName){
      QueEndProvider ep = this.eMap.get(qName);
      if (ep==null) return;
      List<String> list = this.qMap.get(qName);
      BlockingQueue bque = this.bMap.get(qName);
      for(int i=0;i < list.size();i++){
         try{
         bque.put(ep.endElement());
         }catch(InterruptedException e){
         }
      }
   }
   /**
    * 終了データ送信.(全てのキューに送信)
    */

   @SuppressWarnings("unchecked")
   public void allend(){
      for(String qName : this.eMap.keySet()){
         QueEndProvider ep = this.eMap.get(qName);
         if (ep==null) continue;
         List<String> list = this.qMap.get(qName);
         BlockingQueue bque = this.bMap.get(qName);
         for(int i=0;i < list.size();i++){
            try{
            bque.put(ep.endElement());
            }catch(InterruptedException e){
            }
         }
      }
   }
   /**
    * キューデータ送信
    * @param qName キュー名
    * @param e データ
    */

   @SuppressWarnings("unchecked")
   public <E> void put(String qName,E e){
      if (!this.bMap.containsKey(qName)){
         throw new RuntimeException("Not found que : "+qName);
      }
      try{this.bMap.get(qName).put(e);}catch(InterruptedException e1){}
   }
   public BlockingQueue getBlockinfQue(String qName){
      return this.bMap.get(qName);
   }
   public void remove(String qName){
      List<String> list = this.qMap.get(qName);
      if (list==null) return;
      for(String threadname : list){
         this.thMap.remove(threadname);
      }
      BlockingQueue b = this.bMap.get(qName);
      b.clear();
      b.remove();
      b = null;
      this.bMap.remove(qName);
      this.qMap.remove(qName);
      this.eMap.remove(qName);
   }
}
----------------------------------------------------
public interface QueEndProvider<E>{
   public  E endElement();
}