/ #cdk #Lambda Powertools 

Lambdaの冪等性をLambdaPowertoolsで簡単に実装する

もくじ

LambdaPowertoolsを使ってLambdaで冪等性を実装して、具体的な動作を確認してみます。

またLambdaPowertoolsで冪等性を実装するために、DynamoDBが必要なので、そのへんのStackはCDKで作ってみます。

具体的にはこんな感じ!

では 作っていきましょう👻

プロジェクトを用意する

このあたりは普段どおりなので、コマンドだけで流します

$ mkdir cdk_lambda_powertools_idempotency
$ cd cdk_lambda_powertools_idempotency
$ cdk init --language python
$ rm setup.py source.bat

必要なパッケージを書く

aws-cdk.core==1.91.0

aws-cdk.aws-lambda==1.91.0
aws_cdk.aws_lambda_python==1.91.0
aws_cdk.aws_dynamodb==1.91.0

Stackを用意する

from aws_cdk import core
from aws_cdk.aws_dynamodb import Attribute, AttributeType, BillingMode, Table
from aws_cdk.aws_lambda import Runtime
from aws_cdk.aws_lambda_python import PythonFunction

APP_NAME = "CdkLambdaPowertoolsIdempotency"


class CdkLambdaPowertoolsIdempotencyStack(core.Stack):
    def __init__(self, scope: core.Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        table = Table(
            self,
            f"{APP_NAME}IdempotencyStore",
            partition_key=Attribute(name="id", type=AttributeType.STRING),
            time_to_live_attribute="expiration",
            billing_mode=BillingMode.PAY_PER_REQUEST,
            removal_policy=core.RemovalPolicy.DESTROY,
        )

        lambda_ = PythonFunction(
            self,
            f"{APP_NAME}Lambda",
            entry="lambda_app",
            runtime=Runtime.PYTHON_3_8,
            environment={"IDEMPOTENCY_STORE_TABLE_NAME": table.table_name},
        )

        table.grant_read_write_data(lambda_)

Tableのpkとttlは、LambdaPowertoolsがデフォルトに合わせています。このへんは変数で変えることができますが一旦このまま進みます。

Lambdaを実装する

Lambdaのパッケージ一覧を作成

aws-lambda-powertools

アプリ本体を一旦、下記のように実装してみる

import json
import os
import time

from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer, IdempotencyConfig, idempotent

EXPIRES_AFTER_SECONDS = 3 * 60  # 冪等性の期限[秒]

persistence_layer = DynamoDBPersistenceLayer(table_name=os.getenv("IDEMPOTENCY_STORE_TABLE_NAME"))

config = IdempotencyConfig(expires_after_seconds=EXPIRES_AFTER_SECONDS)

@idempotent(persistence_store=persistence_layer, config=config)
def handler(event, context):

    print(f"Lambdaが動いたー!!!{json.dumps(event)}")

    return {
        "hoge_id": event["hoge_id"],
        "message": "success",
        "statusCode": 200,
    }

handlerが実行されると"Lambdaが動いたー!!!とプリントされる感じです!

このhandlerで確認したいことは、Lambdaのeventが同じだった場合、handler の内部は1回しか処理されないけれども出力はすべて一緒、ということです。

デプロイしてAWSで確認するのも手間なので、ここはローカルで確認してみます。

ローカルで冪等性を確認してみる

パッケージ一覧にテスト向けにパッケージ追加

...
aws_lambda_powertools
pytest
boto3
moto

テストを書いてみる

import json
import os
from decimal import Decimal

import boto3
import pytest
from moto import mock_dynamodb2


@pytest.fixture(scope="session", autouse=True)
def environ(request):
    os.environ["IDEMPOTENCY_STORE_TABLE_NAME"] = "hogehoge"


@pytest.fixture()
def mock_dynamodb_table(monkeypatch):
    mock_dynamodb2().start()
    mock_table_name = os.getenv("IDEMPOTENCY_STORE_TABLE_NAME")

    dynamodb = boto3.resource("dynamodb")
    dynamodb.create_table(
        TableName=mock_table_name,
        KeySchema=[{"AttributeName": "id", "KeyType": "HASH"},],
        AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"},],
        ProvisionedThroughput={"ReadCapacityUnits": 1, "WriteCapacityUnits": 1},
    )

    yield dynamodb.Table(mock_table_name)

    mock_dynamodb2().stop()


def test_idempotency(mock_dynamodb_table):
    import lambda_app.index as app

	print("同じイベントで重複起動させても1度しか処理されず、レスポンスはすべて同じであることを確認する")
	event = {"hoge_id": 1}
	response = app.handler(event, {})
	for _ in range(10):
		assert response == app.handler(event, {})

	print("イベントが違うと処理されることを確認する")
	app.handler({"hoge_id": 2}, {})
	app.handler({"hoge_id": 1, "fuga_id": 1}, {})
	items = mock_dynamodb_table.scan()["Items"]

	print("テーブルのscan結果")
	print(json.dumps(items, indent=2, default=lambda x: float(x) if isinstance(x, Decimal) else x,))

こんな感じ。

やってることはダミーな環境変数作ってDynamoDBのモックのテーブルを作って、そこでLambdaを実行している感じです。

目論見は、ここでprintがどんな感じで出力されるか観察してみることです。

では実行してみます。

$ pytest -s

# こんな感じで出力されました(↓)
同じイベントで重複起動させても1度しか処理されず、レスポンスはすべて同じであることを確認する
Lambdaが動いたー!!!{"hoge_id": 1}
イベントが違うと処理されることを確認する
Lambdaが動いたー!!!{"hoge_id": 2}
Lambdaが動いたー!!!{"hoge_id": 1, "fuga_id": 1}
テーブルのscan結果
[
  {
    "id": "test-func#64489a27721931340931efd8dce1296d",
    "expiration": 1630735129.0,
    "status": "COMPLETED",
    "data": "{\"hoge_id\": 1, \"message\": \"success\", \"statusCode\": 200}"
  },
  {
    "id": "test-func#d63445d37a16e5db046c1a8fc535afaa",
    "expiration": 1630735129.0,
    "status": "COMPLETED",
    "data": "{\"hoge_id\": 2, \"message\": \"success\", \"statusCode\": 200}"
  },
  {
    "id": "test-func#25d39be1a454519a4bb58c7b894ed88e",
    "expiration": 1630735129.0,
    "status": "COMPLETED",
    "data": "{\"hoge_id\": 1, \"message\": \"success\", \"statusCode\": 200}"
  }
]

出力を見るとまず{"hoge_id": 1}ですが、10回くらいテストで実行していますが、printは1回なのでhandlerのロジックが使われたのが1回であることが分かります。

またこのときの出力(response)もテストからすべて同じことが確認できました!!

次に異なるeventに対しては、それぞれprintが実行されているので、handlerのロジックが実行されていることが確認できます。

次にテーブルのscan結果を見てみます

  • id: pkです。あとで詳しく見ます
  • expiration: TTLですね
  • status: 関数の処理状況です
  • data: Lambdaのレスポンスです

idをもっと詳しく見てみますと、

Lambdaの関数名#eventのハッシュ値

というフォーマットになってます。

ちなみにtest-funcはLambdaの関数名が空の場合にデフォルト値としてこの名前になるみたいです(ソースコードを読んだ限りだと)

eventのハッシュ値はデフォルトはmd5です。(configを変えれば変更可能)

なので例えば{"hoge_id": 1}のハッシュ値は下記なので、、

$ echo -n '{"hoge_id": 1}' | md5
64489a27721931340931efd8dce1296d

該当するDynamoのItemは下記ということが分かります

  {
    "id": "test-func#64489a27721931340931efd8dce1296d",
    "expiration": 1630735129.0,
    "status": "COMPLETED",
    "data": "{\"hoge_id\": 1, \"message\": \"success\", \"statusCode\": 200}"
  }

ローカルでの動作確認2

上記の確認で全く同じeventに対しては、handler内のロジックが1回しか実行されないことが分かりました。

でもそれって範囲が広すぎて、実際にはeventのこの値にたいして冪等な挙動をしてほしい!ということがあると思います。

今回のテストだとhoge_idのみに対して冪等な挙動をしてほしい!というケースですね。

それはhandler側のconfigにevent_key_jmespathの引数にJMESPath形式の文字列を入れると指定できます

こんな感じ↓

...
# config = IdempotencyConfig(expires_after_seconds=EXPIRES_AFTER_SECONDS) # コメントアウト
config = IdempotencyConfig(event_key_jmespath="hoge_id", expires_after_seconds=EXPIRES_AFTER_SECONDS) # ←追記
...

ここでテスト実行してみる

$pytest -s

同じイベントで重複起動させても1度しか処理されず、レスポンスはすべて同じであることを確認する
Lambdaが動いたー!!!{"hoge_id": 1}
イベントが違うと処理されることを確認する
Lambdaが動いたー!!!{"hoge_id": 2}
テーブルのscan結果
[
  {
    "id": "test-func#c4ca4238a0b923820dcc509a6f75849b",
    "expiration": 1630736851.0,
    "status": "COMPLETED",
    "data": "{\"hoge_id\": 1, \"message\": \"success\", \"statusCode\": 200}"
  },
  {
    "id": "test-func#c81e728d9d4c2f636f067f89cc14862c",
    "expiration": 1630736851.0,
    "status": "COMPLETED",
    "data": "{\"hoge_id\": 2, \"message\": \"success\", \"statusCode\": 200}"
  }
]

前回の実行結果と比べるとLambdaが動いたー!!!{"hoge_id": 1, "fuga_id": 1}の出力がなくなったのが確認できました!!

Lambdaが失敗したときの挙動は?

公式サイトを読むとItemが削除されると書いてありますが、確認してみます!

https://awslabs.github.io/aws-lambda-powertools-python/latest/utilities/idempotency/#handling-exceptions

handlerのロジックを下記のように変えてみます

...
def handler(event, context):

    print(f"Lambdaが動いたー!!!{json.dumps(event)}")
	raise

テストも変えてみる

...
def test_idempotency(mock_dynamodb_table):
    import lambda_app.index as app

    try:
        event = {"hoge_id": 1}
        response = app.handler(event, {})
    except Exception:
        pass
    finally:
        print("テーブルのscan結果")
        items = mock_dynamodb_table.scan()["Items"]
        print(
            json.dumps(
                items, indent=2, default=lambda x: float(x) if isinstance(x, Decimal) else x,
            )
        )

実行してみる

$ pytest -s
Lambdaが動いたー!!!{"hoge_id": 1}
テーブルのscan結果
[]

はい。DynamoDBにItemがないことが確認できました。

AWSの環境でLambdaの処理中に別のLambdaが走ったときの挙動は?

これ気になりますよね

方針としてはLambdaのhandlerの中にsleep関数を埋め込んで無理やり処理を長くして、その間に同じeventをもう一度叩きます

まずLambdaデフォルトのtimeoutが3秒なので、これを長く(30秒くらい)します。

Stackをこんな感じに修正します

...
        lambda_ = PythonFunction(
            self,
            f"{APP_NAME}Lambda",
            entry="lambda_app",
            runtime=Runtime.PYTHON_3_8,
            environment={"IDEMPOTENCY_STORE_TABLE_NAME": table.table_name},
            timeout=core.Duration.seconds(30), # 追加
        )
...

handlerも修正

...
import time
...
def handler(event, context):

    print(f"Lambdaが動いたー!!!{json.dumps(event)}")
	time.sleep(20) # 20秒sleep
...

デプロイ

$ cdk deploy

Lambdaを実行してみる

# デプロイしたLambdaを探す
# 今回は名前が長すぎて`cdk-lambda-powertools-ide`で切れていた
$ aws lambda list-functions --output table --query 'Functions[?contains(FunctionName,`cdk-lambda-powertools-ide`)]'

# Lambda名を変数にいれる
$FUNCTION_NAME=$(aws lambda list-functions --output text --query 'Functions[?contains(FunctionName,`cdk-lambda-powertools-ide`)].FunctionName')

# Lambda実行①
$ aws lambda invoke --function-name $FUNCTION_NAME --payload $(echo '{"hoge_id":1}' | base64) response.json
{
    "StatusCode": 200,
    "ExecutedVersion": "$LATEST"
}

# Lambda実行② (別のターミナルで①の処理中に)
$ aws lambda invoke --function-name $FUNCTION_NAME --payload $(echo '{"hoge_id":1}' | base64) response.json
>{
    "StatusCode": 200,
    "FunctionError": "Unhandled",
    "ExecutedVersion": "$LATEST"
}

①の処理中に②を走らせると、②側のレスポンスは即座に返ってきます。

なので、同時実行させると、後発はエラーが返ってくるというのが答えでした!

ついでに、処理中のDynamoDBのテーブルの様子

始めは空っぽ

①が処理中 (この状態で同じeventでLambdaが実行されると"FunctionError": "Unhandled"になる)

①処理完了

Lambdaタイムアウトしたらどうなるの?

確認方法はcdkのLambdaのtimeoutを3秒くらいにしてLambdaを実行すればすぐに確認できます。

結果から言うとDynamoのItemがINPROGRESSで停止します。レコードの削除もされません(TTLでは削除される)

これは注意しないとだめです。。。

Lambdaのtimeoutは気をつけましょう、、、

まとめ

LambdaPowertoolsを使ってLambdaの冪等性を実装して動作確認しました!

同じでeventに対して、handlerのロジックは1度しか実行されないこと、レスポンスは同じことを確認しました。

Lambdaで例外が発生した場合はテーブルからItemが削除されることを確認しました。

Lambdaが処理中に同じeventを受信した場合、後発のLambdaにはエラーが返されることを確認しました。

Lambdaがtimeoutした場合はテーブルのItemのステータスが進行中で止まってしまうため注意が必要であることを確認しました。

今回のリポジトリはこちら

https://github.com/sisi100/cdk_lambda_powertools_idempotency

Author

Sisii

インフラが好きなエンジニアぶってるなにか