# Implement 'Schedule Message'

Schedule content to be sent at a specific hour and date to all the subscribers of a specific group.

## Implementation

- Duplicate `get_subscribers` and rename the new folder `schedule_message`.
- Paste the following into `schedule_message/app.py`:

```python
import json
import boto3
from datetime import datetime
from dataclasses import dataclass
from dacite import from_dict
import logging
import random
import string
import re

from utils.consts import SCHEDULED_MESSAGES_TABLE, SCHEDULED_MESSAGES_BUCKET
from utils.api_gw_helpers import require_group, lambda_response
from utils.general import get_schedule_date_key

# Cache client
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table(SCHEDULED_MESSAGES_TABLE)
s3 = boto3.resource("s3")
bucket = s3.Bucket(SCHEDULED_MESSAGES_BUCKET)

logger = logging.getLogger()
logger.setLevel(logging.INFO)


@dataclass(frozen=True)  #(1)!
class Message:
    subject: str
    body: str
    schedule_on: int


@require_group
def lambda_handler(event, context):
    # Get group name
    group = event["group_name"]
    body = event.get("body")
    if body is None:
        return lambda_response({"err": "Missing message details"}, status_code=500)
    else:
        try:
            message = from_dict(data_class=Message, data=json.loads(body))
            logger.info("Saving message into S3")
            key = "".join(random.choice(string.ascii_lowercase) for i in range(10))
            meta_data = {
                "group": group,
                "subject": message.subject,
                "scheduled": str(datetime.fromtimestamp(message.schedule_on / 1000)),
                "key": key,
            }
            tagging = "&".join(  #(2)!
                f"{k}={_filter_string(str(v))}" for k, v in meta_data.items()
            )
            logger.info(tagging)
            bucket.put_object(Body=str.encode(body), Key=key, Tagging=tagging)
            logger.info("S3 object saved successfully")
            response = table.put_item(
                Item={
                    "group_name": group,
                    "scheduled_date": get_schedule_date_key(
                        datetime.fromtimestamp(message.schedule_on / 1000)
                    ),
                    "message_key": key,
                    "message_added": int(datetime.now().timestamp() * 1000),
                }
            )
            logger.info("DDB object saved successfully")
            return lambda_response(
                {"message": "Message scheduled successfully", "details": meta_data}
            )
        except Exception as e:
            logging.error(e)
            return lambda_response({"err": "Failed saving message"}, status_code=500)


def _filter_string(s: str):
    return re.sub(r"[^\w\s\+\-=:\/@\.]", "", s)
```

1. Encapsulate data as a model.
2. Not all tag values are valid.
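Together with `dacite`, the frozen `Message` dataclass gives lightweight validation of the incoming request body. A minimal sketch of how `from_dict` behaves, assuming the `Message` class defined above (the payload values here are purely illustrative):

```python
import json

from dacite import from_dict

raw = '{"subject": "Hello", "body": "<h1>Hi</h1>", "schedule_on": 1649753447000}'
message = from_dict(data_class=Message, data=json.loads(raw))
print(message.subject)      # Hello
print(message.schedule_on)  # 1649753447000

try:
    # A payload missing a required field fails fast (dacite raises its
    # own error type), before anything is written to S3 or DynamoDB.
    from_dict(data_class=Message, data={"subject": "Hello"})
except Exception as err:
    print(err)
```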
- Paste the new dependency into `group_subscription_layer/utils/requirements.txt` (`app.py` relies on the third-party `dacite` package).
- Paste the following into `group_subscription_layer/utils/general.py`:

```python
from datetime import datetime

from boto3.dynamodb.conditions import Key


def get_schedule_date_key(exact_date: datetime) -> str:
    return f"{exact_date.year}_{exact_date.month}_{exact_date.day}_{exact_date.hour}"
```
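For reference, the helper buckets a timestamp at hour granularity. A quick illustrative check (the exact output depends on the local timezone that `fromtimestamp` resolves to):

```python
from datetime import datetime

from utils.general import get_schedule_date_key

# 1649753447000 is the millisecond timestamp used in the curl test below.
exact_date = datetime.fromtimestamp(1649753447000 / 1000)
print(get_schedule_date_key(exact_date))
# e.g. '2022_4_12_8' (UTC): year_month_day_hour, with no zero padding
```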
- Add the following to `template.yaml` under `Resources`:

```yaml
ScheduleFunction:
  Type: AWS::Serverless::Function
  Properties:
    CodeUri: schedule_message/
    Handler: app.lambda_handler
    Runtime: python3.11
    Architectures:
      - x86_64
    Policies:
      - DynamoDBWritePolicy:
          TableName: !Ref ScheduledMessagesTable
      - S3WritePolicy:
          BucketName: !Ref ScheduledMessagesBucket
      - Statement: #(1)!
          - Effect: "Allow"
            Action:
              - "s3:PutObjectTagging"
            Resource: !Sub "arn:aws:s3:::${ScheduledMessagesBucket}/*"
    Layers:
      - !Ref SharedLayer
    Environment:
      Variables:
        SCHEDULED_MESSAGES_BUCKET_NAME: !Ref ScheduledMessagesBucket
        SCHEDULED_MESSAGES_TABLE_NAME: !Ref ScheduledMessagesTable
    Events:
      Subscribers:
        Type: Api
        Properties:
          Path: /{group}/schedule
          Method: post
```

1. Use a manually defined policy.
- Add a new S3 bucket definition, as sketched below.
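A minimal bucket resource is enough here; a sketch, assuming no extra configuration such as versioning or lifecycle rules:

```yaml
ScheduledMessagesBucket:
  Type: AWS::S3::Bucket
```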
- Add a new table definition:
```yaml
ScheduledMessagesTable:
  Type: AWS::DynamoDB::Table
  Properties:
    AttributeDefinitions:
      - AttributeName: "group_name"
        AttributeType: "S"
      - AttributeName: "scheduled_date"
        AttributeType: "S"
    KeySchema:
      - AttributeName: "scheduled_date"
        KeyType: "HASH"
      - AttributeName: "group_name"
        KeyType: "RANGE"
    BillingMode: PAY_PER_REQUEST
```
- Replace the `Outputs` section:

```yaml
Outputs:
  SubscribersList:
    Description: "API Gateway endpoint URL getting the subscribers"
    Value: !Sub "https://${ServerlessRestApi}.execute-api.${AWS::Region}.amazonaws.com/Prod/{group}/subscribers"
  ScheduleMessage:
    Description: "API Gateway endpoint URL for scheduling a message"
    Value: !Sub "https://${ServerlessRestApi}.execute-api.${AWS::Region}.amazonaws.com/Prod/{group}/schedule"
```
- Paste the following into `group_subscription_layer/utils/consts.py`:

```python
import os

SUBSCRIBERS_TABLE = os.environ.get("SUBSCRIBERS_TABLE")
SCHEDULED_MESSAGES_TABLE = os.environ.get("SCHEDULED_MESSAGES_TABLE_NAME")
SCHEDULED_MESSAGES_BUCKET = os.environ.get("SCHEDULED_MESSAGES_BUCKET_NAME")
```
- Rerun `sam build && sam deploy`.
- Test it using curl; see the note after this list for generating a `schedule_on` value:

```bash
curl -X POST https://<api-id>.execute-api.us-east-1.amazonaws.com/Prod/serverless/schedule -H 'Content-Type: application/json' -d '{"subject":"Hello SLS workshop!", "body":"The workshop is not recorded.<h1>Welcome dear friends</h1>", "schedule_on":1649753447000}'
```

- Search for the file in the S3 bucket and the record in DynamoDB.
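The `schedule_on` field is an epoch timestamp in milliseconds. A small sketch for generating a value a few minutes in the future when testing (the helper name is illustrative):

```python
from datetime import datetime, timedelta

def schedule_on_ms(minutes_from_now: int = 5) -> int:
    # Epoch time in milliseconds, the format app.py expects in schedule_on.
    return int((datetime.now() + timedelta(minutes=minutes_from_now)).timestamp() * 1000)

print(schedule_on_ms())
```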
## Insights

### Using S3 to store content
DynamoDB has a strict size limit of 400KB per record. Therefore, when storing content that exceeds this limit, it is recommended to use S3 to store the content and use a 'pointer' (an S3 object path) to the full content, as part of the DynamoDB record. In the above code, we are saving content into S3 and storing the object key as part of the DynamoDB record.
```python
bucket.put_object(Body=str.encode(body), Key=key, Tagging=tagging)  # <-- Save to S3
logger.info("S3 object saved successfully")
response = table.put_item(
    Item={
        "group_name": group,
        "scheduled_date": get_schedule_date_key(
            datetime.fromtimestamp(message.schedule_on / 1000)
        ),
        "message_key": key,  # <-- Use the S3 object key as the pointer
        "message_added": int(datetime.now().timestamp() * 1000),
    }
)
```
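Reading a message back follows the pointer in the opposite direction: fetch the DynamoDB item by its composite key, then load the full body from S3 using `message_key`. A minimal sketch, reusing the `table` and `bucket` clients and the `get_schedule_date_key` helper from the code above (the function name is illustrative):

```python
from datetime import datetime

from utils.general import get_schedule_date_key

def load_scheduled_message(group: str, schedule_on_ms: int) -> str:
    # Fetch the DynamoDB record by its composite key
    # (scheduled_date is the hash key, group_name is the range key).
    record = table.get_item(
        Key={
            "scheduled_date": get_schedule_date_key(
                datetime.fromtimestamp(schedule_on_ms / 1000)
            ),
            "group_name": group,
        }
    )["Item"]

    # Follow the pointer: load the full message body from S3.
    obj = bucket.Object(record["message_key"]).get()
    return obj["Body"].read().decode("utf-8")
```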
### Tagging S3 objects
AWS Tags are a valuable feature in AWS. I highly recommend making use of them wherever possible, as they can aid in cost optimization and compliance.
In addition to tagging resources, you can also tag your S3 objects, which serves several purposes:
- An easy way to understand the content of an object without actually downloading and opening it.
- Identify large objects in S3, manage their costs, and control their lifecycle.
- Access control.
Tagging S3 objects has some limitations on the characters that are allowed in tag keys and values: only letters, digits, whitespace, and `+ - = . _ : / @` are accepted. For example, characters like `!` or `<` are not allowed. In cases like this, you can sanitize or encode the string to remove the offending characters, which is what `_filter_string` does in the code above.
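As an illustration, `_filter_string` from `app.py` simply strips anything outside the allowed set:

```python
import re

def _filter_string(s: str):
    # Keep only characters S3 object tags accept:
    # letters, digits, whitespace and + - = . _ : / @
    return re.sub(r"[^\w\s\+\-=:\/@\.]", "", s)

print(_filter_string("Hello SLS workshop!"))       # Hello SLS workshop
print(_filter_string("ops@example.com <urgent>"))  # ops@example.com urgent
```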
### Powertools

Stop reinventing the wheel. AWS Lambda Powertools is a suite of utilities for AWS Lambda functions that makes it easier for developers to follow best practices for tracing, structured logging, custom metrics, and more. You can find more details in the Powertools documentation.
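A minimal sketch of what adopting the Powertools structured logger could look like in this handler (the service name and settings here are illustrative, not the workshop's final solution):

```python
from aws_lambda_powertools import Logger

logger = Logger(service="schedule-message")

@logger.inject_lambda_context
def lambda_handler(event, context):
    # Emits structured JSON logs enriched with the Lambda context
    # (request id, function name, cold start, and so on).
    logger.info("Saving message into S3")
    ...
```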
### Principle of Least Privileged Access - Take 2

There may be instances where AWS SAM doesn't supply the exact IAM policies you require. In such situations, it becomes necessary to formulate your own policy. For our specific scenario, we need the `s3:PutObjectTagging` permission to write an S3 object tag. Unfortunately, AWS SAM doesn't offer this particular policy template; the only available option is `S3FullAccessPolicy`, which is excessively broad for our needs.
## Exercises

- Add Lambda Powertools and replace the existing log statements with those provided by Powertools.
- Why did we choose to structure the DynamoDB table the way we did?
- Replace the `DynamoDBWritePolicy` policy with a manually defined policy that only allows the `dynamodb:PutItem` operation.
Tip

- Each policy has a strict structure; adhere to it.
- Define `Effect`, `Action`, and `Resource`.