Building a Simple ETL Pipeline with PySpark and S3 Persistence: A SOLID Approach
Context
As a junior Data Engineer, I've been enhancing my PySpark skills while adhering to SOLID principles, clean code practices, and OOP, and applying more advanced techniques like dependency injection and decorators. In this article, I'll explain how I structured my ETL project and provide a brief step-by-step guide for setting up AWS for data storage.
Please note that this project is only meant to share my progress with PySpark programming along my ever-learner path; I did not intend to build a full end-to-end pipeline this time.
However, I will keep iterating on this project to add more transformation methods and tests, and to figure out how to load data with chunk-size limits. That way, I could simulate the memory limitations that can cause errors when loading large datasets.
The dataset processed here has roughly 8M rows and 22 columns, and weighs about 1.7 GB. It is provided by The City of Chicago's open data portal and contains crime data.
Specs:
PySpark was run in a virtual environment on a local Windows machine (I ran into many issues adding the environment variables to PATH). This guide might be useful for you.
Do not forget to add the environment variables to the "Run/Debug Configurations" in PyCharm as well.
Project Overview
This project demonstrates a simple ETL pipeline using PySpark. The code is available in my GitHub repository. Feel free to fork it and contribute by adding more transformation methods.
Dependency Injection
To maintain loose coupling and flexibility, I used "in-n-out" for dependency injection. This helps manage dependencies in a modular way.
Even though there is another well-known library for this, "dependency-injector", it did not work for me: it seems to be incompatible with Python 3.12 ("dependency-injector 3.12.0" did not work either). So, to avoid changing the Python version of my project and risking a total breakage of the dependency-compatibility balance, I moved forward with "in_n_out", a light and efficient library. Here's a brief example:
containers/containers.py:
import in_n_out as ino

from extractors.data_loader import DataLoader
from transformers.spark_manager import SparkManager


def provide_spark_manager() -> SparkManager:
    return SparkManager()


def provide_data_loader(spark_manager: SparkManager) -> DataLoader:
    return DataLoader(spark_manager.get_spark_session())


ino.register_provider(SparkManager, provide_spark_manager)
ino.register_provider(DataLoader, lambda: provide_data_loader(provide_spark_manager()))
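To give an idea of how these providers get consumed, here is a minimal sketch. It is not taken from the repository: the load_csv method name is an assumption, and the exact register_provider/inject signatures depend on the in_n_out version you install.

# main.py — hedged sketch of consuming the providers registered above.
# Importing the containers module runs the register_provider calls; `@ino.inject`
# then fills in type-annotated parameters that are omitted at call time.
import in_n_out as ino

import containers.containers  # noqa: F401  (side effect: registers the providers)
from extractors.data_loader import DataLoader
from transformers.spark_manager import SparkManager


@ino.inject
def run_pipeline(loader: DataLoader, spark_manager: SparkManager) -> None:
    df = loader.load_csv("data/crimes.csv")  # hypothetical method name
    spark_manager.save_to_s3(df, "s3a://spark-streaming-data/crimes/")


if __name__ == "__main__":
    run_pipeline()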
Decorators for Logging and Timing
Decorators help in adding logging and timing functionality to methods without altering their code. This adheres to the Open/Closed Principle (OCP).
decorators/decorators.py:
import functools
import logging
import time

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


def log_decorator(func):
    @functools.wraps(func)
    def wrapper_decorator(*args, **kwargs):
        logging.info(f"Starting {func.__name__}...")
        result = func(*args, **kwargs)
        logging.info(f"Completed {func.__name__} successfully.")
        return result
    return wrapper_decorator


def timing_decorator(func):
    @functools.wraps(func)
    def wrapper_decorator(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        logging.info(f"{func.__name__} executed in {end_time - start_time:.2f} seconds")
        return result
    return wrapper_decorator
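As a quick illustration (not taken from the repository), this is how the two decorators stack on a data-loading function; load_crimes_csv and the local SparkSession are placeholders:

# example_usage.py — hedged sketch of stacking the decorators.
from pyspark.sql import SparkSession

from decorators.decorators import log_decorator, timing_decorator

spark = SparkSession.builder.appName("decorator-demo").getOrCreate()


@log_decorator       # outermost: logs start and completion
@timing_decorator    # innermost: measures and logs execution time
def load_crimes_csv(path: str):
    # The decorated function's own code stays untouched (Open/Closed Principle).
    return spark.read.csv(path, header=True, inferSchema=True)


df = load_crimes_csv("data/crimes.csv")  # logs "Starting load_crimes_csv..." plus the timing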
Using Static Methods
Static methods in the Config class provide utility functions for managing paths, ensuring single responsibility.
config/config.py:
import os

from dotenv import load_dotenv

load_dotenv()


class Config:
    SPARK_APP_NAME = os.getenv('SPARK_APP_NAME')
    SPARK_URL = os.getenv('SPARK_URL')
    SPARK_EXECUTOR_MEMORY = os.getenv('SPARK_EXECUTOR_MEMORY')
    SPARK_EXECUTOR_CORES = int(os.getenv('SPARK_EXECUTOR_CORES'))
    SPARK_CORES_MAX = int(os.getenv('SPARK_CORES_MAX'))
    DATA_DIR = os.getenv('DATA_DIR')
    REMOTE_DATA_URL = os.getenv('REMOTE_DATA_URL')
    LOCAL_FILENAME = os.getenv('LOCAL_FILENAME')
    OUTPUT_DIR = os.path.join(DATA_DIR, 'output')

    @staticmethod
    def get_data_path(filename):
        return os.path.join(Config.DATA_DIR, filename)

    @staticmethod
    def get_output_path(filename):
        return os.path.join(Config.OUTPUT_DIR, filename)


os.makedirs(Config.DATA_DIR, exist_ok=True)
os.makedirs(Config.OUTPUT_DIR, exist_ok=True)
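For context, a .env along these lines would back the class above, and the static helpers build paths from it. The values here are placeholders, not the project's real settings:

# .env (illustrative placeholder values — adjust to your own setup)
# SPARK_APP_NAME=chicago-crimes-etl
# SPARK_URL=local[*]
# SPARK_EXECUTOR_MEMORY=4g
# SPARK_EXECUTOR_CORES=2
# SPARK_CORES_MAX=4
# DATA_DIR=./data
# LOCAL_FILENAME=crimes.csv

# Using the static helpers:
from config.config import Config

raw_path = Config.get_data_path(Config.LOCAL_FILENAME)       # e.g. ./data/crimes.csv
out_path = Config.get_output_path('crimes_clean.parquet')    # e.g. ./data/output/crimes_clean.parquet
print(raw_path, out_path)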
S3 Configuration
Getting these settings right was decisive for saving the transformation result to the bucket as .parquet. One of the main issues was the size of the file being uploaded: because data is buffered locally before being pushed to S3, and because of the AWS connection itself, the following configurations were necessary.
transformers/spark_manager.py
@log_decorator
@timing_decorator
def save_to_s3(self, df: DataFrame, path: str):
    hadoop_conf = self.spark._jsc.hadoopConfiguration()
    hadoop_conf.set("fs.s3a.access.key", Config.AWS_ACCESS_KEY)
    hadoop_conf.set("fs.s3a.secret.key", Config.AWS_SECRET_KEY)
    hadoop_conf.set("fs.s3a.endpoint", "s3.us-east-1.amazonaws.com")
    hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    hadoop_conf.set("fs.s3a.multipart.size", "104857600")
    hadoop_conf.set("fs.s3a.fast.upload", "true")
    hadoop_conf.set("fs.s3a.fast.upload.buffer", "bytebuffer")
    df.write.mode('overwrite').parquet(path)
    print(f"DataFrame saved to {path}")
A short breakdown of the fs.s3a settings used above (a consolidated SparkSession-builder sketch follows the breakdown):
spark.hadoop.fs.s3a.multipart.size
Configuration: .config("spark.hadoop.fs.s3a.multipart.size", "104857600")
Meaning: This configuration sets the size of the parts into which a file will be divided during a multipart upload to S3. The value 104857600 represents 100 MB (104,857,600 bytes).
Purpose: Multipart upload is a technique that allows a large file to be divided into smaller parts and uploaded in parallel. This can improve upload performance and make it more efficient, especially for large files. The part size should be appropriate to balance upload efficiency and part handling.
spark.hadoop.fs.s3a.fast.upload
Configuration: .config("spark.hadoop.fs.s3a.fast.upload", "true")
Meaning: This configuration enables the fast upload mode for the s3a connector.
Purpose: When this option is enabled (true), the s3a connector uses an optimized upload strategy that improves file upload performance. This includes using in-memory buffers to speed up write operations before the data is transferred to S3.
spark.hadoop.fs.s3a.fast.upload.buffer
Configuration: .config("spark.hadoop.fs.s3a.fast.upload.buffer", "bytebuffer")
Meaning: This configuration specifies the type of buffer to be used for fast upload.
Purpose: By setting the value to bytebuffer, a ByteBuffer type buffer is used, which is an efficient implementation for handling binary data in memory. This can further enhance upload performance by minimizing memory management overhead.
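Equivalently, these properties can be set once when building the SparkSession; the spark.hadoop. prefix forwards them to the Hadoop configuration. A hedged sketch, assuming the hadoop-aws package matching your Hadoop version is on the classpath:

# spark_session_example.py — assumed alternative to setting hadoop_conf at write time.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("chicago-crimes-etl")  # placeholder app name
    # Properties prefixed with "spark.hadoop." become the same fs.s3a.* settings
    # that save_to_s3 applies through hadoop_conf.set(...).
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.endpoint", "s3.us-east-1.amazonaws.com")
    .config("spark.hadoop.fs.s3a.multipart.size", "104857600")   # 100 MB parts
    .config("spark.hadoop.fs.s3a.fast.upload", "true")
    .config("spark.hadoop.fs.s3a.fast.upload.buffer", "bytebuffer")
    .getOrCreate()
)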
AWS CLI Setup
To integrate with AWS S3, follow these steps:
Go to the IAM console.
Create a new user.
Attach the required S3 permissions, for example an inline policy like the one below.
Keep the access key and secret key handy (you will need them to configure the AWS CLI in your environment with PyCharm).
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject",
        "s3:PutObjectAcl"
      ],
      "Resource": "arn:aws:s3:::spark-streaming-data/*"
    }
  ]
}
Download the AWS CLI installer for your OS here.
Add the AWS CLI to PATH in your environment variables.
# check installation
aws --version
# establish a connection with the recently created user's credentials.
# You will be prompted for: access key, secret key, region (e.g. us-east-1), output format (e.g. json)
aws configure
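As an optional sanity check, you can confirm the stored credentials can reach the bucket. This snippet is an assumption on my part (boto3 is not part of the project) and only uses the PutObject/GetObject actions granted by the example policy above:

# s3_check.py — optional, assumed sanity check using boto3 (pip install boto3).
import boto3

s3 = boto3.client("s3")  # picks up the credentials stored by `aws configure`
bucket = "spark-streaming-data"  # the bucket referenced in the policy above

s3.put_object(Bucket=bucket, Key="connection_test.txt", Body=b"hello from the ETL project")
body = s3.get_object(Bucket=bucket, Key="connection_test.txt")["Body"].read()
print(body.decode())  # prints the test message if credentials and permissions are correct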
Disclaimer
This project is a product of my ongoing learning journey and may contain code errors or conceptual mistakes. Any errors are solely attributable to me as I continue to actively share my learning experiences. I appreciate any feedback and suggestions for improvement. Thank you for your understanding.