/ #cdk #dynamodb 

【CDK/Python】DynamoDB StreamのTTLで削除されるやつをS3へアーカイブする

もくじ

DynamoDB内から今後アクセスされる機会のないアイテムはどんどん削除したい派の私です。

ですが要件としてはなにかに使うかもしれないから消すのはもったいない!とうことも当然あると思いますので、今回はその場合にS3にアーカイブする仕組みをCDKで作って見ようと思います。

今回作るのはこんな感じです。

(構成の出力には https://github.com/pistazie/cdk-dia を使わせて頂いてます!感謝!)

ざっくり説明すると

DatabaseConstructがS3にアーカイブする仕組みで、DemoAppは動作確認ようの1時間に1アイテムをDBにプッシュするだけのLambdaです。

Databaseの中はDynamoDBにStream用のLambdaが接続してあって、削除されるアイテムをキャッチしてkinesisに流してS3へ保存する、、という流れになってます。

環境

  • cdk 2.20.0

では 作っていきましょう👻

project準備

今回は久々にプロジェクトの準備からやります。

理由はKinesis FirehoseはAlpha版しかないので、プロジェクトの用意の段階でちょっとだけハマる可能性がありそうだからです!

$ PROJECT_NAME=お好きなプロジェクト名

$ mkdir $PROJECT_NAME && cd $_
$ cdk init --language python

# 仮想環境とかは各自お願いします

$ pip install -r requirements.txt
$ pip install aws_cdk.aws_kinesisfirehose_alpha
$ pip install aws_cdk.aws_kinesisfirehose_destinations_alpha

以上!!

DatabaseConstruct

まずコードから、、、

import pathlib

from aws_cdk import (
    Duration,
    RemovalPolicy,
    aws_dynamodb,
    aws_kinesisfirehose_alpha,
    aws_kinesisfirehose_destinations_alpha,
    aws_lambda,
    aws_lambda_event_sources,
    aws_s3,
)
from constructs import Construct


class Database(Construct):
    @property
    def table(self) -> aws_dynamodb.Table:
        return self._table

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id)

        # TTLを設定したDynamoDBを作成する
        table = aws_dynamodb.Table(
            self,
            "Table",
            partition_key=aws_dynamodb.Attribute(name="pk", type=aws_dynamodb.AttributeType.STRING),
            sort_key=aws_dynamodb.Attribute(name="sk", type=aws_dynamodb.AttributeType.STRING),
            removal_policy=RemovalPolicy.DESTROY,
            time_to_live_attribute="ttl",
            stream=aws_dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
            # 以下ケチるための設定
            write_capacity=1,
            read_capacity=1,
        )
        self._table = table

        # DynamoStreamを処理するLambdaを作成する
        function = aws_lambda.Function(
            self,
            "DynamoStreamFunction",
            code=aws_lambda.Code.from_asset(str(pathlib.Path(__file__).resolve().parent.joinpath("stream_runtime"))),
            runtime=aws_lambda.Runtime.PYTHON_3_9,
            timeout=Duration.seconds(60),
            handler="index.handler",
        )
        function.add_event_source(
            aws_lambda_event_sources.DynamoEventSource(
                table,
                starting_position=aws_lambda.StartingPosition.LATEST,
            ),
        )

        # アーカイブ先のバケットを作成する
        bucket = aws_s3.Bucket(
            self,
            "ItemArchiveBucket",
        )
        firehose = aws_kinesisfirehose_alpha.DeliveryStream(
            self, "Firehose", destinations=[aws_kinesisfirehose_destinations_alpha.S3Bucket(bucket)]
        )

        # Lambdaに権限とかStream名とかあげる
        firehose.grant_put_records(function)
        function.add_environment(key="FIREHOSE_NAME", value=firehose.delivery_stream_name)

若干補足説明を入れます。

DynamoDBのTableを作ってる下記のパラメータですが、Streamで取得する情報を決めてます。そして今回は(NEW_AND_OLD_IMAGES)含めることができる情報全部含めるぜ!となってます。削除なのでOLD_IMAGEだけでも良いですね、たぶん。

table = aws_dynamodb.Table(
    ...
    stream=aws_dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
    ...
)

あとここ↓ですが、デプロイ以降のアイテムをStreamに流すのでLATESTに設定してます。

function.add_event_source(
    aws_lambda_event_sources.DynamoEventSource(
        ...
        starting_position=aws_lambda.StartingPosition.LATEST,
    ),
)

こんなところですかね、、あとはfirehosenにLambdaでデータを流すので権限与えてますくらいかな、、、

DatabaseContactのLambdaの中身

まずコード

import json
import os

import boto3

FIREHOSE_NAME = os.environ["FIREHOSE_NAME"]
client = boto3.client("firehose")


def handler(event, context):
    for record in event["Records"]:

        if record["eventName"] == "REMOVE":

            # TTLで削除されたレコードか判断
            if record["userIdentity"]["principalId"] == "dynamodb.amazonaws.com":
                # 削除されたレコードをKinesisへ流す
                client.put_record(
                    DeliveryStreamName=FIREHOSE_NAME,
                    Record={"Data": json.dumps(record["dynamodb"]["OldImage"])},
                )

コメントに書いてあるのでほぼ説明ないのですが、

# TTLで削除されたレコードか判断

これのソースはここ↓です。

https://docs.aws.amazon.com/ja_jp/amazondynamodb/latest/APIReference/API_streams_Identity.html

次に

if record["eventName"] == "REMOVE":

この辺のEventNameの仕様ですが、このへん↓

https://docs.aws.amazon.com/ja_jp/amazondynamodb/latest/APIReference/API_streams_Record.html

こんかいは削除なのでREMOVEのときだけ動きます。

(おまけ)DemoAppのLambdaの中身

完全におまけ要素ですが、DynamoDBにアイテムを流すための動作確認ようのLambdaを組んでみました。

import os
import time

import boto3

TABLE_NAME = os.environ["TABLE_NAME"]
TTL = 30  # 有効時間30秒

dynamo_table = boto3.resource("dynamodb").Table(TABLE_NAME)


def handler(event, context):
    unix_time_sec = int(time.time())
    dynamo_table.put_item(
        Item={
            "pk": "demo_app",
            "sk": str(unix_time_sec),
            "ttl": unix_time_sec + TTL,
        },
    )

Lambdaが叩かれると有効期限が30秒後のアイテムを1つDBにプッシュします。

skstrにしたの間違いでしたね(笑)

動作確認!

動作確認用のLambdaを手動で叩くとDBに下記のようなレコードが入ります!

そしてこのまま30秒待てばこのレコードが晴れてS3へ!!

となると思いきやTTLは過ぎてから48時間以内に削除されるので、しばらく放置ですね、、、

====== しばらくご ======

デプロイしたまましばらく存在を忘れていてバケットの中を確認したらこんな感じになってました

まぁ結構はいってました。

適当にオブジェクトをダウンロードしてみて開いてみると中身は、こんな↓感じ。

{"sk": {"S": "1650176000"}, "pk": {"S": "demo_app"}, "ttl": {"N": "1650176030"}}

無事にアーカイブされてそう!

以上です、ここまでお付き合いいただきありがとうございました!!

今回のリポジトリはこちら

https://github.com/sisi100/cdk-dynamo-stream-s3-archive-demo

Author

Sisii

インフラが好きなエンジニアぶってるなにか