/ #Lambda Powertools #sqs 

SQSと連携したLambdaの多重起動対策をLambdaPowertoolsでやる

もくじ

SQSはLambdaと非同期に接続します。

このときの注意点ですがLambdaが同じメッセージに対して重複して処理する可能性があるというとこです。

今回はこれをLambdaPowertoolsで防止しようぜ!って記事です。

ちなみに前回、前々回の

LambdaPowertoolsでSQSのバッチ処理を楽にやろうLambdaの冪等性をLambdaPowertoolsで簡単に実装するの記事は全てこの記事のための布石です!!

では 作っていきましょう👻

プロジェクトを用意する

いつもの!

$ mkdir cdk_sqs_idempotency_lambda
$ cd cdk_sqs_idempotency_lambda
$ cdk init --language python
$ rm setup.py source.bat

# Lambdaのディレクトリも作っておく
$ mkdir lambda_app

依存パッケージ

aws-cdk.core==1.121.0

aws-cdk.aws-lambda==1.121.0
aws_cdk.aws_lambda_python==1.121.0
aws_cdk.aws_lambda_event_sources==1.121.0
aws_cdk.aws_sqs==1.121.0
aws_cdk.aws_dynamodb==1.121.0

インストール

$ python3 -m venv .venv
$ source .venv/bin/activate
$ pip install -r requirements.txt

Stackを用意する

DynamoDBにテーブルを作ってLambdaに読み書きできるよう権限を付与して、SQSのキューを作ってLambdaのトリガーに設定する感じです!

from aws_cdk import core
from aws_cdk.aws_dynamodb import Attribute, AttributeType, BillingMode, Table
from aws_cdk.aws_lambda import Runtime
from aws_cdk.aws_lambda_event_sources import SqsEventSource
from aws_cdk.aws_lambda_python import PythonFunction
from aws_cdk.aws_sqs import Queue

APP_NAME = "CdkSqsIdempotencyLambda"


class CdkSqsIdempotencyLambdaStack(core.Stack):
    def __init__(self, scope: core.Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        table = Table(
            self,
            f"{APP_NAME}IdempotencyStore",
            partition_key=Attribute(name="id", type=AttributeType.STRING),
            time_to_live_attribute="expiration",
            removal_policy=core.RemovalPolicy.DESTROY,
        )

        queue = Queue(self, f"{APP_NAME}Queue")

        lambda_ = PythonFunction(
            self,
            f"{APP_NAME}Lambda",
            entry="lambda_app",
            runtime=Runtime.PYTHON_3_8,
            environment={"IDEMPOTENCY_STORE_TABLE_NAME": table.table_name},
        )

        lambda_.add_event_source(SqsEventSource(queue))

        table.grant_read_write_data(lambda_)

アプリを書く

Lambdaのパッケージ一覧を作成

aws-lambda-powertools

アプリ本体は下記のように実装してみます

import json
import os

from aws_lambda_powertools.utilities.batch import sqs_batch_processor
from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    IdempotencyConfig,
    idempotent_function,
)

persistence_layer = DynamoDBPersistenceLayer(table_name=os.getenv("IDEMPOTENCY_STORE_TABLE_NAME"))

config = IdempotencyConfig(event_key_jmespath="body")


@idempotent_function(data_keyword_argument="record", config=config, persistence_store=persistence_layer)
def record_handler(record):
    print(f"メッセージ毎の処理するよ!{json.dumps(record['body'])}")
    return record


@sqs_batch_processor(record_handler=record_handler)
def handler(event, context):
    print(f"ハンドラーだよ!{json.dumps([x['body'] for x in event['Records']])}")
    return {"statusCode": 200}

ほぼほぼ前回と前々回のコードをくっつけただけですね笑

違いは@idempotent_functionこの部分で、前回は@idempotentでした。

使い分けは@idempotentはLambdaハンドラーにしか使えず、それ以外なら@idempotent_functionという感じです。

また@idempotent_functionを使う場合はdata_keyword_argumentで引数を教えてあげる必要があります。

また今回はrecordにあるbodyに対して冪等な処理を行いたいので、IdempotencyConfig(event_key_jmespath="body")としてます。

デプロイ

下記のコマンドでデプロイ!

cdk deploy

動作確認

CloudWatchのログをtailしておく。

$ FUNCTION_NAME=$(aws lambda list-functions --output text --query 'Functions[?contains(FunctionName,`CdkSqsIdempotencyLambda`)].FunctionName')
$ aws logs tail --follow /aws/lambda/$FUNCTION_NAME

Lambdaが一回も実行されていないと、ロググループが作らえないのでコマンドを叩くのはLambdaを実行した後です。

メッセージを送ってみる

前回同様、下記のようなファイルをまずつくります

[
  {
    "Id": "1",
    "MessageBody": "hoge1"
  },
  {
    "Id": "2",
    "MessageBody": "hoge2"
  },
  {
    "Id": "3",
    "MessageBody": "hoge3"
  },
  {
    "Id": "4",
    "MessageBody": "hoge4"
  },
  {
    "Id": "5",
    "MessageBody": "hoge5"
  }
]

次に下記コマンドを叩いてメッセージを5こ積みます。

$ SQS_URL=$(aws sqs list-queues --query 'QueueUrls[?contains(@, `CdkSqsIdempotencyLambdaStack`)]' --output text)
$ aws sqs send-message-batch --queue-url $SQS_URL --entries file://send-message-batch.json

ここでログのコマンドを叩いて動作を確認します。

# 1回目

START RequestId: 96b63c3d-78ee-5301-99cd-65921e09de38 Version: $LATEST
START RequestId: 3679d4fd-f381-5dad-9b4b-8d5295733918 Version: $LATEST
メッセージ毎の処理するよ!"hoge5"
メッセージ毎の処理するよ!"hoge1"
メッセージ毎の処理するよ!"hoge4"
メッセージ毎の処理するよ!"hoge3"
ハンドラーだよ!["hoge5", "hoge1", "hoge3"]
メッセージ毎の処理するよ!"hoge2"
END RequestId: 96b63c3d-78ee-5301-99cd-65921e09de38
REPORT RequestId: 96b63c3d-78ee-5301-99cd-65921e09de38	Duration: 906.68 ms	Billed Duration: 907 ms	Memory Size: 128 MB	Max Memory Used: 81 MB	Init Duration: 635.18 ms
ハンドラーだよ!["hoge4", "hoge2"]
END RequestId: 3679d4fd-f381-5dad-9b4b-8d5295733918
REPORT RequestId: 3679d4fd-f381-5dad-9b4b-8d5295733918	Duration: 851.40 ms	Billed Duration: 852 ms	Memory Size: 128 MB	Max Memory Used: 81 MB	Init Duration: 673.52 ms

ログからlambdaが2個同時に動いて、

1個めが["hoge5", "hoge1", "hoge3"]を処理し

2個めが["hoge4", "hoge2"]を処理したことが分かります!

メッセージ毎の処理するよ!"hogeX"のログが5個でているので、関数record_handlerは5回動いています。

もう一度実行して、レコード毎の処理(record_handler)がされなくなったことを確認します!

$ aws sqs send-message-batch --queue-url $SQS_URL --entries file://send-message-batch.json

2回目

START RequestId: 4e4b2e3e-6d7c-57e2-a404-77910a5ac453 Version: $LATEST
START RequestId: b3b81b2e-5c32-51ea-85ec-42813b109f19 Version: $LATEST
ハンドラーだよ!["hoge2"]
ハンドラーだよ!["hoge5"]
END RequestId: 4e4b2e3e-6d7c-57e2-a404-77910a5ac453
REPORT RequestId: 4e4b2e3e-6d7c-57e2-a404-77910a5ac453	Duration: 142.12 ms	Billed Duration: 143 ms	Memory Size: 128 MB	Max Memory Used: 82 MB
END RequestId: b3b81b2e-5c32-51ea-85ec-42813b109f19
REPORT RequestId: b3b81b2e-5c32-51ea-85ec-42813b109f19	Duration: 150.82 ms	Billed Duration: 151 ms	Memory Size: 128 MB	Max Memory Used: 81 MB
START RequestId: 821313b7-3553-52e9-b3c4-4a185276665c Version: $LATEST
ハンドラーだよ!["hoge4"]
END RequestId: 821313b7-3553-52e9-b3c4-4a185276665c
REPORT RequestId: 821313b7-3553-52e9-b3c4-4a185276665c	Duration: 123.40 ms	Billed Duration: 124 ms	Memory Size: 128 MB	Max Memory Used: 81 MB
START RequestId: 939c9e0e-5b21-549e-ab66-c7a13c033248 Version: $LATEST
ハンドラーだよ!["hoge1", "hoge3"]
END RequestId: 939c9e0e-5b21-549e-ab66-c7a13c033248
REPORT RequestId: 939c9e0e-5b21-549e-ab66-c7a13c033248	Duration: 872.90 ms	Billed Duration: 873 ms	Memory Size: 128 MB	Max Memory Used: 81 MB	Init Duration: 636.64 ms

メッセージ毎の処理するよ!"hogeX"のログが一度もでていないので、関数record_handlerは動いてません!!(期待位通り!!)

ちなみにDBも見てみる

$ TABLE_NAME=$(aws dynamodb list-tables --output text --query 'TableNames[?contains(@,`CdkSqsIdempotencyLambda`)]')
$ aws dynamodb scan --table-name $TABLE_NAME

上記コマンドでも見れるのですが、ちょっと貼り付けたらわかりにくかったので、コンソールのスクショで(汗)

こんな感じ!

全部見えているところの文字が同じでなにもわからんですね、、、

idの全体を見るとこんな感じです↓

CdkSqsIdempotencyLambdaSt-CdkSqsIdempotencyLambdaL-KGWAQ24CjzMJ#381ed28c6aa0d11b9699e08a666380cf

ここまでがLambda名です→ CdkSqsIdempotencyLambdaSt-CdkSqsIdempotencyLambdaL-KGWAQ24CjzMJ

でこっちがハッシュ値ですね→381ed28c6aa0d11b9699e08a666380cf

下記なので対応するハッシュ値は"hoge3"ですね。

$ echo -n '"hoge3"' | md5
> 381ed28c6aa0d11b9699e08a666380cf

なのでフォーマットは

Lambda名#対応イベントのハッシュ値

です。

ここから分かるのは、フォーマットにpythonの関数名は無いということです。

つまり同じLambda内で@idempotent_functionを複数回使う場合、全く同じイベントでハッシュ値を作ってしまうと、後発の関数は動かないので注意が必要です。

以上ですー!

まとめ

非同期で動くLambdaはeventに対して1回以上動きますが、1回以上動くことあるので冪等性を実装する必要があります。

今回は代表的な非同期の動作であるSQSとLambdaの連携に、LambdaPowertoolsで冪等性を実装して動作確認してみました!

またDynamoDBに保存されたItemのidからフォーマットを調べ注意点を見つけました。

今回のリポジトリはこちら

https://github.com/sisi100/cdk_sqs_idempotency_lambda

Author

Sisii

インフラが好きなエンジニアぶってるなにか