【CDK/Python】AWSで特定の時間にメッセージを配信するサービスを実装してみる
ちょっと前にAWSのコミュニティで特定の時間にメッセージを配信する場合にAWSでどんなサービス使えばいいの?
という話題を見たので私はこんな方法で作るよ!
というのを記事にしてみます。
今回作るモノのざっくりな仕様
UTC 12:00 に予め指定したプッシュ通知を送る!(という想定。実際はprintでメッセージを吐き出すだけ)
では 作っていきましょう👻
まず全体の構成はこんな感じです。
構成の出力には https://github.com/pistazie/cdk-dia を使わせて頂いてます!感謝!
内容としては12:00にEventBridgeが動いてDispatcherLambdaを叩きます。
次にLambdaが必要なデータをSQSにどんどん積んでいきます.
最後にSQSをトリガーとしてWorkerLambdaが起動して各自送信します。
今回面倒なので作ってませんが、SQSにデッドレターキューを設定して送信失敗を受け取ったり、非同期で実装されるDispatcherLambdaにデッドレターキューを付けて失敗時のフォローをする、、、なんて構成もできます。
具体的な中のコードはinfra側がこんな感じです!
import os
import pathlib
from aws_cdk import aws_events, aws_events_targets, aws_lambda, aws_lambda_event_sources, aws_sqs
from constructs import Construct
class ScheduledMessages(Construct):
def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
super().__init__(scope, construct_id)
schedule_rule = aws_events.Rule(
self,
"Schedule",
schedule=aws_events.Schedule.cron(minute="0", hour="12", month="*", week_day="*", year="*"),
)
queue = aws_sqs.Queue(self, "Queue")
self.__create_dispatcher_lambda(trigger_rule=schedule_rule, send_queue=queue)
self.__create_worker_lambda(consume_queue=queue)
def __create_dispatcher_lambda(self, trigger_rule: aws_events.Rule, send_queue: aws_sqs.Queue):
"""
ルールで起動して、SQSにアイテムを投げるLambdaを作成する
"""
dispatcher_lambda = aws_lambda.Function(
self,
"DispatcherLambda",
environment={"SQS_NAME": send_queue.queue_name},
**self.__build_lambda_param("dispatcher.handler"),
)
send_queue.grant_send_messages(dispatcher_lambda)
trigger_rule.add_target(aws_events_targets.LambdaFunction(dispatcher_lambda))
def __create_worker_lambda(self, consume_queue: aws_sqs.Queue):
"""
SQSをトリガーにしてメッセージを送信する想定のLambdaを作成する
"""
powertools_layer = aws_lambda.LayerVersion.from_layer_version_arn(
self,
"lambda-powertools-layer",
f"arn:aws:lambda:{os.getenv('CDK_DEFAULT_REGION')}:017000801446:layer:AWSLambdaPowertoolsPython:3",
)
worker_lambda = aws_lambda.Function(
self, "WorkerLambda", layers=[powertools_layer], **self.__build_lambda_param("worker.handler")
)
worker_lambda.add_event_source(aws_lambda_event_sources.SqsEventSource(consume_queue))
consume_queue.grant_consume_messages(worker_lambda)
def __build_lambda_param(self, handler: str):
"""
Lambdaの共有な設定
"""
return dict(
code=aws_lambda.Code.from_asset(str(pathlib.Path(__file__).resolve().parent.joinpath("runtime"))),
runtime=aws_lambda.Runtime.PYTHON_3_9,
handler=handler,
)
見た通りで特に補足はないのですが、
WorkerLambdaのSQSでitemの処理をlambda_power_toolsを使って楽したいのでレイヤー入れてます。
詳細が気になるかたはこの辺をご参照ください…
LambdaPowertoolsでSQSのバッチ処理を楽にやろう
【Lambda Powertools】パブリックレイヤーが実装されたのでCDKで動かしてみた!
次にDispatcherLambdaはこんな感じ!
import json
import os
from dataclasses import asdict, dataclass
import boto3
sqs = boto3.resource("sqs")
queue = sqs.get_queue_by_name(QueueName=os.getenv("SQS_NAME"))
MAX_NUMBER_OF_ENTRIES = 10 # 1回のキューに送信できる最大数
NUMBER_OF_SAMPLE_ITEMS = 20 # 動作確認でキューにプッシュするアイテムの数
def handler(event, context):
for entries in get_entries():
queue.send_messages(Entries=entries)
def get_entries():
"""
普通のユースケースだとDBから送信対象を取ってくる感じだけれども、
ここでは適当なデータを作るだけ
"""
@dataclass
class SampleItem:
"""Queueに送信するサンプルデータ"""
user_id: int
message: str
def convert_entry(self):
"""`send_messages`のエンティティのフォーマットに変形する"""
return {"Id": f"{self.user_id}", "MessageBody": json.dumps(asdict(self))}
items_to_send = [SampleItem(i, f"メッセージ_{i}") for i in range(NUMBER_OF_SAMPLE_ITEMS)]
entries = []
for item in items_to_send:
entries.append(item.convert_entry())
if len(entries) == MAX_NUMBER_OF_ENTRIES:
yield entries
entries.clear()
get_entries
の中でごにょごにょしてますが、send_messages
でするためのSampleデータを作っているだけなので、注目する必要のないか所だったりします。
そのうえで補足するとsend_messages
の引数Entries
は下記のようなデータで、Id
とMessageBody
は必須です
Entries=[
{
'Id': '1',
'MessageBody': 'world'
},
{
'Id': '2',
'MessageBody': 'boto3',
'MessageAttributes': {
'Author': {
'StringValue': 'Daniel',
'DataType': 'String'
}
}
}
]
https://boto3.amazonaws.com/v1/documentation/api/latest/guide/sqs.html#sending-messages
もう1つ補足すると1回で送信できるメッセージの数は最大10個(2022/02/03現在)です
最後にWorkerLambda
import json
from aws_lambda_powertools.utilities.batch import sqs_batch_processor
def record_handler(record):
print(f"メッセージを送るよ! :{json.dumps(record['body'])}")
return
@sqs_batch_processor(record_handler=record_handler)
def handler(event, context):
return
はい。。。受け取ったitemのbodyをログに出力してるだけです。
本来はここでrecord_handler
で送信処理を記述すると、無事に指定した時間にメッセージを配信するサービス、が完成する感じですー
おわりに
今回の要件をクリアする方法は、この構成だけでなく色々あります。
AWSの公式でも少し違いますが、サーバーレスでスケジュールを実装する戦略が紹介されてます。
ここではEventBridge
→Lambda
→EventBridge
という構成になってますが、
今回の構成はEventBridge
→Lambda
→SQS
なのでそこまで間違ってないかな、と思います。
さてさて、他の手段ではDynamoDBのTTLをトリガーにするとか、ステップファンクションを使った方法とかあるみたいです。
DynamoDBはTTLの仕様で最大48時間の遅延が発生するので実用性うすそうですが、ステップファンクションのほうは結構面白そうなのでちょっと調べて腑に落ちたら記事にしてみようかなーとか思いました。
以上、最後までお読みいただきありがとうございました!