- 追加された行はこの色です。
- 削除された行はこの色です。
*目次 [#if9dfb7f]
#contents
*概要 [#qb4d52ca]
複数クライアントから、リクエストをキューに蓄えつつ
スレッドの数を制限しながら実行する処理のパターン
*コード [#a096b98b]
元のデザインパターンの名前が気に入らなかったので、修正したのと
Two-Phase Terminationも、いるだろうから、組み込んでみた。
パターンを2つ組み合わせたパターンにした。
private static ${enclosing_type} instance;
private static ${enclosing_type} getInstance() {
if (instance == null) {
instance = new ${enclosing_type}();
}
return instance;
}
public static void main(String[] args) {
int threadSize = 4;// スレッド数
int requestQueueMaxSize = 20;// キューリクエスト最大数
Workers workers = getInstance().new Workers(threadSize,
requestQueueMaxSize);
workers.startWork();
// 複数クライアントからのリクエストでも対応できるようになっている。
getInstance().new ClientThread("Yahoo", workers).start();
getInstance().new ClientThread("Google", workers).start();
getInstance().new ClientThread("Oracle", workers).start();
}
/**
* 依頼元のスレッドでこのクラスの内部でリクエストを行っている リクエストをキューに格納する箇所を修正する必要がある。
*
*/
class ClientThread extends ShutdownableThread {
private final Workers requestQueue;
public ClientThread(String clientName, Workers requestQueue) {
super(clientName);
this.requestQueue = requestQueue;
}
/**
* リクエストの登録と実行 無限ループのサンプルになっている
*/
public void run() {
try {
// リクエストの登録と実行
for (int i = 0; true; i++) {
Request request = new Request(getName(), i);
this.requestQueue.offer(request);
this.doHeavyJob();// ダミーの処理 ただのwait
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
doShutdown();
}
}
/**
* ダミーの処理
* 単なるwait クライアントの操作の揺らぎを表現
*
* @throws InterruptedException
*/
private void doHeavyJob() throws InterruptedException {
Random random = new Random();
Thread.sleep(random.nextInt(1000));
}
@Override
void doShutdown() {
System.out.println("ClientThread end " + getName());
}
}
/**
* リクエストを保持しておくキュー このクラスが実際に処理させるスレッドを配列として格納しています。
*/
class Workers {
// リクエストを保持しておくキュー
ArrayBlockingQueue<Request> requestQueue = null;
// Worker Thread の保持
private final List<WorkerThread> workerThreadPoolList;
/**
* コンストラクター
*
* @param threadMaxSize
*/
public Workers(int threadMaxSize, int queueMaxSize) {
requestQueue = new ArrayBlockingQueue<Request>(queueMaxSize);
this.workerThreadPoolList = new ArrayList<WorkerThread>();
for (int i = 0; i < threadMaxSize; i++) {
this.workerThreadPoolList.add(new WorkerThread("Worker-" + i,
this));
}
}
/**
* 作業スレッドを開始させておきます。
*/
public void startWork() {
for (WorkerThread workerThread : this.workerThreadPoolList) {
workerThread.start();
}
}
/**
* リクエストキューにある限り待ち続け、リクエスト保持可能であれば保持する。
*
* @param request
*/
public synchronized void offer(Request request) {
while (this.requestQueue.size() > 0) {
// キューに登録があれば待ち続ける
try {
wait();
} catch (InterruptedException e) {
}
}
this.requestQueue.offer(request);
notifyAll();
}
/**
* リクエストを取得する
*
* @return
*/
public synchronized Request poll() {
// キューにリクエストが無い間は待ち続ける
while (this.requestQueue.size() <= 0) {
try {
wait();
} catch (InterruptedException e) {
}
}
Request request = this.requestQueue.poll();
notifyAll();
return request;
}
}
/**
* リクエストキューから、リクエストを取り出し実行
*
*/
class WorkerThread extends ShutdownableThread {
private final Workers requestQueue;
public WorkerThread(String name, Workers requestQueue) {
super(name);
this.requestQueue = requestQueue;
}
@Override
public void run() {
try {
while (!isShutdown()) {
doRun();
}
} catch (InterruptedException e) {
} finally {
doShutdown();
}
}
void doRun() throws InterruptedException {
Request request = this.requestQueue.poll();
request.execute();
}
@Override
/**
* シャットダウン時または、終了時に実行する処理
*/
void doShutdown() {
//
}
}
/**
* リクエストのパラメータと実行を担当するクラス
*/
class Request {
// 依頼者
private final String companyName;
// リクエストの番号
private final int requestNumber;
public Request(String companyName, int requestNumber) {
this.companyName = companyName;
this.requestNumber = requestNumber;
}
/**
* Workerクラスから実行される。
*/
public void execute() {
System.out.println(Thread.currentThread().getName() + " executes "
+ this);
try {
this.doHeavyJob();// ダミーの処理
} catch (InterruptedException e) {
}
}
public String toString() {
return "[ Request from " + this.companyName + " No."
+ this.requestNumber + " ]";
}
/**
* ダミーの処理が入っているが、 本来やるべき処理に差し替える。
*
* @throws InterruptedException
*/
private void doHeavyJob() throws InterruptedException {
Random random = new Random();
Thread.sleep(random.nextInt(1000));
}
}
/**
* Two-Phase TerminationをAbstructにしたクラス
* 元々スレッドにはinterruptメソッドがあるが、名称をshutdownにすると
*
* runの代わりにrunメソッドはこのクラスで処理するので、doRunを実装して使う。
*
* シャットダウン例
* t.shutdown();
* t.join();
*/
abstract class ShutdownableThread extends Thread {
private volatile boolean shutdownFlg = false;
public ShutdownableThread(String name) {
super(name);
}
// 終了要求
public void shutdown() {
shutdownFlg = true;
interrupt();
}
// 終了要求が出されたかどうか?
protected boolean isShutdown() {
return shutdownFlg;
}
/**
* シャットダウン時の処理
*/
abstract void doShutdown();
}