Memcacheでスピンロックを実装してTask Queue処理結果を集約してみるテスト
TaskQueueで分散処理した結果をまとめるときは、排他を考慮する必要があります。Datastoreを使う場合なら、単に結果を新規エンティティとして追加したり、エンティティグループの楽観排他を使ったりすればOKです。一方、やっぱりMemcacheでスピーディーに集約したいよ、という場合は、Memcache上で排他を実装する必要があります。以前のエントリにちょろっと書いたMemcacheService#incrementでスピンロックという方法について説明してほしいというコメントをid:miztakaさんよりいただいたので、ここに改めて書きたいと思います。
Memcacheでは排他制御が必要
ご存じのとおり、App Engineはデフォルトで複数のApp Serverによるクラスタが構成されており、またMemcacheサービスはクラスタ全体で共有されるグローバルなキャッシュとして機能します。よって、TaskQueueのように複数のリクエストを大量に並列処理し、それらがMemcache上の1つのエントリに値を書き込む場合は、何らかの方法で排他制御を行わないと問題が生じます。
例えば単純なカウンターを実装する場合、
- Cache#get("counter")で現在の値を取得
- 1たす
- Cache#put("counter")で現在の値を書き込み
という一連の処理になります。RDBやDatastoreならこれ全体をトランザクションとすることで排他制御できますが、Memcacheにはそれがないのがやっかいです。2つのリクエストやタスクが上記コードをまったく同時に実行すると、どちらか一方のカウントが反映されず上書きされてしまいます。
MemcacheService#incrementでmutexを実装
ここで超役に立つ(というより唯一の頼みの綱)のが、MemcacheService#incrementメソッドです。MemcacheService#incrementは「キーで指定したMemcache上のLong値を読み取り、第2引数のLong値を加算し、Memcacheに書き込む」という操作をアトミックに実行します。よって、例えば2つのスレッドや2台のサーバーが、例えばincrement("counter", 1)をまったく同時に実行しても、それぞれの加算処理が必ず反映させる仕組みになっています(ちなみにこういうアトミックな演算ってたしかx86プロセッサの命令にもありましたよね)。
これを使って、以下のようなコードを書きました。ホントはコードをそのままコピペした方が分かりやすいのですが、そういうわけにも行かないので、大まかな流れだけ紹介します。
- acquireLockメソッド(引数key:ロックを識別する文字列。戻り値:trueならロック取得OK)
- MemcacheService#increment(key, 1)でlong値を取得
- 例外出たら0で初期化してfalseを返す
- l = 1ならtrueを返す
- l != 0ならMemcacheService#increment(key, -1)し、falseを返す
- MemcacheService#increment(key, 1)でlong値を取得
- releaseLockメソッド
- MemcacheService#increment(key, -1)
acquireLockメソッドは、任意のキーで名付けたロック(これってmutexに分類されるとおもいますたぶん)を取得してtrueを返します。誰かが同じキーのロックを取得済みならば、falseを返します。これを使って、以下のようなコードを書きます。
- aquireLock("counter_lock")でロック取得
- Cache#get("counter")で現在の値を取得
- 1たす
- Cache#put("counter")で現在の値を書き込み
- releaseLock("counter_lock")でロック取得
これで、上述の競合書き込みの問題はなくなります。
synchronized節っぽくして使いやすくする
とはいえ、このままではちょっと使いにくいです。
- ロック取得に失敗しても、何回かはリトライしたい
- ロック取得中に例外出たり、書き忘れたりしてreleaseLockを呼び忘れると永遠にロックされたままになりお客さまから怒られる
そこで、以下のようなメソッドを書きました。
- runSynchronizedメソッド(引数key:ロック識別用文字列、Runnable:ロック中に実行したいロジックを実装)
- acquireLockを呼び出し
- ロック取得に失敗したら、500msスリープして、リトライ。10回くらい失敗したらあきらめて例外をthrow
- Runnableのrunを呼び出し
- try/finallyで括って、finallyの中でreleaseLock呼び出し
- acquireLockを呼び出し
つまり、ロック取得に失敗しても、ループで10回くらいリトライするスピンロックです(OSカーネルの本物のスピンロックはスリープ挟まないと思うのでちょっと違いますが)。これを使うと、例えばカウンターの例は以下のように書けます。
runSynchronized("counter_lock", new Runnable() { public void run() { Cache#get("counter")で現在の値を取得 1たす Cache#put("counter")で現在の値を書き込み } });
こうして、めでたくMemcacheを複数タスクや複数リクエストで共有して書き込み可能になりました。いまのところ快調に動いているようです。