Project Overview
This project implements a fully automated, serverless pipeline designed to ingest, validate, and route sales data from CSV files. The goal is to ensure that only valid sales data is directed to a structured database (DynamoDB), while invalid records are segregated into an S3 bucket for review and remediation.
I built this entire solution on AWS using serverless technologies to ensure scalability, reduce costs, and keep operations simple. The project also includes a secure way to receive sales data from external users or applications through API Gateway. This setup allows data to be sent to our S3 bucket without granting direct bucket access or exposing the bucket to the public internet.
Architecture Design
AWS Services Used
In this project, I used six AWS services:
- Amazon S3 (Simple Storage Service): This was used as the primary data lake for both raw input files and segregated invalid/error data. I used two buckets: the datainput bucket for uploading raw CSV sales data and the projectdataoutput bucket for storing CSV files containing invalid sales records.
- AWS Lambda: This service provided the core compute for data processing. The Lambda function in this project, SalesDataProcessor, runs Python code that is triggered by new CSV uploads to the datainput bucket. It then reads the uploaded file, validates each record, and routes it accordingly.
- Amazon DynamoDB: A fully managed NoSQL database used to store all valid sales records. I chose Amazon DynamoDB over SQL services because this solution needs to handle unpredictable traffic with low latency and minimal management. DynamoDB is serverless, scales automatically, and is great for storing structured but flexible data like sales records. I created a database table (ValidSalesData) where validated sales data is stored.
- Amazon API Gateway: Provides a secure, scalable entry point for external applications to submit data. In this pipeline, it facilitates uploading CSV files directly to the datainput S3 bucket.
- AWS Identity and Access Management (IAM): Using this service was crucial for creating and managing permissions for the Lambda function, allowing it to interact securely with S3, DynamoDB, and API Gateway.
- Amazon CloudWatch: This service provides logging and monitoring for the Lambda function’s execution, essential for debugging and operational oversight.
Data Flow
These are the stages that our data follows, from the user to the database for valid entries, or to the S3 bucket in case of invalid data.
Step 1: External Ingestion (New)
An external application or user securely uploads a CSV file via an API Gateway endpoint. This API Gateway endpoint is configured to directly put the file into the datainput S3 bucket.
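For illustration, here is a hypothetical client-side upload using Python's requests library. The endpoint URL, stage name, and path-to-bucket mapping are placeholders; the exact request shape depends on how the API resources and path parameters are configured.

```python
# Hypothetical external client uploading a CSV through the API Gateway
# endpoint; the URL below is a placeholder, not the real endpoint.
import requests

API_URL = ("https://<api-id>.execute-api.eu-north-1.amazonaws.com"
           "/prod/datainput/sales_2024-01-15.csv")

with open("sales_2024-01-15.csv", "rb") as f:
    response = requests.put(
        API_URL,
        data=f,
        headers={"Content-Type": "text/csv"},
    )

response.raise_for_status()  # raise if API Gateway/S3 rejected the upload
print("Upload accepted:", response.status_code)
```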
Step 2: Trigger
An S3 event notification is configured to automatically invoke the SalesDataProcessor Lambda function whenever a new object (specifically, a .csv file) is created in the datainput bucket.
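As a rough sketch of what the function receives, the handler would typically pull the bucket name and object key out of the standard S3 event payload before doing any processing. The handler structure below is an assumption for illustration, not the deployed code.

```python
# Minimal sketch of the entry point that the S3 trigger invokes.
import urllib.parse

def lambda_handler(event, context):
    # An S3 event notification can batch multiple records.
    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]  # e.g. "datainput"
        key = urllib.parse.unquote_plus(record["s3"]["object"]["key"])
        # ...download the CSV from `bucket`/`key`, then validate and
        # route each row (see Steps 3 and 4)...
```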
Step 3: Processing & Validation
The Lambda function downloads the uploaded CSV file. It reads each row of the CSV and applies a predefined set of validation rules (in the code) to each record. These rules ensure data quality and correct formatting for fields like TransactionID, ProductID, Quantity, UnitPrice, SaleDate, and CustomerID. Most importantly, the validation logic includes whitespace stripping to handle common data entry issues.
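To make the validation step concrete, here is a minimal sketch of the kind of per-record checks described above. The validate_record helper and the specific rules shown (non-empty required fields, a positive integer Quantity, a positive UnitPrice, and a YYYY-MM-DD SaleDate) are illustrative assumptions; the authoritative rules live in the deployed function code.

```python
from datetime import datetime

REQUIRED_FIELDS = ["TransactionID", "ProductID", "Quantity",
                   "UnitPrice", "SaleDate", "CustomerID"]

def validate_record(row):
    """Return (is_valid, error_reason) for one CSV row parsed as a dict."""
    # Strip whitespace first to handle common data entry issues.
    record = {key: (value or "").strip() for key, value in row.items()}

    for field in REQUIRED_FIELDS:
        if not record.get(field):
            return False, f"Missing value for {field}"

    try:
        if int(record["Quantity"]) <= 0:
            return False, "Quantity must be a positive integer"
    except ValueError:
        return False, "Quantity is not an integer"

    try:
        if float(record["UnitPrice"]) <= 0:
            return False, "UnitPrice must be a positive number"
    except ValueError:
        return False, "UnitPrice is not a number"

    try:
        datetime.strptime(record["SaleDate"], "%Y-%m-%d")
    except ValueError:
        return False, "SaleDate is not in YYYY-MM-DD format"

    return True, None
```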
Step 4: Data Routing (a code sketch of the routing follows the list below)
- Valid Records: Records that pass all validation checks are inserted as individual items into the ValidSalesData DynamoDB table.
- Invalid Records: Records that fail any validation check are collected along with their respective Error_Reason. These invalid records are then compiled into a separate CSV file and uploaded to the projectdataoutput S3 bucket under an invalid_data/ prefix, with a timestamp included in the object key.
- Logging & Monitoring: All processing activities, including successful operations and any errors, are logged to CloudWatch Logs for real-time monitoring and debugging.
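Here is a minimal sketch of the routing described in Step 4, assuming boto3 and the bucket and table names above; the route_records helper and the exact object key format are illustrative.

```python
import csv
import io
from datetime import datetime, timezone
from decimal import Decimal

import boto3

dynamodb = boto3.resource("dynamodb")
s3 = boto3.client("s3")
table = dynamodb.Table("ValidSalesData")

def route_records(valid_records, invalid_records):
    # Valid records become individual items in the ValidSalesData table.
    for record in valid_records:
        table.put_item(Item={
            "TransactionID": record["TransactionID"],
            "ProductID": record["ProductID"],
            "Quantity": int(record["Quantity"]),
            "UnitPrice": Decimal(record["UnitPrice"]),  # DynamoDB needs Decimal, not float
            "SaleDate": record["SaleDate"],
            "CustomerID": record["CustomerID"],
        })

    # Invalid records (each carrying an Error_Reason) are compiled into a
    # single timestamped CSV under the invalid_data/ prefix.
    if invalid_records:
        buffer = io.StringIO()
        writer = csv.DictWriter(buffer, fieldnames=list(invalid_records[0].keys()))
        writer.writeheader()
        writer.writerows(invalid_records)

        timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
        s3.put_object(
            Bucket="projectdataoutput",
            Key=f"invalid_data/invalid_records_{timestamp}.csv",
            Body=buffer.getvalue().encode("utf-8"),
        )
```

Note that the boto3 DynamoDB resource rejects Python floats, which is why UnitPrice is converted to Decimal before the put_item call.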
Deployment Guide (High-Level Steps)
All the resources were deployed in the Stockholm (eu-north-1) AWS Region. These are the high-level steps taken (a consolidated boto3 sketch follows the list):
- Creating S3 Buckets: An S3 bucket (datainput) for raw sales CSVs was created. Another bucket (projectdataoutput) for invalid CSVs was also created.
- Creating DynamoDB Table: A DynamoDB table named ValidSalesData was created. I set the Partition Key to TransactionID (String).
- Creating AWS Lambda Function: I created a Lambda function from the AWS console and chose the “Author from scratch” option. Details of our function were as follows:
- Function Name: SalesDataProcessor.
- Runtime: Python 3.9, the latest version available at the time.
- Execution role: I created a new role with basic Lambda permissions, which gave the function the ability to send logs to CloudWatch.
- Configuring IAM Permissions for Lambda’s Role: In the IAM console (accessed via Lambda’s “Permissions” tab), I attached three inline policies to the LambdaSalesProcessorRole. These policies allow reading from the datainput S3 bucket, writing to the projectdataoutput S3 bucket, and writing items to the ValidSalesData DynamoDB table.
- Deploying Lambda Function Code: After the necessary permissions were in place, the Lambda function code was deployed.
- Configuring S3 Trigger: In the Lambda console, an S3 trigger was configured under the SalesDataProcessor function’s “Designer” tab, specifically to activate upon the creation of .csv files in the datainput bucket. This configuration ensures that only CSV uploads initiate the data processing pipeline. Subsequently, uploading a sample CSV file confirmed the successful triggering and execution of the Lambda function.
- Setting up API Gateway for S3 Upload: API Gateway was set up in the AWS console using the “Build” option under REST API. The details of our API Gateway configuration include:
- API name: SalesFileUploadAPI.
- Integration Type: AWS Service.
- AWS Service: S3.
- HTTP method: PUT.
- Action: PutObject.
- Execution Role: An IAM role granting API Gateway permission to put objects into the datainput S3 bucket was created.
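Everything above was configured in the AWS console, but the following consolidated boto3 sketch shows roughly equivalent API calls for the table, the Lambda role policy, and the S3 trigger. The account ID, Lambda ARN, policy name, and statement ID are placeholders, and the three inline policies from the guide are combined into one document here for brevity.

```python
import json
import boto3

REGION = "eu-north-1"
ACCOUNT_ID = "<ACCOUNT_ID>"  # placeholder
LAMBDA_ARN = f"arn:aws:lambda:{REGION}:{ACCOUNT_ID}:function:SalesDataProcessor"

dynamodb = boto3.client("dynamodb", region_name=REGION)
iam = boto3.client("iam")
s3 = boto3.client("s3", region_name=REGION)
lambda_client = boto3.client("lambda", region_name=REGION)

# 1. DynamoDB table with TransactionID (String) as the partition key.
dynamodb.create_table(
    TableName="ValidSalesData",
    AttributeDefinitions=[{"AttributeName": "TransactionID", "AttributeType": "S"}],
    KeySchema=[{"AttributeName": "TransactionID", "KeyType": "HASH"}],
    BillingMode="PAY_PER_REQUEST",
)

# 2. Inline policy for the Lambda role: read from datainput, write to
#    projectdataoutput, and put items into ValidSalesData.
policy = {
    "Version": "2012-10-17",
    "Statement": [
        {"Effect": "Allow", "Action": "s3:GetObject",
         "Resource": "arn:aws:s3:::datainput/*"},
        {"Effect": "Allow", "Action": "s3:PutObject",
         "Resource": "arn:aws:s3:::projectdataoutput/*"},
        {"Effect": "Allow", "Action": "dynamodb:PutItem",
         "Resource": f"arn:aws:dynamodb:{REGION}:{ACCOUNT_ID}:table/ValidSalesData"},
    ],
}
iam.put_role_policy(
    RoleName="LambdaSalesProcessorRole",
    PolicyName="SalesPipelineAccess",
    PolicyDocument=json.dumps(policy),
)

# 3. Allow S3 to invoke the function, then register the .csv trigger.
lambda_client.add_permission(
    FunctionName="SalesDataProcessor",
    StatementId="AllowS3Invoke",
    Action="lambda:InvokeFunction",
    Principal="s3.amazonaws.com",
    SourceArn="arn:aws:s3:::datainput",
)
s3.put_bucket_notification_configuration(
    Bucket="datainput",
    NotificationConfiguration={
        "LambdaFunctionConfigurations": [{
            "LambdaFunctionArn": LAMBDA_ARN,
            "Events": ["s3:ObjectCreated:*"],
            "Filter": {"Key": {"FilterRules": [{"Name": "suffix", "Value": ".csv"}]}},
        }]
    },
)
```

The API Gateway execution role from the last step would follow the same put_role_policy pattern, with a trust policy for apigateway.amazonaws.com and s3:PutObject scoped to the datainput bucket.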
Final Thoughts
This was an interesting but also challenging project for me. It’s a simple solution and nowhere near the complex AWS architectures I’ve seen others build. As someone still new to cloud, this project taught me a lot, especially coming after my first hands-on experience of deploying a static website on AWS S3, which was much simpler.
It was also the first time I used API Gateway in a project. I had to watch a few videos, read the documentation, and use ChatGPT to fully understand how the service works. It took me several hours to figure it out and integrate it properly because it plays a key role in allowing secure access for external users to send data to our S3 bucket.
Overall, it was a fun and educational project with many takeaways. Thanks for following along!