こんにちは、mikotoです。

前回、CloudSearchのドキュメントアップロード時の注意 で書きましたが、CloudSearchのバッチはアップロードレート(同時アップロード数とアップロード頻度)を制限しないと 5XX が発生します。

そのため、以下のような課題を抱えることになります。

  1. アップロードレートを制限するような処理をSparkを載せた高価なインスタンスで行うには勿体無い
  2. もともと投入したバッチファイルはS3に保存する仕様のため、S3のObjectCreatedイベントをハンドリングすることで、別の場所にバッチ投入を切り出したい

そこで今回は、これらの課題を解消する仕組みをlambdaを使って構築しましたので、その仕組みをご紹介したいと思います。

課題解決のため、実現したいこと

  • CloudSearchへのバッチアップロードレートを制御したい
  • バッチファイルの作成と投入を行うシステムを、疎結合化したい
  • バッチファイルの投入が失敗した場合、リトライしたい
  • バッチファイルの投入を行うサーバーを持ちたくない

これらの要件を満たす仕組みを考えました。

システム概要

pasted_image_at_2016_11_07_06_36_pm

行っていることは下記になります。

  1. CloudSearchに投入するバッチファイル(JSON)をS3にアップロードする
  2. S3のObjectCreatedイベントをハンドリングし、SQSにメッセージを貯める
  3. 一定時間毎にLambdaをキックし、SQSからメッセージを複数取得する(アップレードレート調整)
  4. 取得したメッセージそれぞれを投入するLambdaを実行する
  5. バッチの投入に成功したメッセージをSQSから削除する

手順

1、SQSの作成

メッセージを保存するSQSを作成します。
デフォルトの可視性タイムアウト をバッチの処理時間を考えて長目に設定しておくと良いでしょう。

2、S3のイベント設定

イベントが発火した際に、作成したSQSキューにメッセージが投入されるように設定を行います。
(バッチ投入先のS3バケットはあらかじめ作成しておいてください。)

うえ

3、Lambdaの作成

後で紹介するコードを参考にLambda Functionを作成します。
Trigger条件に、スケジュールドイベントによって定期実行する設定を行います。
した

最後に、注意点

バッチアップロードレートの制御

CloudSearchのバッチ処理速度は、スケールオプションの設定と投入するバッチの内容によって変わるため、 一度に処理するバッチファイル数バッチの実行頻度 を調整する必要があります。
逆に言えば、投入するドキュメントが増えた場合には、スケールオプションを変更し、再度上記2つのパラメータを変更することでシステムをスケールさせることができます。

調整方法としては、先に一度に処理するバッチファイル数の限界を実際に実行しながら調整し、その後Lambdaの最大実行時間に対して余裕が出るようにバッチの実行頻度を設定すると良いでしょう。

参考)Lambdaのコード

'use strict';

const AWS = require('aws-sdk');

const SQS = new AWS.SQS({ apiVersion: '2012-11-05' });
const Lambda = new AWS.Lambda({ apiVersion: '2015-03-31' });
const S3 = new AWS.S3({ apiVersion: '2006-03-01' });

const CLOUDSEARCH_ENDPOINT = 'ここにCloudSearchのエンドポイント';
const REGION = "ap-northeast-1"

const QUEUE_URL = 'https://sqs.ap-northeast-1.amazonaws.com/アカウントID/SQSキューの名前';
const PROCESS_MESSAGE = 'process-message';


function invokePoller(functionName, message) {
    console.log("invokePoller", message);
    const payload = {
        operation: PROCESS_MESSAGE,
        message: message
    };
    const params = {
        FunctionName: functionName,
        InvocationType: 'Event',
        Payload: new Buffer(JSON.stringify(payload)),
    };
    return new Promise((resolve, reject) => {
        Lambda.invoke(params, (err, data) => (err ? reject(err) : resolve(data)));
    });
}


function process(message, callback) {
    console.log(message);

    // define delete message params
    const deleteParams = {
        QueueUrl: QUEUE_URL,
        ReceiptHandle: message.ReceiptHandle,
    };

    // TODO process message
    const json = JSON.parse(message.Body);
    if (json.Records) {
        const bucket = json.Records[0].s3.bucket.name;
        const key = json.Records[0].s3.object.key

        if (key.endsWith("_SUCCESS")) {
            // JSONをバッチで投入して成功したときに_SUCCESSファイルが作成される仕様のため、ここではそれを無視する
            SQS.deleteMessage(deleteParams, (err) => callback(err, message));
        } else {
            const s3params = {
                Bucket: bucket,
                Key: key,
                ResponseContentType: "application/json"
            };
            S3.getObject(s3params, (err, data) => {
                if (err) {
                    console.log(err);
                    const message = `Error getting object ${key} from bucket ${bucket}. Make sure they exist and your bucket is in the same region as this function.`;
                    console.log(message);
                    callback("[Error]Loading from S3", message);
                } else {
                    var cloudsearchDomain = new AWS.CloudSearchDomain({
                        endpoint: CLOUDSEARCH_ENDPOINT,
                        region: REGION
                    });
                    var params = {
                        contentType: 'application/json',
                        documents: data.Body
                    };
                    cloudsearchDomain.uploadDocuments(params, function(err, data) {
                        if (err) {
                            console.log("[Error]Uploading to CloudSearch", err);
                            callback(err, message);
                        } else {
                            console.log("[Success]Uploading to CloudSearch", data);
                            SQS.deleteMessage(deleteParams, (err) => callback(err, data));
                        }
                    });
                }
            });
        }
    } else {
        SQS.deleteMessage(deleteParams, (err) => callback(err, message));
    }
}

function poll(functionName, callback) {
    const params = {
        QueueUrl: QUEUE_URL,
        MaxNumberOfMessages: 5,
        VisibilityTimeout: 10,
    };
    // batch request messages
    SQS.receiveMessage(params, (err, data) => {
        if (err) {
            return callback(err);
        }
        // for each message, reinvoke the function
        if (data.Messages) {
            const promises = data.Messages.map((message) => invokePoller(functionName, message));
            // complete when all invocations have been made
            Promise.all(promises).then((responses) => {
                console.log(responses);
                callback(null, responses);
            }).catch((e) => {
                console.log(e);
                callback(null, e);
            });
        } else {
            callback(null, "SQS is empty");
        }
    });
}

exports.handler = (event, context, callback) => {
    try {
        if (event.operation === PROCESS_MESSAGE) {
            // invoked by poller
            process(event.message, callback);
        } else {
            // invoked by schedule
            poll(context.functionName, callback);
        }
    } catch (err) {
        callback(err);
    }
};