【CDK/Python】DynamoDB StreamのTTLで削除されるやつをS3へアーカイブする
DynamoDB内から今後アクセスされる機会のないアイテムはどんどん削除したい派の私です。
ですが要件としてはなにかに使うかもしれないから消すのはもったいない!
とうことも当然あると思いますので、今回はその場合にS3にアーカイブする仕組みをCDKで作って見ようと思います。
今回作るのはこんな感じです。
(構成の出力には https://github.com/pistazie/cdk-dia を使わせて頂いてます!感謝!)
ざっくり説明すると
Database
ConstructがS3にアーカイブする仕組みで、DemoApp
は動作確認ようの1時間に1アイテムをDBにプッシュするだけのLambdaです。
Database
の中はDynamoDBにStream用のLambdaが接続してあって、削除されるアイテムをキャッチしてkinesisに流してS3へ保存する、、という流れになってます。
環境
- cdk 2.20.0
では 作っていきましょう👻
project準備
今回は久々にプロジェクトの準備からやります。
理由はKinesis Firehose
はAlpha版しかないので、プロジェクトの用意の段階でちょっとだけハマる可能性がありそうだからです!
$ PROJECT_NAME=お好きなプロジェクト名
$ mkdir $PROJECT_NAME && cd $_
$ cdk init --language python
# 仮想環境とかは各自お願いします
$ pip install -r requirements.txt
$ pip install aws_cdk.aws_kinesisfirehose_alpha
$ pip install aws_cdk.aws_kinesisfirehose_destinations_alpha
以上!!
Database
Construct
まずコードから、、、
import pathlib
from aws_cdk import (
Duration,
RemovalPolicy,
aws_dynamodb,
aws_kinesisfirehose_alpha,
aws_kinesisfirehose_destinations_alpha,
aws_lambda,
aws_lambda_event_sources,
aws_s3,
)
from constructs import Construct
class Database(Construct):
@property
def table(self) -> aws_dynamodb.Table:
return self._table
def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
super().__init__(scope, construct_id)
# TTLを設定したDynamoDBを作成する
table = aws_dynamodb.Table(
self,
"Table",
partition_key=aws_dynamodb.Attribute(name="pk", type=aws_dynamodb.AttributeType.STRING),
sort_key=aws_dynamodb.Attribute(name="sk", type=aws_dynamodb.AttributeType.STRING),
removal_policy=RemovalPolicy.DESTROY,
time_to_live_attribute="ttl",
stream=aws_dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
# 以下ケチるための設定
write_capacity=1,
read_capacity=1,
)
self._table = table
# DynamoStreamを処理するLambdaを作成する
function = aws_lambda.Function(
self,
"DynamoStreamFunction",
code=aws_lambda.Code.from_asset(str(pathlib.Path(__file__).resolve().parent.joinpath("stream_runtime"))),
runtime=aws_lambda.Runtime.PYTHON_3_9,
timeout=Duration.seconds(60),
handler="index.handler",
)
function.add_event_source(
aws_lambda_event_sources.DynamoEventSource(
table,
starting_position=aws_lambda.StartingPosition.LATEST,
),
)
# アーカイブ先のバケットを作成する
bucket = aws_s3.Bucket(
self,
"ItemArchiveBucket",
)
firehose = aws_kinesisfirehose_alpha.DeliveryStream(
self, "Firehose", destinations=[aws_kinesisfirehose_destinations_alpha.S3Bucket(bucket)]
)
# Lambdaに権限とかStream名とかあげる
firehose.grant_put_records(function)
function.add_environment(key="FIREHOSE_NAME", value=firehose.delivery_stream_name)
若干補足説明を入れます。
DynamoDBのTableを作ってる下記のパラメータですが、Streamで取得する情報を決めてます。そして今回は(NEW_AND_OLD_IMAGES)含めることができる情報全部含めるぜ!
となってます。削除なのでOLD_IMAGE
だけでも良いですね、たぶん。
table = aws_dynamodb.Table(
...
stream=aws_dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
...
)
あとここ↓ですが、デプロイ以降のアイテムをStreamに流すのでLATEST
に設定してます。
function.add_event_source(
aws_lambda_event_sources.DynamoEventSource(
...
starting_position=aws_lambda.StartingPosition.LATEST,
),
)
こんなところですかね、、あとはfirehosenにLambdaでデータを流すので権限与えてますくらいかな、、、
Database
ContactのLambdaの中身
まずコード
import json
import os
import boto3
FIREHOSE_NAME = os.environ["FIREHOSE_NAME"]
client = boto3.client("firehose")
def handler(event, context):
for record in event["Records"]:
if record["eventName"] == "REMOVE":
# TTLで削除されたレコードか判断
if record["userIdentity"]["principalId"] == "dynamodb.amazonaws.com":
# 削除されたレコードをKinesisへ流す
client.put_record(
DeliveryStreamName=FIREHOSE_NAME,
Record={"Data": json.dumps(record["dynamodb"]["OldImage"])},
)
コメントに書いてあるのでほぼ説明ないのですが、
# TTLで削除されたレコードか判断
これのソースはここ↓です。
https://docs.aws.amazon.com/ja_jp/amazondynamodb/latest/APIReference/API_streams_Identity.html
次に
if record["eventName"] == "REMOVE":
この辺のEventNameの仕様ですが、このへん↓
https://docs.aws.amazon.com/ja_jp/amazondynamodb/latest/APIReference/API_streams_Record.html
こんかいは削除なのでREMOVE
のときだけ動きます。
(おまけ)DemoApp
のLambdaの中身
完全におまけ要素ですが、DynamoDBにアイテムを流すための動作確認ようのLambdaを組んでみました。
import os
import time
import boto3
TABLE_NAME = os.environ["TABLE_NAME"]
TTL = 30 # 有効時間30秒
dynamo_table = boto3.resource("dynamodb").Table(TABLE_NAME)
def handler(event, context):
unix_time_sec = int(time.time())
dynamo_table.put_item(
Item={
"pk": "demo_app",
"sk": str(unix_time_sec),
"ttl": unix_time_sec + TTL,
},
)
Lambdaが叩かれると有効期限が30秒後のアイテムを1つDBにプッシュします。
sk
をstr
にしたの間違いでしたね(笑)
動作確認!
動作確認用のLambdaを手動で叩くとDBに下記のようなレコードが入ります!
そしてこのまま30秒待てばこのレコードが晴れてS3へ!!
となると思いきやTTLは過ぎてから48時間以内に削除されるので、しばらく放置ですね、、、
====== しばらくご ======
デプロイしたまましばらく存在を忘れていてバケットの中を確認したらこんな感じになってました
まぁ結構はいってました。
適当にオブジェクトをダウンロードしてみて開いてみると中身は、こんな↓感じ。
{"sk": {"S": "1650176000"}, "pk": {"S": "demo_app"}, "ttl": {"N": "1650176030"}}
無事にアーカイブされてそう!
以上です、ここまでお付き合いいただきありがとうございました!!
今回のリポジトリはこちら
https://github.com/sisi100/cdk-dynamo-stream-s3-archive-demo