SQSと連携したLambdaの多重起動対策をLambdaPowertoolsでやる
SQSはLambdaと非同期に接続します。
このときの注意点ですがLambdaが同じメッセージに対して重複して処理する可能性があるというとこです。
今回はこれをLambdaPowertoolsで防止しようぜ!って記事です。
ちなみに前回、前々回の
LambdaPowertoolsでSQSのバッチ処理を楽にやろうとLambdaの冪等性をLambdaPowertoolsで簡単に実装するの記事は全てこの記事のための布石です!!
では 作っていきましょう👻
プロジェクトを用意する
いつもの!
$ mkdir cdk_sqs_idempotency_lambda
$ cd cdk_sqs_idempotency_lambda
$ cdk init --language python
$ rm setup.py source.bat
# Lambdaのディレクトリも作っておく
$ mkdir lambda_app
依存パッケージ
aws-cdk.core==1.121.0
aws-cdk.aws-lambda==1.121.0
aws_cdk.aws_lambda_python==1.121.0
aws_cdk.aws_lambda_event_sources==1.121.0
aws_cdk.aws_sqs==1.121.0
aws_cdk.aws_dynamodb==1.121.0
インストール
$ python3 -m venv .venv
$ source .venv/bin/activate
$ pip install -r requirements.txt
Stackを用意する
DynamoDBにテーブルを作ってLambdaに読み書きできるよう権限を付与して、SQSのキューを作ってLambdaのトリガーに設定する感じです!
from aws_cdk import core
from aws_cdk.aws_dynamodb import Attribute, AttributeType, BillingMode, Table
from aws_cdk.aws_lambda import Runtime
from aws_cdk.aws_lambda_event_sources import SqsEventSource
from aws_cdk.aws_lambda_python import PythonFunction
from aws_cdk.aws_sqs import Queue
APP_NAME = "CdkSqsIdempotencyLambda"
class CdkSqsIdempotencyLambdaStack(core.Stack):
def __init__(self, scope: core.Construct, construct_id: str, **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
table = Table(
self,
f"{APP_NAME}IdempotencyStore",
partition_key=Attribute(name="id", type=AttributeType.STRING),
time_to_live_attribute="expiration",
removal_policy=core.RemovalPolicy.DESTROY,
)
queue = Queue(self, f"{APP_NAME}Queue")
lambda_ = PythonFunction(
self,
f"{APP_NAME}Lambda",
entry="lambda_app",
runtime=Runtime.PYTHON_3_8,
environment={"IDEMPOTENCY_STORE_TABLE_NAME": table.table_name},
)
lambda_.add_event_source(SqsEventSource(queue))
table.grant_read_write_data(lambda_)
アプリを書く
Lambdaのパッケージ一覧を作成
aws-lambda-powertools
アプリ本体は下記のように実装してみます
import json
import os
from aws_lambda_powertools.utilities.batch import sqs_batch_processor
from aws_lambda_powertools.utilities.idempotency import (
DynamoDBPersistenceLayer,
IdempotencyConfig,
idempotent_function,
)
persistence_layer = DynamoDBPersistenceLayer(table_name=os.getenv("IDEMPOTENCY_STORE_TABLE_NAME"))
config = IdempotencyConfig(event_key_jmespath="body")
@idempotent_function(data_keyword_argument="record", config=config, persistence_store=persistence_layer)
def record_handler(record):
print(f"メッセージ毎の処理するよ!{json.dumps(record['body'])}")
return record
@sqs_batch_processor(record_handler=record_handler)
def handler(event, context):
print(f"ハンドラーだよ!{json.dumps([x['body'] for x in event['Records']])}")
return {"statusCode": 200}
ほぼほぼ前回と前々回のコードをくっつけただけですね笑
違いは@idempotent_function
この部分で、前回は@idempotent
でした。
使い分けは@idempotent
はLambdaハンドラーにしか使えず、それ以外なら@idempotent_function
という感じです。
また@idempotent_function
を使う場合はdata_keyword_argument
で引数を教えてあげる必要があります。
また今回はrecord
にあるbody
に対して冪等な処理を行いたいので、IdempotencyConfig(event_key_jmespath="body")
としてます。
デプロイ
下記のコマンドでデプロイ!
cdk deploy
動作確認
CloudWatchのログをtailしておく。
$ FUNCTION_NAME=$(aws lambda list-functions --output text --query 'Functions[?contains(FunctionName,`CdkSqsIdempotencyLambda`)].FunctionName')
$ aws logs tail --follow /aws/lambda/$FUNCTION_NAME
Lambdaが一回も実行されていないと、ロググループが作らえないのでコマンドを叩くのはLambdaを実行した後です。
メッセージを送ってみる
前回同様、下記のようなファイルをまずつくります
次に下記コマンドを叩いてメッセージを5こ積みます。
$ SQS_URL=$(aws sqs list-queues --query 'QueueUrls[?contains(@, `CdkSqsIdempotencyLambdaStack`)]' --output text)
$ aws sqs send-message-batch --queue-url $SQS_URL --entries file://send-message-batch.json
ここでログのコマンドを叩いて動作を確認します。
# 1回目
START RequestId: 96b63c3d-78ee-5301-99cd-65921e09de38 Version: $LATEST
START RequestId: 3679d4fd-f381-5dad-9b4b-8d5295733918 Version: $LATEST
メッセージ毎の処理するよ!"hoge5"
メッセージ毎の処理するよ!"hoge1"
メッセージ毎の処理するよ!"hoge4"
メッセージ毎の処理するよ!"hoge3"
ハンドラーだよ!["hoge5", "hoge1", "hoge3"]
メッセージ毎の処理するよ!"hoge2"
END RequestId: 96b63c3d-78ee-5301-99cd-65921e09de38
REPORT RequestId: 96b63c3d-78ee-5301-99cd-65921e09de38 Duration: 906.68 ms Billed Duration: 907 ms Memory Size: 128 MB Max Memory Used: 81 MB Init Duration: 635.18 ms
ハンドラーだよ!["hoge4", "hoge2"]
END RequestId: 3679d4fd-f381-5dad-9b4b-8d5295733918
REPORT RequestId: 3679d4fd-f381-5dad-9b4b-8d5295733918 Duration: 851.40 ms Billed Duration: 852 ms Memory Size: 128 MB Max Memory Used: 81 MB Init Duration: 673.52 ms
ログからlambdaが2個同時に動いて、
1個めが["hoge5", "hoge1", "hoge3"]
を処理し
2個めが["hoge4", "hoge2"]
を処理したことが分かります!
メッセージ毎の処理するよ!"hogeX"
のログが5個でているので、関数record_handler
は5回動いています。
もう一度実行して、レコード毎の処理(record_handler
)がされなくなったことを確認します!
$ aws sqs send-message-batch --queue-url $SQS_URL --entries file://send-message-batch.json
2回目
START RequestId: 4e4b2e3e-6d7c-57e2-a404-77910a5ac453 Version: $LATEST
START RequestId: b3b81b2e-5c32-51ea-85ec-42813b109f19 Version: $LATEST
ハンドラーだよ!["hoge2"]
ハンドラーだよ!["hoge5"]
END RequestId: 4e4b2e3e-6d7c-57e2-a404-77910a5ac453
REPORT RequestId: 4e4b2e3e-6d7c-57e2-a404-77910a5ac453 Duration: 142.12 ms Billed Duration: 143 ms Memory Size: 128 MB Max Memory Used: 82 MB
END RequestId: b3b81b2e-5c32-51ea-85ec-42813b109f19
REPORT RequestId: b3b81b2e-5c32-51ea-85ec-42813b109f19 Duration: 150.82 ms Billed Duration: 151 ms Memory Size: 128 MB Max Memory Used: 81 MB
START RequestId: 821313b7-3553-52e9-b3c4-4a185276665c Version: $LATEST
ハンドラーだよ!["hoge4"]
END RequestId: 821313b7-3553-52e9-b3c4-4a185276665c
REPORT RequestId: 821313b7-3553-52e9-b3c4-4a185276665c Duration: 123.40 ms Billed Duration: 124 ms Memory Size: 128 MB Max Memory Used: 81 MB
START RequestId: 939c9e0e-5b21-549e-ab66-c7a13c033248 Version: $LATEST
ハンドラーだよ!["hoge1", "hoge3"]
END RequestId: 939c9e0e-5b21-549e-ab66-c7a13c033248
REPORT RequestId: 939c9e0e-5b21-549e-ab66-c7a13c033248 Duration: 872.90 ms Billed Duration: 873 ms Memory Size: 128 MB Max Memory Used: 81 MB Init Duration: 636.64 ms
メッセージ毎の処理するよ!"hogeX"
のログが一度もでていないので、関数record_handler
は動いてません!!(期待位通り!!)
ちなみにDBも見てみる
$ TABLE_NAME=$(aws dynamodb list-tables --output text --query 'TableNames[?contains(@,`CdkSqsIdempotencyLambda`)]')
$ aws dynamodb scan --table-name $TABLE_NAME
上記コマンドでも見れるのですが、ちょっと貼り付けたらわかりにくかったので、コンソールのスクショで(汗)
こんな感じ!
全部見えているところの文字が同じでなにもわからんですね、、、
idの全体を見るとこんな感じです↓
CdkSqsIdempotencyLambdaSt-CdkSqsIdempotencyLambdaL-KGWAQ24CjzMJ#381ed28c6aa0d11b9699e08a666380cf
ここまでがLambda名です→
CdkSqsIdempotencyLambdaSt-CdkSqsIdempotencyLambdaL-KGWAQ24CjzMJ
でこっちがハッシュ値ですね→381ed28c6aa0d11b9699e08a666380cf
下記なので対応するハッシュ値は"hoge3"
ですね。
$ echo -n '"hoge3"' | md5
> 381ed28c6aa0d11b9699e08a666380cf
なのでフォーマットは
Lambda名
#対応イベントのハッシュ値
です。
ここから分かるのは、フォーマットにpythonの関数名は無いということです。
つまり同じLambda内で@idempotent_function
を複数回使う場合、全く同じイベントでハッシュ値を作ってしまうと、後発の関数は動かないので注意が必要です。
以上ですー!
まとめ
非同期で動くLambdaはeventに対して1回以上動きますが、1回以上動くことあるので冪等性を実装する必要があります。
今回は代表的な非同期の動作であるSQSとLambdaの連携に、LambdaPowertoolsで冪等性を実装して動作確認してみました!
またDynamoDBに保存されたItemのidからフォーマットを調べ注意点を見つけました。