#appengine MapReduceを使ってみた
Google I/O 2010で発表されたApp Engine新機能のひとつが「AppEngine-MapReduce(Mapper API)」です。このブログでもいろいろ憶測に憶測を重ねてきましたが、ついにApp EngineでMapReduceサポート!!…なんですが、しかし発表されたセッション会場もいまいち盛り上がりに欠ける感じ^^;; 理由は後述しますが、ともあれ(jw)、やっとのことApp EngineでもMapReduceがサポートされたわけです。
リンク集
AppEngine-MapReduce(Mapper API)の概要
- Q&A
簡単さが取り柄!
ということで、本物MapReduceやHadoopと比べるといま一歩感がぬぐえない概要であるAppEngine-MapReduceですが、なんと言っても取り柄は「簡単!」なことです。実際私も試しましたが、ドキュメントの指示に従って数行のPythonコードを書くだけで使えました。以下に流れを示します。
AppEngine-MapReduceライブラリをチェックアウト、app.yamlを設定
まずはApp Engine SDK 1.3.4の開発環境を用意して、AppEngine-MapReduceライブラリをチェックアウトして自分のアプリのフォルダ以下に置きます。
svn checkout http://appengine-mapreduce.googlecode.com/svn/trunk/python/src/mapreduce
つづいてapp.yamlに以下のハンドラを追加します。これがMapReduceのジョブ管理ページとなります。
handlers: - url: /mapreduce(/.*)? script: mapreduce/main.py login: admin
Mapperを定義する
次にMapperを定義します。このMapperは、指定したカインドのすべてのエンティティについて呼び出されますので、個々のエンティティについて実施したい任意の処理を記述します。今回私は、以前作成したTask Queueによる並列処理デモ用に用意した10万件のエンティティ(ランダムなアルファベットが入っている)を使い、その中から「ABCD」というキーワードを見つけるたびにXMPP経由で私のGTalkクライアントにメッセージを送るMapperを作ることにしました。こんな感じ:
from google.appengine.ext import db from google.appengine.api import xmpp def process(entity): if entity.docText.find("ABCD") != -1: msg = "found: " + str(entity.docId) xmpp.send_message("<my gtalk address>@gmail.com", msg) class Doc(db.Model): docId = db.IntegerProperty() docText = db.StringProperty(multiline=True)
...Python超初心者なのでイマイチな部分があったらご指摘ください。このprocessメソッドが10万件あるDocエンティティの1個ずつ呼び出される仕組みです。このMapperを、mapreduce.yamlという設定ファイルにて登録しておき、アプリをクラウド環境にデプロイします。
mapreduce: - name: ABCD finder mapper: input_reader: mapreduce.input_readers.DatastoreInputReader handler: mapper.process params: - name: entity_kind default: mapper.Doc
たったこれだけです!10万件のデータをどんな単位で分割してどの程度のレートでタスクを投入するのか、すべてのタスクが正しく終了したか、ジョブの中断ははどうするか等は、すべてAppEngine-MapReduce側にお任せできるため、Task Queueで自前実装するときに比べてはるかに短いコードで済みます。EC2上でHadoopを使う場合に比べてもかなり敷居は低いのではないでしょうか。
使ってみて気づいた点
- すごく簡単に使える
- 上述のとおり、Python超初心者の私でも、数行書くだけでデプロイして使えました。この手軽さはすばらしいですね。
- reduceがない
- 単純にエンティティを削除するとかログ出力する等であれば問題ありませんが、例えばMapReduceで定番のワードカウント等を行うには自分で排他を考えて結果を集約する必要があります
- ジョブ管理ツールは便利
- これまでもcron用のジョブ管理ツールはほしいなあと思っていたので、このMapReduceに付属するツールはなかなかいい感じです。つまりは一晩中動かしておくcronと思って使えばいいのでしょうか。
- 管理用カインドをDatastoreに作成する
- このツールを動かすと、Datastore上には「MapreduceControl」、「MapreduceState」、「ShardState」というタスク制御用のカインドが自動的に作成されます。
- Blobstore対応は便利そうだ
- Blobstoreも最大2GBまでのファイルアップロードに対応しましたので、例えば巨大なログファイルをApp EngineにアップしてMapperで1行ごとに処理。。といった使い方が可能です。zipファイルを解凍して個々のファイルを処理する機能も用意されています。
遅い10万件のエンティティを全検索するジョブを2回投入しましたが、いずれも16分かかりました。自作のTask Queueデモでは1分もかかりません。。「速さを求めてはいない」とはいえ、ちょっとマイペース過ぎる感があります。Hadoopとは比較になりませんね。。ちなみに処理速度は自動調整されるのですが、Task Queue上の該当するキュー(default)の最大レートは自動的に5/sに設定されます。- ちょっとおまじないを追加したところ、10万件エンティティを1分30秒で処理できました。まあまあの速さです。
どいう感じです。ちょっと一般的なMapReduceのイメージ(大規模並列処理!数100ノード!)とは違う感じではありますが、例えばスキーママイグレーションにともなう全エンティティのデータ修正や、大規模ログファイルの一括処理といった身近な用途で活躍するツールだと思います。