安定な優先順位付きキュー
queueによるコマンドの受信と処理
小さなclient/serverシステムのserver側の設計/実装に関わることになりました。serverには複数のclientが接続され、各clientからのコマンドを受信し、コマンドに応じた処理を行なって結果をclientに投げ返します。
僕はこのserverアプリケーションを、2つのスレッドで実現しようと考えました。すなわち、clientからのコマンドを受信する R_thread と、コマンドの解析/処理/送信を行なう S_thread を起動します。
そしてこの2つのスレッドの間にコマンドを要素とするqueueを置くことで、スレッド間のコマンドの受け渡しを行なうことにしました。server実装のアウトラインは以下のようになります:
struct command {
// clientから受信したデータ...
};
std::queue<command> cmd_q; // コマンド・キュー
void R_thread() {
while ( true ) {
command cmd = clientから受信したコマンド;
cmd_q.push(cmd);
}
}
void S_thread() {
while ( true ) {
command cmd = cmd_q.front();
cmd_q.pop();
// cmd を処理し、結果を送信
}
}
int main() {
// R_thread を起動
// S_thread を起動
...
}
設計途中に新たな機能追加の要求が舞い込んできました。複数のclientそれぞれに優先順位を割り当て、優先順位の高いclientからのコマンドはより迅速に処理して欲しいというのです。
priority_queueは使えるか?
STLはpriority_queueという文字通り”優先順位付きqueue”を提供してくれていますから、機能追加に対応すべく早速上記コードに手を加えました:
struct command {
// clientから受信したデータ...
int priority; // 優先順位
};
bool operator<(const command& x, const command& y) {
return x.priority < y.priority;
}
std::priority_queue<command> cmd_q; // コマンド・キュー
... 以下同様 ...
priority_queueに要素をpush()するとpop()時にはpriorityの大きい順に取り出すことができます。ほんの数時間で作業は完了し、検査チームにリリースしました。
…甘かった。いきなり検査チームからクレームが飛び込んできました。
「clientから送信した複数のコマンドに対する処理結果が、与えたコマンドの順に返ってこない」というのです。
早速小さなテストコードを書き、そのふるまいを検証することにしました:
struct command {
char code;
int priority;
command(char c, int p) : code(c), priority(p) {}
};
inline bool operator<(const command& x, const command& y) {
return x.priority < y.priority;
}
ostream& operator<<(ostream& strm, const command& c) {
return strm << c.code << ':' << c.priority;
}
int main() {
int i;
cout << "input:" << endl;
for ( i = 0; i < 16; ++i ) cout << command(i/4+'A',i%4) << ' ';
cout << endl;
cout << "output:" << endl;
std::priority_queue<command> pq;
for ( i = 0; i < 16; ++i ) pq.push(command(i/4+'A',i%4));
while ( !pq.empty() ) {
cout << pq.top() << ' ';
pq.pop();
}
cout << endl;
return 0;
}
このコードの実行結果は以下のようになりました
input: A:0 A:1 A:2 A:3 B:0 B:1 B:2 B:3 C:0 C:1 C:2 C:3 D:0 D:1 D:2 D:3 output: A:3 C:3 B:3 D:3 D:2 B:2 C:2 A:2 A:1 B:1 C:1 D:1 D:0 C:0 B:0 A:0
なるほど優先順位の高いコマンドから順に出力されてはいますが、優先順位の等しいコマンドについてはその順番が狂っています。
こうなってほしかった…
A:3 B:3 C:3 D:3 A:2 B:2 C:2 D:2 A:1 B:1 C:1 D:1 A:0 B:0 C:0 D:0
stable_priority_queueの実装
そんなわけでSTLが提供するpriority_queueでは望みの動作が期待できないことが明らかになり、何らかの代替手段を考えなくてはならなくなりました。とはいえ、serverのメカニズム(スレッド間でのキューによるコマンドの受け渡し)を大幅に変更したくはありません。”安定”な優先順位付きキュー stable_priority_queue<T> を設計/実装し、priority_queue<T> と交換することにしました。
stable_priority_queue<T>はその内部に順序集合 c[0] .. c[N-1] を保持させます(N:要素数)。この順序集合は優先順位に基づいて昇順(低い順)に並べることとします。
要素 x を挿入するときは、c[i] >= x (i = 0..N-1) を満たす最初のiの位置、すなわち優先順位でソートされた順序集合内の適切な位置に x を割り込ませます。
要素を取り出すのはいたって簡単、順序集合の末尾を取り出せばいい。
これでできたも同然です。queue<T> や priority_queue<T> とインタフェースを統一した stable_priority_queue<T> を実装しましょう…
#include <algorithm>
#include <iterator>
#include <functional>
template <class T, class Container = std::deque<T>,
class Compare = std::less<Container::value_type> >
class stable_priority_queue {
public:
typedef Container::value_type value_type;
typedef Container::size_type size_type;
typedef Container container_type;
protected:
Container c;
Compare comp;
private:
template<class InputIterator>
void push(InputIterator first, InputIterator last)
{ for ( ; first != last; ++first ) push(*first); }
public:
explicit stable_priority_queue(const Compare& pp = Compare(),
const Container& cc = Container())
: comp(pp) { push(cc.begin(), cc.end()); }
template <class InputIterator>
stable_priority_queue(InputIterator first, InputIterator last,
const Compare& pp = Compare())
: comp(pp) { push(first,last); }
bool empty() const { return c.empty(); }
size_type size() const { return c.size(); }
const value_type& top() const { return c.back(); }
void pop() { c.pop_back(); }
void push(const value_type& x) {
Container::iterator it = c.begin();
while ( it != c.end() && comp(*it, x) ) ++it;
c.insert(it, x);
}
};
2分検索による高速化
これでとりあえず、目的とする stable_priority_queue<T> が作れました。
が、しかし、これには若干の不満が残ります。というのも、要素の挿入のとき、内包するコンテナの各要素は優先順位に基づいてソートされているのですから、2分検索(バイナリ・サーチ)を使えばより高速に挿入位置が求められます。
void push(const value_type& x) {
Container::iterator it = std::lower_bound(c.begin(), c.end(), x, comp);
c.insert(it, x);
}
一般に2分検索はコンテナ内の要素をランダム・アクセスするのでlist<T>のようなランダム・アクセスできないコンテナには適用できないはずなのですが、STLのアルゴリズム lower_bound はランダム・アクセスを要求しません。
stable_priority_queue<T>
#include <algorithm>
#include <iterator>
#include <functional>
template <class T, class Container = std::deque<T>,
class Compare = std::less<Container::value_type> >
class stable_priority_queue {
public:
typedef Container::value_type value_type;
typedef Container::size_type size_type;
typedef Container container_type;
protected:
Container c;
Compare comp;
private:
template<class InputIterator>
void push(InputIterator first, InputIterator last)
{ for ( ; first != last; ++first ) push(*first); }
public:
explicit stable_priority_queue(const Compare& pp = Compare(),
const Container& cc = Container())
: comp(pp) { push(cc.begin(), cc.end()); }
template <class InputIterator>
stable_priority_queue(InputIterator first, InputIterator last,
const Compare& pp = Compare())
: comp(pp) { push(first,last); }
bool empty() const { return c.empty(); }
size_type size() const { return c.size(); }
const value_type& top() const { return c.back(); }
void pop() { c.pop_back(); }
void push(const value_type& x) {
c.insert(std::lower_bound(c.begin(), c.end(), x, comp), x);
}
};
sample code (上記コードの直後に記述)
#include <iostream>
#include <deque>
#include <queue>
#include <list>
using namespace std;
struct command {
char code;
int priority;
command(char c, int p) : code(c), priority(p) {}
};
inline bool operator<(const command& x, const command& y) {
return x.priority < y.priority;
}
ostream& operator<<(ostream& strm, const command& c) {
return strm << c.code << ':' << c.priority;
}
int main() {
int i;
cout << "input sequence:" << endl;
for ( i = 0; i < 16; ++i ) cout << command(i/4+'A',i%4) << ' ';
cout << endl;
cout << "priority_queue(based on vector)" << endl;
std::priority_queue<command> pq;
for ( i = 0; i < 16; ++i ) pq.push(command(i/4+'A',i%4));
while ( !pq.empty() ) {
cout << pq.top() << ' ';
pq.pop();
}
cout << endl;
cout << "stable_priority_queue(using deque)" << endl;
stable_priority_queue<command> spqd;
for ( i = 0; i < 16; ++i ) spqd.push(command(i/4+'A',i%4));
while ( !spqd.empty() ) {
cout << spqd.top() << ' ';
spqd.pop();
}
cout << endl;
cout << "stable_priority_queue(using list)" << endl;
stable_priority_queue<command, list<command> > spql;
for ( i = 0; i < 16; ++i ) spql.push(command(i/4+'A',i%4));
while ( !spql.empty() ) {
cout << spql.top() << ' ';
spql.pop();
}
cout << endl;
return 0;
}
result
input sequence: A:0 A:1 A:2 A:3 B:0 B:1 B:2 B:3 C:0 C:1 C:2 C:3 D:0 D:1 D:2 D:3 priority_queue(based on vector) A:3 C:3 B:3 D:3 D:2 B:2 C:2 A:2 A:1 B:1 C:1 D:1 D:0 C:0 B:0 A:0 stable_priority_queue(using deque) A:3 B:3 C:3 D:3 A:2 B:2 C:2 D:2 A:1 B:1 C:1 D:1 A:0 B:0 C:0 D:0 stable_priority_queue(using list) A:3 B:3 C:3 D:3 A:2 B:2 C:2 D:2 A:1 B:1 C:1 D:1 A:0 B:0 C:0 D:0