/ #CDK #Python 

【CDK/Python】AWSで特定の時間にメッセージを配信するサービスを実装してみる

もくじ

ちょっと前にAWSのコミュニティで特定の時間にメッセージを配信する場合にAWSでどんなサービス使えばいいの? という話題を見たので私はこんな方法で作るよ!というのを記事にしてみます。

今回作るモノのざっくりな仕様

UTC 12:00 に予め指定したプッシュ通知を送る!(という想定。実際はprintでメッセージを吐き出すだけ)

では 作っていきましょう👻

まず全体の構成はこんな感じです。

構成の出力には https://github.com/pistazie/cdk-dia を使わせて頂いてます!感謝!

内容としては12:00にEventBridgeが動いてDispatcherLambdaを叩きます。

次にLambdaが必要なデータをSQSにどんどん積んでいきます.

最後にSQSをトリガーとしてWorkerLambdaが起動して各自送信します。

今回面倒なので作ってませんが、SQSにデッドレターキューを設定して送信失敗を受け取ったり、非同期で実装されるDispatcherLambdaにデッドレターキューを付けて失敗時のフォローをする、、、なんて構成もできます。

具体的な中のコードはinfra側がこんな感じです!

import os
import pathlib

from aws_cdk import aws_events, aws_events_targets, aws_lambda, aws_lambda_event_sources, aws_sqs
from constructs import Construct


class ScheduledMessages(Construct):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id)

        schedule_rule = aws_events.Rule(
            self,
            "Schedule",
            schedule=aws_events.Schedule.cron(minute="0", hour="12", month="*", week_day="*", year="*"),
        )
        queue = aws_sqs.Queue(self, "Queue")

        self.__create_dispatcher_lambda(trigger_rule=schedule_rule, send_queue=queue)

        self.__create_worker_lambda(consume_queue=queue)

    def __create_dispatcher_lambda(self, trigger_rule: aws_events.Rule, send_queue: aws_sqs.Queue):
        """
        ルールで起動して、SQSにアイテムを投げるLambdaを作成する
        """
        dispatcher_lambda = aws_lambda.Function(
            self,
            "DispatcherLambda",
            environment={"SQS_NAME": send_queue.queue_name},
            **self.__build_lambda_param("dispatcher.handler"),
        )
        send_queue.grant_send_messages(dispatcher_lambda)
        trigger_rule.add_target(aws_events_targets.LambdaFunction(dispatcher_lambda))

    def __create_worker_lambda(self, consume_queue: aws_sqs.Queue):
        """
        SQSをトリガーにしてメッセージを送信する想定のLambdaを作成する
        """
        powertools_layer = aws_lambda.LayerVersion.from_layer_version_arn(
            self,
            "lambda-powertools-layer",
            f"arn:aws:lambda:{os.getenv('CDK_DEFAULT_REGION')}:017000801446:layer:AWSLambdaPowertoolsPython:3",
        )
        worker_lambda = aws_lambda.Function(
            self, "WorkerLambda", layers=[powertools_layer], **self.__build_lambda_param("worker.handler")
        )
        worker_lambda.add_event_source(aws_lambda_event_sources.SqsEventSource(consume_queue))
        consume_queue.grant_consume_messages(worker_lambda)

    def __build_lambda_param(self, handler: str):
        """
        Lambdaの共有な設定
        """
        return dict(
            code=aws_lambda.Code.from_asset(str(pathlib.Path(__file__).resolve().parent.joinpath("runtime"))),
            runtime=aws_lambda.Runtime.PYTHON_3_9,
            handler=handler,
        )

見た通りで特に補足はないのですが、

WorkerLambdaのSQSでitemの処理をlambda_power_toolsを使って楽したいのでレイヤー入れてます。

詳細が気になるかたはこの辺をご参照ください…

LambdaPowertoolsでSQSのバッチ処理を楽にやろう

【Lambda Powertools】パブリックレイヤーが実装されたのでCDKで動かしてみた!

次にDispatcherLambdaはこんな感じ!

import json
import os
from dataclasses import asdict, dataclass

import boto3

sqs = boto3.resource("sqs")
queue = sqs.get_queue_by_name(QueueName=os.getenv("SQS_NAME"))

MAX_NUMBER_OF_ENTRIES = 10  # 1回のキューに送信できる最大数
NUMBER_OF_SAMPLE_ITEMS = 20  # 動作確認でキューにプッシュするアイテムの数


def handler(event, context):
    for entries in get_entries():
        queue.send_messages(Entries=entries)


def get_entries():
    """
    普通のユースケースだとDBから送信対象を取ってくる感じだけれども、
    ここでは適当なデータを作るだけ
    """

    @dataclass
    class SampleItem:
        """Queueに送信するサンプルデータ"""

        user_id: int
        message: str

        def convert_entry(self):
            """`send_messages`のエンティティのフォーマットに変形する"""
            return {"Id": f"{self.user_id}", "MessageBody": json.dumps(asdict(self))}

    items_to_send = [SampleItem(i, f"メッセージ_{i}") for i in range(NUMBER_OF_SAMPLE_ITEMS)]

    entries = []
    for item in items_to_send:
        entries.append(item.convert_entry())
        if len(entries) == MAX_NUMBER_OF_ENTRIES:
            yield entries
            entries.clear()

get_entriesの中でごにょごにょしてますが、send_messagesでするためのSampleデータを作っているだけなので、注目する必要のないか所だったりします。

そのうえで補足するとsend_messagesの引数Entriesは下記のようなデータで、IdMessageBodyは必須です

Entries=[
    {
        'Id': '1',
        'MessageBody': 'world'
    },
    {
        'Id': '2',
        'MessageBody': 'boto3',
        'MessageAttributes': {
            'Author': {
                'StringValue': 'Daniel',
                'DataType': 'String'
            }
        }
    }
]

https://boto3.amazonaws.com/v1/documentation/api/latest/guide/sqs.html#sending-messages

もう1つ補足すると1回で送信できるメッセージの数は最大10個(2022/02/03現在)です

https://docs.aws.amazon.com/ja_jp/AWSSimpleQueueService/latest/SQSDeveloperGuide/quotas-messages.html

最後にWorkerLambda

import json

from aws_lambda_powertools.utilities.batch import sqs_batch_processor


def record_handler(record):
    print(f"メッセージを送るよ! :{json.dumps(record['body'])}")
    return


@sqs_batch_processor(record_handler=record_handler)
def handler(event, context):
    return

はい。。。受け取ったitemのbodyをログに出力してるだけです。

本来はここでrecord_handlerで送信処理を記述すると、無事に指定した時間にメッセージを配信するサービス、が完成する感じですー

おわりに

今回の要件をクリアする方法は、この構成だけでなく色々あります。

AWSの公式でも少し違いますが、サーバーレスでスケジュールを実装する戦略が紹介されてます。

https://aws.amazon.com/jp/blogs/architecture/serverless-scheduling-with-amazon-eventbridge-aws-lambda-and-amazon-dynamodb/

ここではEventBridgeLambdaEventBridgeという構成になってますが、

今回の構成はEventBridgeLambdaSQSなのでそこまで間違ってないかな、と思います。

さてさて、他の手段ではDynamoDBのTTLをトリガーにするとか、ステップファンクションを使った方法とかあるみたいです。

DynamoDBはTTLの仕様で最大48時間の遅延が発生するので実用性うすそうですが、ステップファンクションのほうは結構面白そうなのでちょっと調べて腑に落ちたら記事にしてみようかなーとか思いました。

以上、最後までお読みいただきありがとうございました!

今回のリポジトリはこちら

https://github.com/sisi100/scheduled_messages_with_aws

Author

Sisii

AWSが好きなフリーランスのエンジニア。主にAWSを使ったバックエンドの開発をしています😊最近Frontも始めました