スティルハウスの書庫の書庫

はてなダイアリーで書いてた「スティルハウスの書庫」を移転してきました。

#appengine MapReduceを使ってみた

Google I/O 2010で発表されたApp Engine新機能のひとつが「AppEngine-MapReduce(Mapper API)」です。このブログでもいろいろ憶測に憶測を重ねてきましたが、ついにApp EngineでMapReduceサポート!!…なんですが、しかし発表されたセッション会場もいまいち盛り上がりに欠ける感じ^^;; 理由は後述しますが、ともあれ(jw)、やっとのことApp EngineでもMapReduceがサポートされたわけです。

AppEngine-MapReduce(Mapper API)の概要

  • App Engine上でMapReduceスタイルの並列処理を実行するためのオープンソースのライブラリ(=App Engineの標準機能ではない)
    • Datastore上の大量のエンティティおよびBlobstoreの大きなファイルを自動分割、並列処理
    • Task QueueベースでMapReduce処理を再現。処理速度を自動制限
    • ジョブ管理ページを提供
    • Python版のみ。Java版は近日提供予定(very very soon)
  • Q&A
    • Hadoopとどっちが速い?」「HadoopGoogleインフラで動かないから分からない。そもそもこのツールは速さを目指していない」
    • 「TQの50タスク/秒制限が足かせにならないか?」「1タスクで30秒までCPUを使えるから、それほど問題はないと思う」
  • イマイチな点
    • C++やSawzallで使える「本物」のMapReduceはApp Engineに提供できない
      • 「App Engineは収容するアプリ数が膨大すぎる。それらで並行してMRを利用するとスケーリングや隔離性の問題が生じる」
      • 「リソース消費を制限する必要もある」
    • Hadoopとの互換性なし
    • まだ「Map」しかない。。「Reduce」部分は現在鋭意開発中とのこと
    • 遅いぞ(下記参照)

簡単さが取り柄!

ということで、本物MapReduceHadoopと比べるといま一歩感がぬぐえない概要であるAppEngine-MapReduceですが、なんと言っても取り柄は「簡単!」なことです。実際私も試しましたが、ドキュメントの指示に従って数行のPythonコードを書くだけで使えました。以下に流れを示します。

AppEngine-MapReduceライブラリをチェックアウト、app.yamlを設定

まずはApp Engine SDK 1.3.4の開発環境を用意して、AppEngine-MapReduceライブラリをチェックアウトして自分のアプリのフォルダ以下に置きます。

svn checkout http://appengine-mapreduce.googlecode.com/svn/trunk/python/src/mapreduce

つづいてapp.yamlに以下のハンドラを追加します。これがMapReduceのジョブ管理ページとなります。

handlers:
- url: /mapreduce(/.*)?
  script: mapreduce/main.py
  login: admin
Mapperを定義する

次にMapperを定義します。このMapperは、指定したカインドのすべてのエンティティについて呼び出されますので、個々のエンティティについて実施したい任意の処理を記述します。今回私は、以前作成したTask Queueによる並列処理デモ用に用意した10万件のエンティティ(ランダムなアルファベットが入っている)を使い、その中から「ABCD」というキーワードを見つけるたびにXMPP経由で私のGTalkクライアントにメッセージを送るMapperを作ることにしました。こんな感じ:

from google.appengine.ext import db
from google.appengine.api import xmpp

def process(entity):
    if entity.docText.find("ABCD") != -1:
        msg = "found: " + str(entity.docId)
        xmpp.send_message("<my gtalk address>@gmail.com", msg)

class Doc(db.Model):
    docId = db.IntegerProperty()
    docText = db.StringProperty(multiline=True)

...Python超初心者なのでイマイチな部分があったらご指摘ください。このprocessメソッドが10万件あるDocエンティティの1個ずつ呼び出される仕組みです。このMapperを、mapreduce.yamlという設定ファイルにて登録しておき、アプリをクラウド環境にデプロイします。

mapreduce:
- name: ABCD finder 
  mapper:
    input_reader: mapreduce.input_readers.DatastoreInputReader
    handler: mapper.process 
    params:
    - name: entity_kind
      default: mapper.Doc 

たったこれだけです!10万件のデータをどんな単位で分割してどの程度のレートでタスクを投入するのか、すべてのタスクが正しく終了したか、ジョブの中断ははどうするか等は、すべてAppEngine-MapReduce側にお任せできるため、Task Queueで自前実装するときに比べてはるかに短いコードで済みます。EC2上でHadoopを使う場合に比べてもかなり敷居は低いのではないでしょうか。

ジョブ管理画面から実行/中断

「http://<appid>.appspot.com/mapreduce/」というURLをブラウザから開くと、MapReduceのジョブ管理画面が表示されます。ここから先ほど登録したMapperに対するジョブを起動したり中断できます。

また個々のジョブについて、シャード(分割単位)ごとの処理済みエンティティ数や、Mapper呼び出し回数等の状況を監視できます。

こんな感じでMapReduceジョブが動き始めると、"ABCD"という文字列が見つかるたびにGTalkクライアントにメッセージがぽつぽつ届きます。

使ってみて気づいた点

  • すごく簡単に使える
    • 上述のとおり、Python超初心者の私でも、数行書くだけでデプロイして使えました。この手軽さはすばらしいですね。
  • reduceがない
    • 単純にエンティティを削除するとかログ出力する等であれば問題ありませんが、例えばMapReduceで定番のワードカウント等を行うには自分で排他を考えて結果を集約する必要があります
  • ジョブ管理ツールは便利
    • これまでもcron用のジョブ管理ツールはほしいなあと思っていたので、このMapReduceに付属するツールはなかなかいい感じです。つまりは一晩中動かしておくcronと思って使えばいいのでしょうか。
  • 管理用カインドをDatastoreに作成する
    • このツールを動かすと、Datastore上には「MapreduceControl」、「MapreduceState」、「ShardState」というタスク制御用のカインドが自動的に作成されます。
  • Blobstore対応は便利そうだ
    • Blobstoreも最大2GBまでのファイルアップロードに対応しましたので、例えば巨大なログファイルをApp EngineにアップしてMapperで1行ごとに処理。。といった使い方が可能です。zipファイルを解凍して個々のファイルを処理する機能も用意されています。
  • 遅い
    • 10万件のエンティティを全検索するジョブを2回投入しましたが、いずれも16分かかりました。自作のTask Queueデモでは1分もかかりません。。「速さを求めてはいない」とはいえ、ちょっとマイペース過ぎる感があります。Hadoopとは比較になりませんね。。ちなみに処理速度は自動調整されるのですが、Task Queue上の該当するキュー(default)の最大レートは自動的に5/sに設定されます。
    • ちょっとおまじないを追加したところ、10万件エンティティを1分30秒で処理できました。まあまあの速さです。

どいう感じです。ちょっと一般的なMapReduceのイメージ(大規模並列処理!数100ノード!)とは違う感じではありますが、例えばスキーママイグレーションにともなう全エンティティのデータ修正や、大規模ログファイルの一括処理といった身近な用途で活躍するツールだと思います。