【Lambda Powertools】パーテーションキーとソートキーが設定されたテーブルで冪等性を実装する
LambdaPowertoolsの冪等性パート2です。
何がしたいのか?というとLambdaPowertools
で冪等性を実装したい!そしてテーブルはできたら既存のやつを使いたい!
そしてその既存のテーブルはパーテーションキーとソートキーとTTLが既に設定さている。
という要件のときに実装したらどうなるのかしら?を調査的に実装してみます。
結論ですが、あまり有用な記事ではありません。
ぶっちゃけ既存テーブルを使わないで、新しくテーブル作るべきです。
では 作っていきましょう👻
今回は以前の記事をフォークして作ります。
なのでリポジトリの詳細はそちらをご覧いただければ幸いです!
まずLambdaPowertools
はドキュメントやコードを読む限りソートキーに対応してません。
そこで実装方針ですがBasePersistenceLayer
を拡張して要件にあうものを自作します。
具体的にはこんな感じ↓
import datetime
from typing import Any, Dict, Optional
import boto3
from aws_lambda_powertools.utilities.idempotency import BasePersistenceLayer
from aws_lambda_powertools.utilities.idempotency.exceptions import (
IdempotencyItemAlreadyExistsError,
IdempotencyItemNotFoundError,
)
from aws_lambda_powertools.utilities.idempotency.persistence.base import DataRecord
from botocore.config import Config
class CustomPersistenceLayer(BasePersistenceLayer):
def __init__(
self,
table_name: str,
key_attr: str = "id",
sort_key_attr: str = "sk", # ソートキーの名前
sort_key_value: str = "dummy", # ソートキーの値は定数にしたのでその値
expiry_attr: str = "expiration",
status_attr: str = "status",
data_attr: str = "data",
validation_key_attr: str = "validation",
boto_config: Optional[Config] = None,
boto3_session: Optional[boto3.session.Session] = None,
):
boto_config = boto_config or Config()
session = boto3_session or boto3.session.Session()
self._ddb_resource = session.resource("dynamodb", config=boto_config)
self.table_name = table_name
self.table = self._ddb_resource.Table(self.table_name)
self.key_attr = key_attr
self.sort_key_attr = sort_key_attr
self.sort_key_value = sort_key_value
self.expiry_attr = expiry_attr
self.status_attr = status_attr
self.data_attr = data_attr
self.validation_key_attr = validation_key_attr
super(CustomPersistenceLayer, self).__init__()
def _item_to_data_record(self, item: Dict[str, Any]) -> DataRecord:
return DataRecord(
idempotency_key=item[self.key_attr],
status=item[self.status_attr],
expiry_timestamp=item[self.expiry_attr],
response_data=item.get(self.data_attr),
payload_hash=item.get(self.validation_key_attr),
)
def _get_record(self, idempotency_key) -> DataRecord:
response = self.table.get_item(
Key={self.key_attr: idempotency_key, self.sort_key_attr: self.sort_key_value}, # 条件にソートキーを含めた
ConsistentRead=True,
)
try:
item = response["Item"]
except KeyError:
raise IdempotencyItemNotFoundError
return self._item_to_data_record(item)
def _put_record(self, data_record: DataRecord) -> None:
item = {
self.key_attr: data_record.idempotency_key,
self.sort_key_attr: self.sort_key_value, # 条件にソートキーを含めた
self.expiry_attr: data_record.expiry_timestamp,
self.status_attr: data_record.status,
}
if self.payload_validation_enabled:
item[self.validation_key_attr] = data_record.payload_hash
now = datetime.datetime.now()
try:
self.table.put_item(
Item=item,
ConditionExpression=f"attribute_not_exists({self.key_attr}) AND attribute_not_exists({self.sort_key_attr}) OR {self.expiry_attr} < :now", # 条件にソートキーを含めた
ExpressionAttributeValues={":now": int(now.timestamp())},
)
except self._ddb_resource.meta.client.exceptions.ConditionalCheckFailedException:
raise IdempotencyItemAlreadyExistsError
def _update_record(self, data_record: DataRecord):
update_expression = "SET #response_data = :response_data, #expiry = :expiry, #status = :status"
expression_attr_values = {
":expiry": data_record.expiry_timestamp,
":response_data": data_record.response_data,
":status": data_record.status,
}
expression_attr_names = {
"#response_data": self.data_attr,
"#expiry": self.expiry_attr,
"#status": self.status_attr,
}
if self.payload_validation_enabled:
update_expression += ", #validation_key = :validation_key"
expression_attr_values[":validation_key"] = data_record.payload_hash
expression_attr_names["#validation_key"] = self.validation_key_attr
kwargs = {
"Key": {
self.key_attr: data_record.idempotency_key,
self.sort_key_attr: self.sort_key_value, # 条件にソートキーを含めた
},
"UpdateExpression": update_expression,
"ExpressionAttributeValues": expression_attr_values,
"ExpressionAttributeNames": expression_attr_names,
}
self.table.update_item(**kwargs)
def _delete_record(self, data_record: DataRecord) -> None:
self.table.delete_item(
Key={self.key_attr: data_record.idempotency_key, self.sort_key_attr: self.sort_key_value}, # 条件にソートキーを含めた
)
何やら長そうですがやっていることは_get_record
,_put_record
,_update_record
,_delete_record
にboto3使ってdynamoに保存処理やらを書いているだけです。
まぁ、、、上のコードの大半は公式ドキュメントのサンプルをそのまんま使っているので偉そうに言えませんが(笑)
はい。
後はこれをhandlerにこんな感じで付与してあげます。
import os
from aws_lambda_powertools.utilities.idempotency import IdempotencyConfig, idempotent
from custom_persistence_layer import CustomPersistenceLayer
EXPIRES_AFTER_SECONDS = 3 * 60 # 冪等性の期限[秒]
persistence_layer = CustomPersistenceLayer(table_name=os.getenv("IDEMPOTENCY_STORE_TABLE_NAME"))
config = IdempotencyConfig(expires_after_seconds=EXPIRES_AFTER_SECONDS)
@idempotent(persistence_store=persistence_layer, config=config)
def handler(event, context):
# ...略
最後にStackを意地悪な感じで書き換えます。
# ...略
class CdkLambdaPowertoolsIdempotencyStack(core.Stack):
def __init__(self, scope: core.Construct, construct_id: str, **kwargs) -> None:
# ...略
table = Table(
self,
f"{APP_NAME}IdempotencyStore",
partition_key=Attribute(name="id", type=AttributeType.STRING),
sort_key=Attribute(name="sk", type=AttributeType.STRING), # 不要なソートキーを意地悪で仕込む
time_to_live_attribute="expiration",
billing_mode=BillingMode.PAY_PER_REQUEST,
removal_policy=core.RemovalPolicy.DESTROY,
)
# ...略
そしてdeploy!
動作確認
Lambdaを叩いてDynamoDBの中を覗いてみます。
# 関数名取得
$FUNCTION_NAME=$(aws lambda list-functions --output text --query 'Functions[?contains(FunctionName,`cdk-lambda-powertools-ide`)].FunctionName')
# aws-cli v2の場合
$ aws lambda invoke --function-name $FUNCTION_NAME --payload $(echo '{"hoge_id":1}' | base64) response.json
>{
> "StatusCode": 200,
> "ExecutedVersion": "$LATEST"
>}
# aws-cli v1の場合
$ aws lambda invoke --function-name $FUNCTION_NAME --payload '{"hoge_id":1}' response.json
>{
> "StatusCode": 200,
> "ExecutedVersion": "$LATEST"
>}
テーブル側の確認
確認にはNoSQL Workbench
を使ってみます。
ちゃんとsk付きで登録されることが確認できました!
とはいえ現状ではskがなんの役にも立たない状態です。
pkとskを分けたことに少しでも意味をもたせるなら、pk側に定数を入れて、今idに入っている値をskに入れて、
クエリー時にpkの定数で検索すると、対象Lambdaに対して冪等処理に使ったレコード抽出できる!的に使うべきですかね。(使う機会あるのか?)
まとめとか思ったこと
とりあえず期待通り動きましたが、CustomPersistenceLayer
のコードを保守していくよりもテーブル新しく作った方が全体的にコストが低そうだなーと感じました。
とは言え、いざというときのために実装方法はある。ということが分かったので満足でした。
以上
今回のリポジトリはこちら
https://github.com/sisi100/cdk_lambda_powertools_idempotency_sort_key