LambdaPowertoolsでSQSのバッチ処理を楽にやろう
LambdaPowertoolsシリーズの第3弾SQSのバッチ処理で使ってみる
です
さて、これなにが嬉しいのでしょうか?
SQSとLambdaの話
SQSはキューで、Lambdaと接続できます。
Lambdaと接続すると、SQSにメッセージが積まれるとそれをトリガーにLambda実行されます。
このとき注意点があります。
それはSQSがLambdaをキックして起動するのはなく、LambdaがSQSにポーリングして、メッセージがあれば実行するふるまいをしているという点です。
つまり非同期実行!
(2021/09/23) Twitterでご質問頂いたので補足します!
SQSからLambdaは処理的に非同期です。ただし、Lambdaの呼び出し方は同期呼び出し
です(笑)
公式ドキュメントだとこの辺に書いてあります。
https://docs.aws.amazon.com/ja_jp/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-configure-lambda-function-trigger.html
Lambda はキューをポーリングし、キューメッセージを含むイベントで Lambda 関数を同期的に呼び出します。別のキューを指定して、デッドレターキューは、Lambda 関数が処理できないメッセージです。
そしてこのときの注意事項ですが、書いてありますが非同期呼び出しで使えるLambdaのデッドレターキューはSQSでは使えません。同期呼び出しだから!
なのでSQSとLambdaの接続は、処理的には非同期ですが、呼び出しは同期呼び出し
です!
非同期呼び出し
と同期呼び出し
の詳細は下記を参照くださいませ!
→ 同期呼び出し
→ 非同期呼び出し
そしてLambdaがSQSから取得するメッセージ数が1つとは限らないということです。
このLambdaがポーリングで取得するメッセージの最大数をバッチサイズと呼びます。
LambdaPowertoolsで嬉しいところ
SQSに接続したLambdaの実装で、、、
私の気持ちとしては、SQSの1つのメッセージに対してlambda関数を動かしたいです。
が実際の挙動は、Lambdaのeventには最大でバッチサイズ数分のメッセージが詰まったeventが来ます。
そのためLambda内ではまず、復数メッセージを処理するロジックを書いてから、1つづつのメッセージ向けの処理を書く必要がでてきます。
バッチサイズを1にするという方法もありますが、それでもListで受け取ったeventから1つを取り出す。的な無駄な処理をかく必要があります。
煩わしいですね、、、
この煩わしさを簡単に解消してくれるものがあります。
それが、LambdaPowertoolsです!!
嬉しいですねー
何がしたいの?
さて、上記の問題を解決するためのサンプルなコードをチュートリアル的にCDKでか作ります。
またその後動作確認をしてみます。。
今回つくるものはこんな感じ
では 作っていきましょう👻
プロジェクトを用意する
いつも通りなので、このへんはコマンドだけ
$ mkdir cdk_lambda_powertools_sqs_batch
$ cd cdk_lambda_powertools_sqs_batch
$ cdk init --language python
$ rm setup.py source.bat
# Lambdaのディレクトリも作っておく
$ mkdir lambda_app
必要なパッケージを書く
aws-cdk.core==1.121.0
aws-cdk.aws-lambda==1.121.0
aws_cdk.aws_lambda_python==1.121.0
aws_cdk.aws_lambda_event_sources==1.121.0
aws_cdk.aws_sqs==1.121.0
インストールする
$ python3 -m venv .venv
$ source .venv/bin/activate
$ pip install -r requirements.txt
Stackを用意する
LambdaとSQSを用意してい接続させます。
from aws_cdk import core
from aws_cdk.aws_lambda import Runtime
from aws_cdk.aws_lambda_event_sources import SqsEventSource
from aws_cdk.aws_lambda_python import PythonFunction
from aws_cdk.aws_sqs import Queue
APP_NAME = "CdkLambdaPowertoolsSqsBatch"
class CdkLambdaPowertoolsSqsBatchStack(core.Stack):
def __init__(self, scope: core.Construct, construct_id: str, **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
queue = Queue(self, f"{APP_NAME}Queue", visibility_timeout=core.Duration.seconds(60),)
lambda_ = PythonFunction(self, f"{APP_NAME}Lambda", entry="lambda_app", runtime=Runtime.PYTHON_3_8,)
lambda_.add_event_source(SqsEventSource(queue))
Lambdaを実装する
Lambdaのパッケージ一覧を作成
aws-lambda-powertools
アプリ本体は下記のように実装してみます
import json
from aws_lambda_powertools.utilities.batch import sqs_batch_processor
def record_handler(record):
print(f"メッセージ毎の処理するよ!{json.dumps(record)}")
return
@sqs_batch_processor(record_handler=record_handler)
def handler(event, context):
print(f"ハンドラーだよ!{json.dumps(event)}")
return {"statusCode": 200}
AWS環境へデプロイ
下記のコマンドでデプロイします!
$ cdk deploy
CloudWatchログを見る準備
今回はログの挙動を追いかけたいので、ターミナルでログをtailしておきます。
$ FUNCTION_NAME=$(aws lambda list-functions --output text --query 'Functions[?contains(FunctionName,`cdk-lambda-powertools-sqs`)].FunctionName')
# ログを表示しっぱなしにする(今後ログはここを見て確認する)
$ aws logs tail --follow /aws/lambda/$FUNCTION_NAME
キューを積んでみる
別のターミナルを開いて下記をおくってみる
$ SQS_URL=$(aws sqs list-queues --query 'QueueUrls[?contains(@, `cdk-lambda-powertools-sqs-batch`)]' --output text)
# 投げてみる
$ aws sqs send-message --queue-url $SQS_URL --message-body {"hoge":1}
{
"MD5OfMessageBody": "33ee9c49d619a5a12ada91a91d9a5e41",
"MessageId": "4d337d6d-8efa-4e94-b023-38af02da14ff"
}
細かいところは削除しましたが、ログにこんな感じで表示されたはず!
START RequestId: 7888908e-3243-594c-852b-c9205a36e309 Version: $LATEST
メッセージ毎の処理するよ!{"messageId": "...", "receiptHandle": "...", "body": "{hoge:1}", "attributes": {...}, "messageAttributes": {}, "md5OfBody": "...", "eventSource": "aws:sqs", "eventSourceARN": "...", "awsRegion": "..."}
ハンドラーだよ!{"Records": [{"messageId": "...", "receiptHandle": "...", "body": "{hoge:1}", "attributes": {...}, "messageAttributes": {}, "md5OfBody": "...", "eventSource": "aws:sqs", "eventSourceARN": "...", "awsRegion": "..."}]}
END RequestId: 7888908e-3243-594c-852b-c9205a36e309
REPORT RequestId: 7888908e-3243-594c-852b-c9205a36e309 Duration: 704.67 ms Billed Duration: 705 ms Memory Size: 128 MB Max Memory Used: 73 MB Init Duration: 587.58 ms
関数record_handler
動いた後に関数handler
が起動しました。
とりあえず動いていることが確認できました。
ただログが見にくいのでラムダをちょっと変更して、デプロイし直します。
...
def record_handler(record):
print(f"メッセージ毎の処理するよ!{json.dumps(record['body'])}")
return
@sqs_batch_processor(record_handler=record_handler)
def handler(event, context):
print(f"ハンドラーだよ!{json.dumps([x['body'] for x in event['Records']])}")
return {"statusCode": 200}
...
では送ってみましょう!5個くらい同時に!
まず下記のようなファイルを作ります
そして下記のコマンドで送信します!
$ aws sqs send-message-batch --queue-url $SQS_URL --entries file://send-message-batch.json
{
"Successful": [
{
"Id": "1",
"MessageId": "7f897b5a-ce12-4cf2-9885-1c129165d566",
"MD5OfMessageBody": "12e25328fc3add1667c385b7bb138c23"
},
{
"Id": "2",
"MessageId": "eeeaeaaf-f58e-427a-aab9-063a4cb6d324",
"MD5OfMessageBody": "8ad097560c67f274d4ed263a2bcd8f71"
},
{
"Id": "3",
"MessageId": "ad3c6b0c-6c7a-4a65-87f8-381423e59845",
"MD5OfMessageBody": "fa50e06a5f8fbf47d532f888dc7f265b"
},
{
"Id": "4",
"MessageId": "43a61a95-b904-43c4-9e69-1ec5257eb00f",
"MD5OfMessageBody": "4eaa8dace968a0a306fb7e6c9cba4abc"
},
{
"Id": "5",
"MessageId": "d3ca954e-4d93-41cc-badc-e588d4aa0d2c",
"MD5OfMessageBody": "08ff0679227447ba1e3d465795d2838c"
}
]
}
そしてログをみてみる、、、
START RequestId: 33309c99-a158-5bc3-9f0c-c537d10196c1 Version: $LATEST
START RequestId: b0843f10-f766-59a7-8ae4-66be87072a16 Version: $LATEST
START RequestId: 265d805f-1b5b-57dd-a674-d9f16b3ab93a Version: $LATEST
メッセージ毎の処理するよ!"hoge2"
メッセージ毎の処理するよ!"hoge3"
ハンドラーだよ!["hoge2", "hoge3"]
メッセージ毎の処理するよ!"hoge5"
ハンドラーだよ!["hoge5"]
メッセージ毎の処理するよ!"hoge1"
メッセージ毎の処理するよ!"hoge4"
ハンドラーだよ!["hoge1", "hoge4"]
END RequestId: b0843f10-f766-59a7-8ae4-66be87072a16
REPORT RequestId: b0843f10-f766-59a7-8ae4-66be87072a16 Duration: 32.84 ms Billed Duration: 33 ms Memory Size: 128 MMax Memory Used: 74 MB
END RequestId: 265d805f-1b5b-57dd-a674-d9f16b3ab93a
REPORT RequestId: 265d805f-1b5b-57dd-a674-d9f16b3ab93a Duration: 30.01 ms Billed Duration: 31 ms Memory Size: 128 MMax Memory Used: 74 MB
END RequestId: 33309c99-a158-5bc3-9f0c-c537d10196c1
REPORT RequestId: 33309c99-a158-5bc3-9f0c-c537d10196c1 Duration: 42.47 ms Billed Duration: 43 ms Memory Size: 128 MMax Memory Used: 74 MB
ログを見た感じですと、今回はラムダが3個同時に動いたようです。
初めのハンドラーだよ!["hoge2", "hoge3"]
から挙動を見てみます。
この場合、受信したイベントには"hoge2", "hoge3"
の2つのメッセージが含まれていました。
ラムダ関数の中では、"hoge2"
と"hoge3"
を処理した後に最後にhandler
内のロジックが動いていることが確認できます。
つまり取得したeventをhandler
内でゴニョゴニョ処理した後にメッセージ1つづつ処理する!的な処理はできない!ってことですね。
(やる方法もあります→このへん)
例外を出してみる
最後に例外処理をやってみます。
まずLambdaPowertools
を使わないで、例外処理をさせた場合に軽く触れます。
何も対策をしないでLambda内で例外を出すと、batchのeventすべてが失敗したことになります!
それを防止するためには、成功したらキューを明示的に削除する必要があります。
具体的なコードはこんな感じですかね
import boto3
def handler(event, context):
sqs = boto3.resource("sqs")
queue = sqs.get_queue_by_name(QueueName="キュー名")
for record in event['Records']:
# ...何かしらの処理
queue.delete_messages(ReceiptHandle=record['receiptHandle']) # 明示的に削除
煩わしいですねー、、、
そしてLambdaPowertools
はどうかを見てみます。
Lambda関数内をちょっと修正します
...
def record_handler(record):
print(f"メッセージ毎の処理するよ!{json.dumps(record['body'])}")
if int(record["body"].replace("hoge", "")) % 2:
# 奇数なら例外を出す
print(f"例外だ!!{json.dumps(record['body'])}")
raise
return
...
何をしているかですが、hogeX
のXの値が奇数だったら例外を出すという感じです!
さらに、さきほどはLambdaが3個同時に動きメッセージがバラけてしまったので、Lambdaの同時実行数を1個にしてかつ10秒間のロングポーリングにしてみます。
...
class CdkLambdaPowertoolsSqsBatchStack(core.Stack):
def __init__(self, scope: core.Construct, construct_id: str, **kwargs) -> None:
...
lambda_ = PythonFunction(
self, f"{APP_NAME}Lambda", entry="lambda_app", runtime=Runtime.PYTHON_3_8, reserved_concurrent_executions=1
)
...
lambda_.add_event_source(SqsEventSource(queue, max_batching_window=core.Duration.seconds(10)))
...
デプロイします。
これ注意点ですが、Lambdaが失敗して可視性タイムアウトを過ぎると再びメッセージが復活してLambdaが起動する、、、
をメッセージ保持期間中繰り返すので動作確認したら手動でクリアするのを忘れないでください!!
今回の動作確認だと、偶数は正常処理されるけれども、奇数のメッセージは繰り返される感じです
ではメッセージ積んでみます。
$ aws sqs send-message-batch --queue-url $SQS_URL --entries file://send-message-batch.json
可視性タイムアウトは1分に設定しているので、コマンド叩いてから5分くらいコーヒーブレイクしましょう。
ある程度ログが溜まったところでクリア
CLIから簡単にやる方法がわからないのでSQSのコンソールから、キュー名をクリックしてクリアボタンおしてください。
具体例)東京リージョンの場合は下記にアクセスして、、
https://ap-northeast-1.console.aws.amazon.com/sqs
対象のSQS名をクリックして、下記のクリアをクリックすればOK
ログはこんな感じ。
(見にくいので、例外で発生するTracebackのログは削除しました)
# 1回目
START RequestId: 013a4ed3-9827-5eca-8b82-177d472c3198 Version: $LATEST
メッセージ毎の処理するよ!"hoge4"
ハンドラーだよ!["hoge4"]
END RequestId: 013a4ed3-9827-5eca-8b82-177d472c3198
REPORT RequestId: 013a4ed3-9827-5eca-8b82-177d472c3198 Duration: 636.97 ms Billed Duration: 637 ms Memory Size: 128 MMax Memory Used: 74 MB Init Duration: 477.22 ms
START RequestId: 97ab1936-48cf-5da4-b5aa-409897c4cb32 Version: $LATEST
メッセージ毎の処理するよ!"hoge3"
例外だ!!"hoge3"
メッセージ毎の処理するよ!"hoge1"
例外だ!!"hoge1"
[ERROR] SQSBatchProcessingError: Not all records processed successfully. 2 individual errors logged separately below.
END RequestId: 97ab1936-48cf-5da4-b5aa-409897c4cb32
REPORT RequestId: 97ab1936-48cf-5da4-b5aa-409897c4cb32 Duration: 28.99 ms Billed Duration: 29 ms Memory Size: 128 MMax Memory Used: 74 MB
START RequestId: 24d0c076-c2e8-59d0-a05b-99d81de3b66f Version: $LATEST
メッセージ毎の処理するよ!"hoge2"
メッセージ毎の処理するよ!"hoge5"
例外だ!!"hoge5"
[ERROR] SQSBatchProcessingError: Not all records processed successfully. 1 individual errors logged separately below.
END RequestId: 24d0c076-c2e8-59d0-a05b-99d81de3b66f
REPORT RequestId: 24d0c076-c2e8-59d0-a05b-99d81de3b66f Duration: 270.02 ms Billed Duration: 271 ms Memory Size: 128 MMax Memory Used: 76 MB
# 2回目
START RequestId: 5cd1aa40-b493-57b3-9c5d-f3ee6bd4507b Version: $LATEST
メッセージ毎の処理するよ!"hoge5"
例外だ!!"hoge5"
[ERROR] SQSBatchProcessingError: Not all records processed successfully. 1 individual errors logged separately below.
END RequestId: 5cd1aa40-b493-57b3-9c5d-f3ee6bd4507b
REPORT RequestId: 5cd1aa40-b493-57b3-9c5d-f3ee6bd4507b Duration: 36.63 ms Billed Duration: 37 ms Memory Size: 128 MMax Memory Used: 76 MB
START RequestId: a59c9929-8c6c-5aef-b7f9-c08753249eb4 Version: $LATEST
メッセージ毎の処理するよ!"hoge1"
例外だ!!"hoge1"
[ERROR] SQSBatchProcessingError: Not all records processed successfully. 1 individual errors logged separately below.
ND RequestId: a59c9929-8c6c-5aef-b7f9-c08753249eb4
REPORT RequestId: a59c9929-8c6c-5aef-b7f9-c08753249eb4 Duration: 25.68 ms Billed Duration: 26 ms Memory Size: 128 MMax Memory Used: 76 MB
START RequestId: dc928b7f-b4dd-51b2-831d-d2d16be74724 Version: $LATEST
メッセージ毎の処理するよ!"hoge3"
例外だ!!"hoge3"
[ERROR] SQSBatchProcessingError: Not all records processed successfully. 1 individual errors logged separately below.
END RequestId: dc928b7f-b4dd-51b2-831d-d2d16be74724
REPORT RequestId: dc928b7f-b4dd-51b2-831d-d2d16be74724 Duration: 18.38 ms Billed Duration: 19 ms Memory Size: 128 MMax Memory Used: 76 MB
# 3回目
START RequestId: 817d2bfc-d235-5888-b622-72520d4127da Version: $LATEST
メッセージ毎の処理するよ!"hoge3"
例外だ!!"hoge3"
[ERROR] SQSBatchProcessingError: Not all records processed successfully. 1 individual errors logged separately below.
ND RequestId: 817d2bfc-d235-5888-b622-72520d4127da
REPORT RequestId: 817d2bfc-d235-5888-b622-72520d4127da Duration: 33.46 ms Billed Duration: 34 ms Memory Size: 128 MMax Memory Used: 77 MB
START RequestId: 567baa35-9d7a-5a87-ac73-f39b42096d4f Version: $LATEST
メッセージ毎の処理するよ!"hoge5"
例外だ!!"hoge5"
メッセージ毎の処理するよ!"hoge1"
例外だ!!"hoge1"
[ERROR] SQSBatchProcessingError: Not all records processed successfully. 2 individual errors logged separately below.
END RequestId: 567baa35-9d7a-5a87-ac73-f39b42096d4f
REPORT RequestId: 567baa35-9d7a-5a87-ac73-f39b42096d4f Duration: 73.74 ms Billed Duration: 74 ms Memory Size: 128 MMax Memory Used: 77 MB
ログを見ると5分コーヒーブレイクとかいっておきながら、2分しかログ集めてないですね。
あまりいいログがでてにないなーと思いつつ、1回目の下記のログがましですかね、、、
START RequestId: 24d0c076-c2e8-59d0-a05b-99d81de3b66f Version: $LATEST
メッセージ毎の処理するよ!"hoge2"
メッセージ毎の処理するよ!"hoge5"
例外だ!!"hoge5"
[ERROR] SQSBatchProcessingError: Not all records processed successfully. 1 individual errors logged separately below.
END RequestId: 24d0c076-c2e8-59d0-a05b-99d81de3b66f
REPORT RequestId: 24d0c076-c2e8-59d0-a05b-99d81de3b66f Duration: 270.02 ms Billed Duration: 271 ms Memory Size: 128 MMax Memory Used: 76 MB
ログの流れ的に(Lambdaが1個なので上から順に時系列)これはhoge2
とhoge5
の2つのメッセージをeventで受け取ったはずです。
が、2回目ではhoge2
は繰り返されておらず、hoge5
は繰り返されています。
このふるまいから、関数record_handlr
内で例外が発生した場合も、正常に処理されているメッセージはキューから削除されていることが確認できました!
そしてもちろん例外が発生してメッセージは再度処理される。
まとめ
LambdaPowertools
を使うと何が嬉しいか?といいうことを確認した後に、CDKを使って環境を用意してふるまいを観察しました。
またLambdaPowertools
では特別な処理を書く必要なく、例外が発生した場合もいい感じに処理をしてくれることを確認しました。