/ #Lambda Powertools #sqs 

LambdaPowertoolsでSQSのバッチ処理を楽にやろう

もくじ

LambdaPowertoolsシリーズの第3弾SQSのバッチ処理で使ってみるです

さて、これなにが嬉しいのでしょうか?

SQSとLambdaの話

SQSはキューで、Lambdaと接続できます。

Lambdaと接続すると、SQSにメッセージが積まれるとそれをトリガーにLambda実行されます。

このとき注意点があります。

それはSQSがLambdaをキックして起動するのはなく、LambdaがSQSにポーリングして、メッセージがあれば実行するふるまいをしているという点です。

つまり非同期実行!


(2021/09/23) Twitterでご質問頂いたので補足します!

SQSからLambdaは処理的に非同期です。ただし、Lambdaの呼び出し方は同期呼び出しです(笑)

公式ドキュメントだとこの辺に書いてあります。

https://docs.aws.amazon.com/ja_jp/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-configure-lambda-function-trigger.html
Lambda はキューをポーリングし、キューメッセージを含むイベントで Lambda 関数を同期的に呼び出します。別のキューを指定して、デッドレターキューは、Lambda 関数が処理できないメッセージです。

そしてこのときの注意事項ですが、書いてありますが非同期呼び出しで使えるLambdaのデッドレターキューはSQSでは使えません。同期呼び出しだから!

なのでSQSとLambdaの接続は、処理的には非同期ですが、呼び出しは同期呼び出しです!

非同期呼び出し同期呼び出しの詳細は下記を参照くださいませ!

同期呼び出し

非同期呼び出し


そしてLambdaがSQSから取得するメッセージ数が1つとは限らないということです。

このLambdaがポーリングで取得するメッセージの最大数をバッチサイズと呼びます。

LambdaPowertoolsで嬉しいところ

SQSに接続したLambdaの実装で、、、

私の気持ちとしては、SQSの1つのメッセージに対してlambda関数を動かしたいです。

が実際の挙動は、Lambdaのeventには最大でバッチサイズ数分のメッセージが詰まったeventが来ます。

そのためLambda内ではまず、復数メッセージを処理するロジックを書いてから、1つづつのメッセージ向けの処理を書く必要がでてきます。

バッチサイズを1にするという方法もありますが、それでもListで受け取ったeventから1つを取り出す。的な無駄な処理をかく必要があります。

煩わしいですね、、、

この煩わしさを簡単に解消してくれるものがあります。

それが、LambdaPowertoolsです!!

嬉しいですねー

何がしたいの?

さて、上記の問題を解決するためのサンプルなコードをチュートリアル的にCDKでか作ります。

またその後動作確認をしてみます。。

今回つくるものはこんな感じ

では 作っていきましょう👻

プロジェクトを用意する

いつも通りなので、このへんはコマンドだけ

$ mkdir cdk_lambda_powertools_sqs_batch
$ cd cdk_lambda_powertools_sqs_batch
$ cdk init --language python
$ rm setup.py source.bat

# Lambdaのディレクトリも作っておく
$ mkdir lambda_app

必要なパッケージを書く

aws-cdk.core==1.121.0

aws-cdk.aws-lambda==1.121.0
aws_cdk.aws_lambda_python==1.121.0
aws_cdk.aws_lambda_event_sources==1.121.0
aws_cdk.aws_sqs==1.121.0

インストールする

$ python3 -m venv .venv
$ source .venv/bin/activate
$ pip install -r requirements.txt

Stackを用意する

LambdaとSQSを用意してい接続させます。

from aws_cdk import core
from aws_cdk.aws_lambda import Runtime
from aws_cdk.aws_lambda_event_sources import SqsEventSource
from aws_cdk.aws_lambda_python import PythonFunction
from aws_cdk.aws_sqs import Queue

APP_NAME = "CdkLambdaPowertoolsSqsBatch"


class CdkLambdaPowertoolsSqsBatchStack(core.Stack):
    def __init__(self, scope: core.Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        queue = Queue(self, f"{APP_NAME}Queue", visibility_timeout=core.Duration.seconds(60),)

        lambda_ = PythonFunction(self, f"{APP_NAME}Lambda", entry="lambda_app", runtime=Runtime.PYTHON_3_8,)

        lambda_.add_event_source(SqsEventSource(queue))

Lambdaを実装する

Lambdaのパッケージ一覧を作成

aws-lambda-powertools

アプリ本体は下記のように実装してみます

import json

from aws_lambda_powertools.utilities.batch import sqs_batch_processor


def record_handler(record):
    print(f"メッセージ毎の処理するよ!{json.dumps(record)}")
    return


@sqs_batch_processor(record_handler=record_handler)
def handler(event, context):
    print(f"ハンドラーだよ!{json.dumps(event)}")
    return {"statusCode": 200}

AWS環境へデプロイ

下記のコマンドでデプロイします!

$ cdk deploy

CloudWatchログを見る準備

今回はログの挙動を追いかけたいので、ターミナルでログをtailしておきます。

$ FUNCTION_NAME=$(aws lambda list-functions --output text --query 'Functions[?contains(FunctionName,`cdk-lambda-powertools-sqs`)].FunctionName')

# ログを表示しっぱなしにする(今後ログはここを見て確認する)
$ aws logs tail --follow /aws/lambda/$FUNCTION_NAME

キューを積んでみる

別のターミナルを開いて下記をおくってみる

$ SQS_URL=$(aws sqs list-queues --query 'QueueUrls[?contains(@, `cdk-lambda-powertools-sqs-batch`)]' --output text)

# 投げてみる
$ aws sqs send-message --queue-url $SQS_URL --message-body {"hoge":1}
{
    "MD5OfMessageBody": "33ee9c49d619a5a12ada91a91d9a5e41",
    "MessageId": "4d337d6d-8efa-4e94-b023-38af02da14ff"
}

細かいところは削除しましたが、ログにこんな感じで表示されたはず!

START RequestId: 7888908e-3243-594c-852b-c9205a36e309 Version: $LATEST
メッセージ毎の処理するよ!{"messageId": "...", "receiptHandle": "...", "body": "{hoge:1}", "attributes": {...}, "messageAttributes": {}, "md5OfBody": "...", "eventSource": "aws:sqs", "eventSourceARN": "...", "awsRegion": "..."}
ハンドラーだよ!{"Records": [{"messageId": "...", "receiptHandle": "...", "body": "{hoge:1}", "attributes": {...}, "messageAttributes": {}, "md5OfBody": "...", "eventSource": "aws:sqs", "eventSourceARN": "...", "awsRegion": "..."}]}
END RequestId: 7888908e-3243-594c-852b-c9205a36e309
REPORT RequestId: 7888908e-3243-594c-852b-c9205a36e309	Duration: 704.67 ms	Billed Duration: 705 ms	Memory Size: 128 MB	Max Memory Used: 73 MB	Init Duration: 587.58 ms

関数record_handler動いた後に関数handlerが起動しました。

とりあえず動いていることが確認できました。

ただログが見にくいのでラムダをちょっと変更して、デプロイし直します。

...
def record_handler(record):
    print(f"メッセージ毎の処理するよ!{json.dumps(record['body'])}")
    return


@sqs_batch_processor(record_handler=record_handler)
def handler(event, context):
    print(f"ハンドラーだよ!{json.dumps([x['body'] for x in event['Records']])}")
    return {"statusCode": 200}
...

では送ってみましょう!5個くらい同時に!

まず下記のようなファイルを作ります

[
  {
    "Id": "1",
    "MessageBody": "hoge1"
  },
  {
    "Id": "2",
    "MessageBody": "hoge2"
  },
  {
    "Id": "3",
    "MessageBody": "hoge3"
  },
  {
    "Id": "4",
    "MessageBody": "hoge4"
  },
  {
    "Id": "5",
    "MessageBody": "hoge5"
  }
]

そして下記のコマンドで送信します!

$ aws sqs send-message-batch --queue-url $SQS_URL --entries file://send-message-batch.json
{
    "Successful": [
        {
            "Id": "1",
            "MessageId": "7f897b5a-ce12-4cf2-9885-1c129165d566",
            "MD5OfMessageBody": "12e25328fc3add1667c385b7bb138c23"
        },
        {
            "Id": "2",
            "MessageId": "eeeaeaaf-f58e-427a-aab9-063a4cb6d324",
            "MD5OfMessageBody": "8ad097560c67f274d4ed263a2bcd8f71"
        },
        {
            "Id": "3",
            "MessageId": "ad3c6b0c-6c7a-4a65-87f8-381423e59845",
            "MD5OfMessageBody": "fa50e06a5f8fbf47d532f888dc7f265b"
        },
        {
            "Id": "4",
            "MessageId": "43a61a95-b904-43c4-9e69-1ec5257eb00f",
            "MD5OfMessageBody": "4eaa8dace968a0a306fb7e6c9cba4abc"
        },
        {
            "Id": "5",
            "MessageId": "d3ca954e-4d93-41cc-badc-e588d4aa0d2c",
            "MD5OfMessageBody": "08ff0679227447ba1e3d465795d2838c"
        }
    ]
}

そしてログをみてみる、、、

START RequestId: 33309c99-a158-5bc3-9f0c-c537d10196c1 Version: $LATEST
START RequestId: b0843f10-f766-59a7-8ae4-66be87072a16 Version: $LATEST
START RequestId: 265d805f-1b5b-57dd-a674-d9f16b3ab93a Version: $LATEST
メッセージ毎の処理するよ!"hoge2"
メッセージ毎の処理するよ!"hoge3"
ハンドラーだよ!["hoge2", "hoge3"]
メッセージ毎の処理するよ!"hoge5"
ハンドラーだよ!["hoge5"]
メッセージ毎の処理するよ!"hoge1"
メッセージ毎の処理するよ!"hoge4"
ハンドラーだよ!["hoge1", "hoge4"]
END RequestId: b0843f10-f766-59a7-8ae4-66be87072a16
REPORT RequestId: b0843f10-f766-59a7-8ae4-66be87072a16	Duration: 32.84 ms	Billed Duration: 33 ms	Memory Size: 128 MMax Memory Used: 74 MB
END RequestId: 265d805f-1b5b-57dd-a674-d9f16b3ab93a
REPORT RequestId: 265d805f-1b5b-57dd-a674-d9f16b3ab93a	Duration: 30.01 ms	Billed Duration: 31 ms	Memory Size: 128 MMax Memory Used: 74 MB
END RequestId: 33309c99-a158-5bc3-9f0c-c537d10196c1
REPORT RequestId: 33309c99-a158-5bc3-9f0c-c537d10196c1	Duration: 42.47 ms	Billed Duration: 43 ms	Memory Size: 128 MMax Memory Used: 74 MB

ログを見た感じですと、今回はラムダが3個同時に動いたようです。

初めのハンドラーだよ!["hoge2", "hoge3"]から挙動を見てみます。

この場合、受信したイベントには"hoge2", "hoge3"の2つのメッセージが含まれていました。

ラムダ関数の中では、"hoge2""hoge3"を処理した後に最後にhandler内のロジックが動いていることが確認できます。

つまり取得したeventをhandler内でゴニョゴニョ処理した後にメッセージ1つづつ処理する!的な処理はできない!ってことですね。

(やる方法もあります→このへん

例外を出してみる

最後に例外処理をやってみます。

まずLambdaPowertoolsを使わないで、例外処理をさせた場合に軽く触れます。

何も対策をしないでLambda内で例外を出すと、batchのeventすべてが失敗したことになります!

それを防止するためには、成功したらキューを明示的に削除する必要があります。

具体的なコードはこんな感じですかね

import boto3

def handler(event, context):
    sqs = boto3.resource("sqs")
    queue = sqs.get_queue_by_name(QueueName="キュー名")
    for record in event['Records']:

        # ...何かしらの処理

        queue.delete_messages(ReceiptHandle=record['receiptHandle'])  # 明示的に削除

煩わしいですねー、、、

そしてLambdaPowertoolsはどうかを見てみます。

Lambda関数内をちょっと修正します

...
def record_handler(record):
    print(f"メッセージ毎の処理するよ!{json.dumps(record['body'])}")
    if int(record["body"].replace("hoge", "")) % 2:
        # 奇数なら例外を出す
        print(f"例外だ!!{json.dumps(record['body'])}")
        raise
    return
...

何をしているかですが、hogeXのXの値が奇数だったら例外を出すという感じです!

さらに、さきほどはLambdaが3個同時に動きメッセージがバラけてしまったので、Lambdaの同時実行数を1個にしてかつ10秒間のロングポーリングにしてみます。

...
class CdkLambdaPowertoolsSqsBatchStack(core.Stack):
    def __init__(self, scope: core.Construct, construct_id: str, **kwargs) -> None:
		...
        lambda_ = PythonFunction(
            self, f"{APP_NAME}Lambda", entry="lambda_app", runtime=Runtime.PYTHON_3_8, reserved_concurrent_executions=1
        )
		...
        lambda_.add_event_source(SqsEventSource(queue, max_batching_window=core.Duration.seconds(10)))
		...

デプロイします。

これ注意点ですが、Lambdaが失敗して可視性タイムアウトを過ぎると再びメッセージが復活してLambdaが起動する、、、

をメッセージ保持期間中繰り返すので動作確認したら手動でクリアするのを忘れないでください!!

今回の動作確認だと、偶数は正常処理されるけれども、奇数のメッセージは繰り返される感じです

ではメッセージ積んでみます。

$ aws sqs send-message-batch --queue-url $SQS_URL --entries file://send-message-batch.json

可視性タイムアウトは1分に設定しているので、コマンド叩いてから5分くらいコーヒーブレイクしましょう。

ある程度ログが溜まったところでクリア

CLIから簡単にやる方法がわからないのでSQSのコンソールから、キュー名をクリックしてクリアボタンおしてください。

具体例)東京リージョンの場合は下記にアクセスして、、

https://ap-northeast-1.console.aws.amazon.com/sqs

対象のSQS名をクリックして、下記のクリアをクリックすればOK

ログはこんな感じ。

(見にくいので、例外で発生するTracebackのログは削除しました)


# 1回目

START RequestId: 013a4ed3-9827-5eca-8b82-177d472c3198 Version: $LATEST
メッセージ毎の処理するよ!"hoge4"
ハンドラーだよ!["hoge4"]
END RequestId: 013a4ed3-9827-5eca-8b82-177d472c3198
REPORT RequestId: 013a4ed3-9827-5eca-8b82-177d472c3198	Duration: 636.97 ms	Billed Duration: 637 ms	Memory Size: 128 MMax Memory Used: 74 MB	Init Duration: 477.22 ms

START RequestId: 97ab1936-48cf-5da4-b5aa-409897c4cb32 Version: $LATEST
メッセージ毎の処理するよ!"hoge3"
例外だ!!"hoge3"
メッセージ毎の処理するよ!"hoge1"
例外だ!!"hoge1"
[ERROR] SQSBatchProcessingError: Not all records processed successfully. 2 individual errors logged separately below.
END RequestId: 97ab1936-48cf-5da4-b5aa-409897c4cb32
REPORT RequestId: 97ab1936-48cf-5da4-b5aa-409897c4cb32	Duration: 28.99 ms	Billed Duration: 29 ms	Memory Size: 128 MMax Memory Used: 74 MB

START RequestId: 24d0c076-c2e8-59d0-a05b-99d81de3b66f Version: $LATEST
メッセージ毎の処理するよ!"hoge2"
メッセージ毎の処理するよ!"hoge5"
例外だ!!"hoge5"
[ERROR] SQSBatchProcessingError: Not all records processed successfully. 1 individual errors logged separately below.
END RequestId: 24d0c076-c2e8-59d0-a05b-99d81de3b66f
REPORT RequestId: 24d0c076-c2e8-59d0-a05b-99d81de3b66f	Duration: 270.02 ms	Billed Duration: 271 ms	Memory Size: 128 MMax Memory Used: 76 MB

# 2回目

START RequestId: 5cd1aa40-b493-57b3-9c5d-f3ee6bd4507b Version: $LATEST
メッセージ毎の処理するよ!"hoge5"
例外だ!!"hoge5"
[ERROR] SQSBatchProcessingError: Not all records processed successfully. 1 individual errors logged separately below.
END RequestId: 5cd1aa40-b493-57b3-9c5d-f3ee6bd4507b
REPORT RequestId: 5cd1aa40-b493-57b3-9c5d-f3ee6bd4507b	Duration: 36.63 ms	Billed Duration: 37 ms	Memory Size: 128 MMax Memory Used: 76 MB

START RequestId: a59c9929-8c6c-5aef-b7f9-c08753249eb4 Version: $LATEST
メッセージ毎の処理するよ!"hoge1"
例外だ!!"hoge1"
[ERROR] SQSBatchProcessingError: Not all records processed successfully. 1 individual errors logged separately below.
ND RequestId: a59c9929-8c6c-5aef-b7f9-c08753249eb4
REPORT RequestId: a59c9929-8c6c-5aef-b7f9-c08753249eb4	Duration: 25.68 ms	Billed Duration: 26 ms	Memory Size: 128 MMax Memory Used: 76 MB

START RequestId: dc928b7f-b4dd-51b2-831d-d2d16be74724 Version: $LATEST
メッセージ毎の処理するよ!"hoge3"
例外だ!!"hoge3"
[ERROR] SQSBatchProcessingError: Not all records processed successfully. 1 individual errors logged separately below.
END RequestId: dc928b7f-b4dd-51b2-831d-d2d16be74724
REPORT RequestId: dc928b7f-b4dd-51b2-831d-d2d16be74724	Duration: 18.38 ms	Billed Duration: 19 ms	Memory Size: 128 MMax Memory Used: 76 MB

# 3回目

START RequestId: 817d2bfc-d235-5888-b622-72520d4127da Version: $LATEST
メッセージ毎の処理するよ!"hoge3"
例外だ!!"hoge3"
[ERROR] SQSBatchProcessingError: Not all records processed successfully. 1 individual errors logged separately below.
ND RequestId: 817d2bfc-d235-5888-b622-72520d4127da
REPORT RequestId: 817d2bfc-d235-5888-b622-72520d4127da	Duration: 33.46 ms	Billed Duration: 34 ms	Memory Size: 128 MMax Memory Used: 77 MB

START RequestId: 567baa35-9d7a-5a87-ac73-f39b42096d4f Version: $LATEST
メッセージ毎の処理するよ!"hoge5"
例外だ!!"hoge5"
メッセージ毎の処理するよ!"hoge1"
例外だ!!"hoge1"
[ERROR] SQSBatchProcessingError: Not all records processed successfully. 2 individual errors logged separately below.
END RequestId: 567baa35-9d7a-5a87-ac73-f39b42096d4f
REPORT RequestId: 567baa35-9d7a-5a87-ac73-f39b42096d4f	Duration: 73.74 ms	Billed Duration: 74 ms	Memory Size: 128 MMax Memory Used: 77 MB

ログを見ると5分コーヒーブレイクとかいっておきながら、2分しかログ集めてないですね。

あまりいいログがでてにないなーと思いつつ、1回目の下記のログがましですかね、、、

START RequestId: 24d0c076-c2e8-59d0-a05b-99d81de3b66f Version: $LATEST
メッセージ毎の処理するよ!"hoge2"
メッセージ毎の処理するよ!"hoge5"
例外だ!!"hoge5"
[ERROR] SQSBatchProcessingError: Not all records processed successfully. 1 individual errors logged separately below.
END RequestId: 24d0c076-c2e8-59d0-a05b-99d81de3b66f
REPORT RequestId: 24d0c076-c2e8-59d0-a05b-99d81de3b66f	Duration: 270.02 ms	Billed Duration: 271 ms	Memory Size: 128 MMax Memory Used: 76 MB

ログの流れ的に(Lambdaが1個なので上から順に時系列)これはhoge2hoge5の2つのメッセージをeventで受け取ったはずです。

が、2回目ではhoge2は繰り返されておらず、hoge5は繰り返されています。

このふるまいから、関数record_handlr内で例外が発生した場合も、正常に処理されているメッセージはキューから削除されていることが確認できました!

そしてもちろん例外が発生してメッセージは再度処理される。

まとめ

LambdaPowertoolsを使うと何が嬉しいか?といいうことを確認した後に、CDKを使って環境を用意してふるまいを観察しました。

またLambdaPowertoolsでは特別な処理を書く必要なく、例外が発生した場合もいい感じに処理をしてくれることを確認しました。

今回のリポジトリはこちら

https://github.com/sisi100/cdk_lambda_powertools_sqs_batch

Author

Sisii

AWSが好きなフリーランスのエンジニア。主にAWSを使ったバックエンドの開発をしています😊最近Frontも始めました