/ #Lambda Powertools #Lambda 

【Lambda Powertools】パーテーションキーとソートキーが設定されたテーブルで冪等性を実装する

もくじ

LambdaPowertoolsの冪等性パート2です。

何がしたいのか?というとLambdaPowertoolsで冪等性を実装したい!そしてテーブルはできたら既存のやつを使いたい!

そしてその既存のテーブルはパーテーションキーとソートキーとTTLが既に設定さている。

という要件のときに実装したらどうなるのかしら?を調査的に実装してみます。

結論ですが、あまり有用な記事ではありません。

ぶっちゃけ既存テーブルを使わないで、新しくテーブル作るべきです。

では 作っていきましょう👻

今回は以前の記事をフォークして作ります。

なのでリポジトリの詳細はそちらをご覧いただければ幸いです!


まずLambdaPowertoolsはドキュメントやコードを読む限りソートキーに対応してません。

そこで実装方針ですがBasePersistenceLayerを拡張して要件にあうものを自作します。

具体的にはこんな感じ↓

import datetime
from typing import Any, Dict, Optional

import boto3
from aws_lambda_powertools.utilities.idempotency import BasePersistenceLayer
from aws_lambda_powertools.utilities.idempotency.exceptions import (
    IdempotencyItemAlreadyExistsError,
    IdempotencyItemNotFoundError,
)
from aws_lambda_powertools.utilities.idempotency.persistence.base import DataRecord
from botocore.config import Config


class CustomPersistenceLayer(BasePersistenceLayer):
    def __init__(
        self,
        table_name: str,
        key_attr: str = "id",
        sort_key_attr: str = "sk",  # ソートキーの名前
        sort_key_value: str = "dummy",  # ソートキーの値は定数にしたのでその値
        expiry_attr: str = "expiration",
        status_attr: str = "status",
        data_attr: str = "data",
        validation_key_attr: str = "validation",
        boto_config: Optional[Config] = None,
        boto3_session: Optional[boto3.session.Session] = None,
    ):
        boto_config = boto_config or Config()
        session = boto3_session or boto3.session.Session()
        self._ddb_resource = session.resource("dynamodb", config=boto_config)
        self.table_name = table_name
        self.table = self._ddb_resource.Table(self.table_name)
        self.key_attr = key_attr
        self.sort_key_attr = sort_key_attr
        self.sort_key_value = sort_key_value
        self.expiry_attr = expiry_attr
        self.status_attr = status_attr
        self.data_attr = data_attr
        self.validation_key_attr = validation_key_attr
        super(CustomPersistenceLayer, self).__init__()

    def _item_to_data_record(self, item: Dict[str, Any]) -> DataRecord:
        return DataRecord(
            idempotency_key=item[self.key_attr],
            status=item[self.status_attr],
            expiry_timestamp=item[self.expiry_attr],
            response_data=item.get(self.data_attr),
            payload_hash=item.get(self.validation_key_attr),
        )

    def _get_record(self, idempotency_key) -> DataRecord:
        response = self.table.get_item(
            Key={self.key_attr: idempotency_key, self.sort_key_attr: self.sort_key_value},  # 条件にソートキーを含めた
            ConsistentRead=True,
        )

        try:
            item = response["Item"]
        except KeyError:
            raise IdempotencyItemNotFoundError
        return self._item_to_data_record(item)

    def _put_record(self, data_record: DataRecord) -> None:
        item = {
            self.key_attr: data_record.idempotency_key,
            self.sort_key_attr: self.sort_key_value,  # 条件にソートキーを含めた
            self.expiry_attr: data_record.expiry_timestamp,
            self.status_attr: data_record.status,
        }

        if self.payload_validation_enabled:
            item[self.validation_key_attr] = data_record.payload_hash

        now = datetime.datetime.now()
        try:
            self.table.put_item(
                Item=item,
                ConditionExpression=f"attribute_not_exists({self.key_attr}) AND attribute_not_exists({self.sort_key_attr}) OR {self.expiry_attr} < :now",  # 条件にソートキーを含めた
                ExpressionAttributeValues={":now": int(now.timestamp())},
            )
        except self._ddb_resource.meta.client.exceptions.ConditionalCheckFailedException:
            raise IdempotencyItemAlreadyExistsError

    def _update_record(self, data_record: DataRecord):
        update_expression = "SET #response_data = :response_data, #expiry = :expiry, #status = :status"
        expression_attr_values = {
            ":expiry": data_record.expiry_timestamp,
            ":response_data": data_record.response_data,
            ":status": data_record.status,
        }
        expression_attr_names = {
            "#response_data": self.data_attr,
            "#expiry": self.expiry_attr,
            "#status": self.status_attr,
        }

        if self.payload_validation_enabled:
            update_expression += ", #validation_key = :validation_key"
            expression_attr_values[":validation_key"] = data_record.payload_hash
            expression_attr_names["#validation_key"] = self.validation_key_attr

        kwargs = {
            "Key": {
                self.key_attr: data_record.idempotency_key,
                self.sort_key_attr: self.sort_key_value,  # 条件にソートキーを含めた
            },
            "UpdateExpression": update_expression,
            "ExpressionAttributeValues": expression_attr_values,
            "ExpressionAttributeNames": expression_attr_names,
        }

        self.table.update_item(**kwargs)

    def _delete_record(self, data_record: DataRecord) -> None:
        self.table.delete_item(
            Key={self.key_attr: data_record.idempotency_key, self.sort_key_attr: self.sort_key_value},  # 条件にソートキーを含めた
        )

何やら長そうですがやっていることは_get_record,_put_record,_update_record,_delete_recordにboto3使ってdynamoに保存処理やらを書いているだけです。

まぁ、、、上のコードの大半は公式ドキュメントのサンプルをそのまんま使っているので偉そうに言えませんが(笑)

はい。

後はこれをhandlerにこんな感じで付与してあげます。

import os

from aws_lambda_powertools.utilities.idempotency import IdempotencyConfig, idempotent
from custom_persistence_layer import CustomPersistenceLayer

EXPIRES_AFTER_SECONDS = 3 * 60  # 冪等性の期限[秒]

persistence_layer = CustomPersistenceLayer(table_name=os.getenv("IDEMPOTENCY_STORE_TABLE_NAME"))

config = IdempotencyConfig(expires_after_seconds=EXPIRES_AFTER_SECONDS)


@idempotent(persistence_store=persistence_layer, config=config)
def handler(event, context):
    # ...略

最後にStackを意地悪な感じで書き換えます。

# ...略
class CdkLambdaPowertoolsIdempotencyStack(core.Stack):
    def __init__(self, scope: core.Construct, construct_id: str, **kwargs) -> None:
        # ...略
        table = Table(
            self,
            f"{APP_NAME}IdempotencyStore",
            partition_key=Attribute(name="id", type=AttributeType.STRING),
            sort_key=Attribute(name="sk", type=AttributeType.STRING),  # 不要なソートキーを意地悪で仕込む
            time_to_live_attribute="expiration",
            billing_mode=BillingMode.PAY_PER_REQUEST,
            removal_policy=core.RemovalPolicy.DESTROY,
        )
        # ...略

そしてdeploy!

動作確認

Lambdaを叩いてDynamoDBの中を覗いてみます。

# 関数名取得
$FUNCTION_NAME=$(aws lambda list-functions --output text --query 'Functions[?contains(FunctionName,`cdk-lambda-powertools-ide`)].FunctionName')

# aws-cli v2の場合
$ aws lambda invoke --function-name $FUNCTION_NAME --payload $(echo '{"hoge_id":1}' | base64) response.json
>{
>    "StatusCode": 200,
>    "ExecutedVersion": "$LATEST"
>}

# aws-cli v1の場合
$ aws lambda invoke --function-name $FUNCTION_NAME --payload '{"hoge_id":1}' response.json
>{
>    "StatusCode": 200,
>    "ExecutedVersion": "$LATEST"
>}

テーブル側の確認

確認にはNoSQL Workbenchを使ってみます。

ちゃんとsk付きで登録されることが確認できました!

とはいえ現状ではskがなんの役にも立たない状態です。

pkとskを分けたことに少しでも意味をもたせるなら、pk側に定数を入れて、今idに入っている値をskに入れて、

クエリー時にpkの定数で検索すると、対象Lambdaに対して冪等処理に使ったレコード抽出できる!的に使うべきですかね。(使う機会あるのか?)

まとめとか思ったこと

とりあえず期待通り動きましたが、CustomPersistenceLayerのコードを保守していくよりもテーブル新しく作った方が全体的にコストが低そうだなーと感じました。

とは言え、いざというときのために実装方法はある。ということが分かったので満足でした。

以上

今回のリポジトリはこちら

https://github.com/sisi100/cdk_lambda_powertools_idempotency_sort_key

Author

Sisii

インフラが好きなエンジニアぶってるなにか