BlockingQueue と読込みスレッドを動作させるのに、以下のシングルトンを用意した方が良い。
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
/**
* ブロッキングキュースレッド コントローラ(Singleton).
*/
public final class QueContoroller{
private static QueContoroller inst;
private Map<String,Thread> thMap; // key=Thread-Name value=Thread
private Map<String,List<String>> qMap; // key=qName value=List Thread-Name
private Map<String,BlockingQueue> bMap; // key=qName value=BlockingQueue
private Map<String,QueEndProvider> eMap; // key=qName value=endProvider
private QueContoroller(){
this.thMap = new HashMap<String,Thread>();
this.qMap = new HashMap<String,List<String>>();
this.bMap = new HashMap<String,BlockingQueue>();
this.eMap = new HashMap<String,QueEndProvider>();
}
/**
* インスタンス取得
* @return Singletonインスタンス
*/
public final static synchronized QueContoroller getInstance(){
if (inst==null) inst = new QueContoroller();
return inst;
}
/**
* キュースレッド開始
* @param bque ブロッキングキュー
* @param qName キュー名称、{キュー名}+ "-"+n の
* @param endProvider キュー読込み終了させるためのキューデータファクトリ
* @param qcatchers QueCatcher...キュー読込みスレッドインスタンス
*/
@SuppressWarnings("unchecked")
public void start(BlockingQueue bque
,String qName
,QueEndProvider endProvider
,QueCatcher...qcatchers){
if (this.qMap.get(qName) != null){
throw new RuntimeException("already Thread que : "+qName);
}
this.bMap.put(qName,bque);
this.eMap.put(qName,endProvider);
this.qMap.put(qName,new ArrayList<String>());
List<String> list = this.qMap.get(qName);
String threadnames = new String[qcatchers.length];
for(int i=1;i <= threadnames.length;i++){
threadnames[i-1] = qName+"-"+i;
list.add(threadnames[i-1]);
}
this.qMap.put(qName,list);
int i=0;
for(QueCatcher qcatcher : qcatchers){
qcatcher.setBlockingQueue(bque);
this.thMap.put(threadnames[i],new Thread(qcatcher));
Thread th = this.thMap.get(threadnames[i]);
th.setName(threadnames[i]);
th.start();
i++;
}
}
/**
* キュースレッド開始
* @param bque ブロッキングキュー
* @param qName キュー名称、{キュー名}+ "-"+n の
* @param endProvider キュー読込み終了させるためのキューデータファクトリ
* @param qcatchers キュー読込みスレッドインスタンスCollection<? extends QueCatcher>
*/
@SuppressWarnings("unchecked")
public void start(BlockingQueue bque
,String qName
,QueEndProvider endProvider
,Collection<? extends QueCatcher> qcatchers){
if (this.qMap.get(qName) != null){
throw new RuntimeException("already Thread que : "+qName);
}
this.bMap.put(qName,bque);
this.eMap.put(qName,endProvider);
this.qMap.put(qName,new ArrayList<String>());
List<String> list = this.qMap.get(qName);
String threadnames = new String[qcatchers.size()];
for(int i=1;i <= threadnames.length;i++){
threadnames[i-1] = qName+"-"+i;
list.add(threadnames[i-1]);
}
this.qMap.put(qName,list);
int i=0;
for(QueCatcher qcatcher : qcatchers){
qcatcher.setBlockingQueue(bque);
this.thMap.put(threadnames[i],new Thread(qcatcher));
Thread th = this.thMap.get(threadnames[i]);
th.setName(threadnames[i]);
th.start();
i++;
}
}
/**
* キューデータ蓄積サイズ
* @param qName キュー名
* @return 現在のキューデータ蓄積サイズ
*/
public int size(String qName){
BlockingQueue b = this.bMap.get(qName);
return b==null ? 0 : b.size();
}
/**
* キュースレッドActive状況
* @param qName キュー名
* @return Map<String,Boolean> key=キュー名、 value:true=Active
*/
public Map<String,Boolean> queryActiveMap(String qName){
Map<String,Boolean> map = new HashMap<String,Boolean>();
List<String> list = this.qMap.get(qName);
if (list==null) return map;
for(String threadname : this.qMap.get(qName)){
map.put(threadname,this.thMap.get(threadname).isAlive());
}
return map;
}
/**
* キュースレッド名配列取得
* @param qName キュー名
* @return スレッド名配列
*/
public String queryThreadNames(String qName){
if (!this.qMap.containsKey(qName)) return new String[0];
List<String> list = this.qMap.get(qName);
String rtn = new String[list.size()];
int i=0;
for(String s : list){
rtn[i] = s;
i++;
}
return rtn;
}
/**
* キュースレッドActive数取得
* @param qName キュー名
* @return Active数
*/
public int queryActives(String qName){
List<String> list = this.qMap.get(qName);
if (list==null) return 0;
int count=0;
for(String threadname : this.qMap.get(qName)){
if (this.thMap.get(threadname).isAlive()) count++;
}
return count;
}
/**
* キュー終了待ち
* @param qName キュー名
* @param sec 待ち合わせ制限時間
* @return Activeスレッド数
*/
public int waitQueEnd(String qName,int sec){
int rtn = 0;
if (sec < 1){
while(this.queryActives(qName) > 0){
try{Thread.sleep(1000);}catch(InterruptedException e){}
}
}else{
for(int i=0;i < sec;i++){
rtn = this.queryActives(qName);
if (rtn==0) break;
try{Thread.sleep(1000);}catch(InterruptedException e){}
}
}
return rtn;
}
/**
* キュースレッド割込み
* @param threadName スレッド名
*/
public void interrupt(String threadName){
if (!this.thMap.containsKey(threadName)){
throw new RuntimeException("Not found Thread : "+threadName);
}
Thread th = this.thMap.get(threadName);
if (th==null) return;
if (!th.isAlive()){
throw new RuntimeException("Thread is No Alive : "+threadName);
}
th.interrupt();
}
/**
* 終了データ送信
* @param qName キュー名
*/
@SuppressWarnings("unchecked")
public void end(String qName){
QueEndProvider ep = this.eMap.get(qName);
if (ep==null) return;
List<String> list = this.qMap.get(qName);
BlockingQueue bque = this.bMap.get(qName);
for(int i=0;i < list.size();i++){
try{
bque.put(ep.endElement());
}catch(InterruptedException e){
}
}
}
/**
* 終了データ送信.(全てのキューに送信)
*/
@SuppressWarnings("unchecked")
public void allend(){
for(String qName : this.eMap.keySet()){
QueEndProvider ep = this.eMap.get(qName);
if (ep==null) continue;
List<String> list = this.qMap.get(qName);
BlockingQueue bque = this.bMap.get(qName);
for(int i=0;i < list.size();i++){
try{
bque.put(ep.endElement());
}catch(InterruptedException e){
}
}
}
}
/**
* キューデータ送信
* @param qName キュー名
* @param e データ
*/
@SuppressWarnings("unchecked")
public <E> void put(String qName,E e){
if (!this.bMap.containsKey(qName)){
throw new RuntimeException("Not found que : "+qName);
}
try{this.bMap.get(qName).put(e);}catch(InterruptedException e1){}
}
public BlockingQueue getBlockinfQue(String qName){
return this.bMap.get(qName);
}
public void remove(String qName){
List<String> list = this.qMap.get(qName);
if (list==null) return;
for(String threadname : list){
this.thMap.remove(threadname);
}
BlockingQueue b = this.bMap.get(qName);
b.clear();
b.remove();
b = null;
this.bMap.remove(qName);
this.qMap.remove(qName);
this.eMap.remove(qName);
}
}
----------------------------------------------------
public interface QueEndProvider<E>{
public E endElement();
}