株式会社エス・スリー・フォー

安定な優先順位付きキュー

queueによるコマンドの受信と処理

小さなclient/serverシステムのserver側の設計/実装に関わることになりました。serverには複数のclientが接続され、各clientからのコマンドを受信し、コマンドに応じた処理を行なって結果をclientに投げ返します。

僕はこのserverアプリケーションを、2つのスレッドで実現しようと考えました。すなわち、clientからのコマンドを受信する R_thread と、コマンドの解析/処理/送信を行なう S_thread を起動します。
そしてこの2つのスレッドの間にコマンドを要素とするqueueを置くことで、スレッド間のコマンドの受け渡しを行なうことにしました。server実装のアウトラインは以下のようになります:

struct command {
  // clientから受信したデータ...
};

std::queue<command> cmd_q; // コマンド・キュー

void R_thread() {
  while ( true ) {
    command cmd = clientから受信したコマンド;
    cmd_q.push(cmd);
  }
}

void S_thread() {
  while ( true ) {
    command cmd = cmd_q.front();
    cmd_q.pop();
    // cmd を処理し、結果を送信
  }
}

int main() {
  // R_thread を起動
  // S_thread を起動
  ...
}

設計途中に新たな機能追加の要求が舞い込んできました。複数のclientそれぞれに優先順位を割り当て、優先順位の高いclientからのコマンドはより迅速に処理して欲しいというのです。

priority_queueは使えるか?

STLはpriority_queueという文字通り”優先順位付きqueue”を提供してくれていますから、機能追加に対応すべく早速上記コードに手を加えました:

struct command {
  // clientから受信したデータ...
  int priority; // 優先順位
};

bool operator<(const command& x, const command& y) {
  return x.priority < y.priority;
}

std::priority_queue<command> cmd_q; // コマンド・キュー

... 以下同様 ...

priority_queueに要素をpush()するとpop()時にはpriorityの大きい順に取り出すことができます。ほんの数時間で作業は完了し、検査チームにリリースしました。

…甘かった。いきなり検査チームからクレームが飛び込んできました。
「clientから送信した複数のコマンドに対する処理結果が、与えたコマンドの順に返ってこない」というのです。

早速小さなテストコードを書き、そのふるまいを検証することにしました:

struct command {
  char code;
  int priority;
  command(char c, int p) : code(c), priority(p) {}
};

inline bool operator<(const command& x, const command& y) {
  return x.priority < y.priority;
}

ostream& operator<<(ostream& strm, const command& c) {
  return strm << c.code << ':' << c.priority;
}

int main() {

  int i;

  cout << "input:" << endl;
  for ( i = 0; i < 16; ++i ) cout << command(i/4+'A',i%4) << ' ';
  cout << endl;

  cout << "output:" << endl;
  std::priority_queue<command> pq;
  for ( i = 0; i < 16; ++i ) pq.push(command(i/4+'A',i%4));
  while ( !pq.empty() ) {
    cout << pq.top() << ' ';
    pq.pop();
  }
  cout << endl;

  return 0;
}

このコードの実行結果は以下のようになりました

input:
A:0 A:1 A:2 A:3 B:0 B:1 B:2 B:3 C:0 C:1 C:2 C:3 D:0 D:1 D:2 D:3
output:
A:3 C:3 B:3 D:3 D:2 B:2 C:2 A:2 A:1 B:1 C:1 D:1 D:0 C:0 B:0 A:0

なるほど優先順位の高いコマンドから順に出力されてはいますが、優先順位の等しいコマンドについてはその順番が狂っています。

こうなってほしかった…

A:3 B:3 C:3 D:3 A:2 B:2 C:2 D:2 A:1 B:1 C:1 D:1 A:0 B:0 C:0 D:0

stable_priority_queueの実装

そんなわけでSTLが提供するpriority_queueでは望みの動作が期待できないことが明らかになり、何らかの代替手段を考えなくてはならなくなりました。とはいえ、serverのメカニズム(スレッド間でのキューによるコマンドの受け渡し)を大幅に変更したくはありません。”安定”な優先順位付きキュー stable_priority_queue<T> を設計/実装し、priority_queue<T> と交換することにしました。

stable_priority_queue<T>はその内部に順序集合 c[0] .. c[N-1] を保持させます(N:要素数)。この順序集合は優先順位に基づいて昇順(低い順)に並べることとします。

要素 x を挿入するときは、c[i] >= x (i = 0..N-1) を満たす最初のiの位置、すなわち優先順位でソートされた順序集合内の適切な位置に x を割り込ませます。

要素を取り出すのはいたって簡単、順序集合の末尾を取り出せばいい。

これでできたも同然です。queue<T> や priority_queue<T> とインタフェースを統一した stable_priority_queue<T> を実装しましょう…

#include <algorithm>
#include <iterator>
#include <functional>

  template <class T, class Container = std::deque<T>,
            class Compare = std::less<Container::value_type> >
  class stable_priority_queue {
  public:
    typedef Container::value_type value_type;
    typedef Container::size_type  size_type;
    typedef Container             container_type;

  protected:
    Container c;
    Compare comp;

  private:
    template<class InputIterator>
      void push(InputIterator first, InputIterator last)
        { for ( ; first != last; ++first ) push(*first); }

  public:
    explicit stable_priority_queue(const Compare& pp = Compare(),
                                   const Container& cc = Container())
      : comp(pp) { push(cc.begin(), cc.end()); }

    template <class InputIterator>
      stable_priority_queue(InputIterator first, InputIterator last,
                            const Compare& pp = Compare())
      : comp(pp) { push(first,last); }

    bool              empty() const { return c.empty(); }
    size_type         size() const  { return c.size(); }
    const value_type& top() const   { return c.back(); }
    void              pop()         { c.pop_back(); }

    void push(const value_type& x) {
      Container::iterator it = c.begin();
      while ( it != c.end() && comp(*it, x) ) ++it;
      c.insert(it, x);
    }
  };

2分検索による高速化

これでとりあえず、目的とする stable_priority_queue<T> が作れました。

が、しかし、これには若干の不満が残ります。というのも、要素の挿入のとき、内包するコンテナの各要素は優先順位に基づいてソートされているのですから、2分検索(バイナリ・サーチ)を使えばより高速に挿入位置が求められます。

  void push(const value_type& x) {
      Container::iterator it = std::lower_bound(c.begin(), c.end(), x, comp);
      c.insert(it, x);
    }

一般に2分検索はコンテナ内の要素をランダム・アクセスするのでlist<T>のようなランダム・アクセスできないコンテナには適用できないはずなのですが、STLのアルゴリズム lower_bound はランダム・アクセスを要求しません。

stable_priority_queue<T>

#include <algorithm>
#include <iterator>
#include <functional>

  template <class T, class Container = std::deque<T>,
            class Compare = std::less<Container::value_type> >
  class stable_priority_queue {
  public:
    typedef Container::value_type value_type;
    typedef Container::size_type  size_type;
    typedef Container             container_type;

  protected:
    Container c;
    Compare comp;

  private:
    template<class InputIterator>
      void push(InputIterator first, InputIterator last)
        { for ( ; first != last; ++first ) push(*first); }

  public:
    explicit stable_priority_queue(const Compare& pp = Compare(),
                                   const Container& cc = Container())
      : comp(pp) { push(cc.begin(), cc.end()); }

    template <class InputIterator>
      stable_priority_queue(InputIterator first, InputIterator last,
                            const Compare& pp = Compare())
      : comp(pp) { push(first,last); }

    bool              empty() const { return c.empty(); }
    size_type         size()  const { return c.size(); }
    const value_type& top() const   { return c.back(); }
    void              pop()         { c.pop_back(); }

    void push(const value_type& x) {
      c.insert(std::lower_bound(c.begin(), c.end(), x, comp), x);
    }
  };

sample code (上記コードの直後に記述)

#include <iostream>
#include <deque>
#include <queue>
#include <list>

using namespace std;

struct command {
  char code;
  int priority;
  command(char c, int p) : code(c), priority(p) {}
};

inline bool operator<(const command& x, const command& y) {
  return x.priority < y.priority;
}

ostream& operator<<(ostream& strm, const command& c) {
  return strm << c.code << ':' << c.priority;
}

int main() {

  int i;

  cout << "input sequence:" << endl;
  for ( i = 0; i < 16; ++i ) cout << command(i/4+'A',i%4) << ' ';
  cout << endl;

  cout << "priority_queue(based on vector)" << endl;
  std::priority_queue<command> pq;
  for ( i = 0; i < 16; ++i ) pq.push(command(i/4+'A',i%4));
  while ( !pq.empty() ) {
    cout << pq.top() << ' ';
    pq.pop();
  }
  cout << endl;

  cout << "stable_priority_queue(using deque)" << endl;
  stable_priority_queue<command> spqd;
  for ( i = 0; i < 16; ++i ) spqd.push(command(i/4+'A',i%4));
  while ( !spqd.empty() ) {
    cout << spqd.top() << ' ';
    spqd.pop();
  }
  cout << endl;

  cout << "stable_priority_queue(using list)" << endl;
  stable_priority_queue<command, list<command> > spql;
  for ( i = 0; i < 16; ++i ) spql.push(command(i/4+'A',i%4));
  while ( !spql.empty() ) {
    cout << spql.top() << ' ';
    spql.pop();
  }
  cout << endl;

  return 0;
}

result

input sequence:
A:0 A:1 A:2 A:3 B:0 B:1 B:2 B:3 C:0 C:1 C:2 C:3 D:0 D:1 D:2 D:3
priority_queue(based on vector)
A:3 C:3 B:3 D:3 D:2 B:2 C:2 A:2 A:1 B:1 C:1 D:1 D:0 C:0 B:0 A:0
stable_priority_queue(using deque)
A:3 B:3 C:3 D:3 A:2 B:2 C:2 D:2 A:1 B:1 C:1 D:1 A:0 B:0 C:0 D:0
stable_priority_queue(using list)
A:3 B:3 C:3 D:3 A:2 B:2 C:2 D:2 A:1 B:1 C:1 D:1 A:0 B:0 C:0 D:0