Building a simple ETL Pipeline with PySpark and S3 persistence: A SOLID Approach

GitHub Repository

Table of Contents

  1. Context
  2. Project Overview
  3. Dependency Injection
  4. Decorators for Logging and Timing
  5. Using Static Methods
  6. S3 configuration
  7. AWS CLI Setup
  8. Disclaimer


Context

As a junior Data Engineer, I've been sharpening my PySpark skills while sticking to SOLID principles, clean code practices, and OOP, and applying techniques such as dependency injection and decorators. In this article, I'll explain how I structured my ETL project and provide a brief step-by-step guide for setting up AWS for data storage.

Please note that this project is simply meant to share my progress in PySpark programming along the ever-learner path. I didn't intend to build a whole end-to-end system this time.

However, I will keep iterating on this project to add more transformation methods and tests, and to figure out how to load data with chunk-size limits. That way, I could simulate the memory limitations that can cause errors when loading large datasets.

The dataset processed here has about 8M rows and 22 columns (roughly 1.7 GB). It is provided by the City of Chicago's open data portal and contains crime data.

Specs:

PySpark was run inside an env on a local Windows machine (I ran into many issues adding the environment variables to PATH). This guide might be useful for you.

Do not forget to add the environment variables to the "Run/Debug Configurations" in PyCharm as well; see the sketch after the list below.

  • PyCharm
  • Python 3.12.0
  • pyspark==3.4.3
  • winutils - Hadoop 3.0.0
  • JDK 8
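
For reference, these are the variables that setup typically relies on. Below is a minimal sanity-check sketch, assuming the usual HADOOP_HOME/JAVA_HOME conventions for winutils and the JDK; the example paths are placeholders, not my actual ones:

import os

# Quick check that the variables PySpark-on-Windows usually needs are set.
# The example paths below are placeholders; adjust them to your installation.
expected = {
    "JAVA_HOME": r"C:\Program Files\Java\jdk1.8.0_xxx",   # JDK 8 install dir (placeholder)
    "HADOOP_HOME": r"C:\hadoop",                          # folder containing bin\winutils.exe (placeholder)
}

for name, example in expected.items():
    value = os.environ.get(name)
    print(f"{name} = {value}" if value else f"{name} is NOT set (e.g. {example})")

# %JAVA_HOME%\bin and %HADOOP_HOME%\bin should also be on PATH.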


Project Overview

This project demonstrates a simple ETL pipeline using PySpark. The code is available in my GitHub repository. Feel free to fork it and contribute by adding more transformation methods.

Dependency Injection

To maintain loose coupling and flexibility, I used "in-n-out" for dependency injection. This helps manage dependencies in a modular way.

Even though there is another well-known library for this, "dependency-injector", it didn't work for me, as it seems to be incompatible with Python 3.12 ("dependency-injector 3.12.0" didn't work either). So, to avoid changing the Python version in my project and risking a total breakage of my dependency set, I moved to "in_n_out", a light and efficient library. Here's a brief example:

containers/containers.py:

import in_n_out as ino
from extractors.data_loader import DataLoader
from transformers.spark_manager import SparkManager

# Provider functions: in-n-out calls these to build the instances on demand.
def provide_spark_manager() -> SparkManager:
    return SparkManager()

def provide_data_loader(spark_manager: SparkManager) -> DataLoader:
    return DataLoader(spark_manager.get_spark_session())

# Register the providers so each type can be resolved wherever it is injected.
ino.register_provider(SparkManager, provide_spark_manager)
ino.register_provider(DataLoader, lambda: provide_data_loader(provide_spark_manager()))
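
For completeness, this is roughly how the registered providers get consumed elsewhere in the pipeline. A minimal sketch, assuming in_n_out's inject decorator resolves type-annotated parameters from the providers registered above; the main.py filename and the run_pipeline name are just for illustration:

main.py (illustrative):

import in_n_out as ino
from extractors.data_loader import DataLoader

# Importing the module executes the register_provider calls above.
import containers.containers  # noqa: F401

@ino.inject  # fills in parameters based on their type annotations
def run_pipeline(data_loader: DataLoader) -> None:
    # data_loader arrives already wired to a SparkSession
    ...

if __name__ == "__main__":
    run_pipeline()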


Decorators for Logging and Timing

Decorators help in adding logging and timing functionality to methods without altering their code. This adheres to the Open/Closed Principle (OCP).


decorators/decorators.py:

import functools
import logging
import time

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def log_decorator(func):
    @functools.wraps(func)
    def wrapper_decorator(*args, **kwargs):
        logging.info(f"Starting {func.__name__}...")
        result = func(*args, **kwargs)
        logging.info(f"Completed {func.__name__} successfully.")
        return result
    return wrapper_decorator

def timing_decorator(func):
    @functools.wraps(func)
    def wrapper_decorator(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        logging.info(f"{func.__name__} executed in {end_time - start_time:.2f} seconds")
        return result
    return wrapper_decorator        
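
To use them, you just stack the decorators on any function or method you want traced, exactly as save_to_s3 does later in this article. A small sketch (the clean_data function and its body are illustrative):

from decorators.decorators import log_decorator, timing_decorator

@log_decorator
@timing_decorator
def clean_data(df):
    # illustrative transformation: drop exact duplicate rows
    return df.dropDuplicates()

# Calling clean_data(df) logs "Starting clean_data...", then the elapsed time,
# then "Completed clean_data successfully." without touching the function body.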


Using Static Methods

Static methods in the Config class provide utility functions for managing paths, ensuring single responsibility.

config/config.py:

import os
from dotenv import load_dotenv

load_dotenv()

class Config:
    SPARK_APP_NAME = os.getenv('SPARK_APP_NAME')
    SPARK_URL = os.getenv('SPARK_URL')
    SPARK_EXECUTOR_MEMORY = os.getenv('SPARK_EXECUTOR_MEMORY')
    SPARK_EXECUTOR_CORES = int(os.getenv('SPARK_EXECUTOR_CORES'))
    SPARK_CORES_MAX = int(os.getenv('SPARK_CORES_MAX'))
    DATA_DIR = os.getenv('DATA_DIR')
    REMOTE_DATA_URL = os.getenv('REMOTE_DATA_URL')
    LOCAL_FILENAME = os.getenv('LOCAL_FILENAME')
    OUTPUT_DIR = os.path.join(DATA_DIR, 'output')

    @staticmethod
    def get_data_path(filename):
        return os.path.join(Config.DATA_DIR, filename)

    @staticmethod
    def get_output_path(filename):
        return os.path.join(Config.OUTPUT_DIR, filename)

os.makedirs(Config.DATA_DIR, exist_ok=True)
os.makedirs(Config.OUTPUT_DIR, exist_ok=True)        
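
For reference, the class expects a .env file at the project root with these keys. The values below are placeholders, not the ones I actually used. Note that the save_to_s3 method shown later also reads Config.AWS_ACCESS_KEY and Config.AWS_SECRET_KEY (those attributes are not shown in the snippet above), so your .env needs those keys as well.

.env (example values):

SPARK_APP_NAME=chicago-crimes-etl
SPARK_URL=local[*]
SPARK_EXECUTOR_MEMORY=4g
SPARK_EXECUTOR_CORES=2
SPARK_CORES_MAX=4
DATA_DIR=./data
REMOTE_DATA_URL=<dataset export URL from the Chicago open data portal>
LOCAL_FILENAME=crimes.csv
AWS_ACCESS_KEY=<your-access-key>
AWS_SECRET_KEY=<your-secret-key>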


S3 configuration

Getting these settings right was key to saving the transformation result as .parquet into the bucket. One of the main issues was that the file being uploaded was too large, and because of the temporary local buffering that happens before the data is pushed to S3, plus the AWS connection itself, it was necessary to set the following configs.


transformers/spark_manager.py:

    @log_decorator
    @timing_decorator
    def save_to_s3(self, df: DataFrame, path: str):
        hadoop_conf = self.spark._jsc.hadoopConfiguration()
        hadoop_conf.set("fs.s3a.access.key", Config.AWS_ACCESS_KEY)
        hadoop_conf.set("fs.s3a.secret.key", Config.AWS_SECRET_KEY)
        hadoop_conf.set("fs.s3a.endpoint", "s3.us-east-1.amazonaws.com")
        hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        hadoop_conf.set("fs.s3a.multipart.size", "104857600")
        hadoop_conf.set("fs.s3a.fast.upload", "true")
        hadoop_conf.set("fs.s3a.fast.upload.buffer", "bytebuffer")

        df.write.mode('overwrite').parquet(path)
        print(f"DataFrame saved to {path}")

A short breakdown of them:

spark.hadoop.fs.s3a.multipart.size

Configuration: .config("spark.hadoop.fs.s3a.multipart.size", "104857600")
Meaning: This configuration sets the size of the parts into which a file will be divided during a multipart upload to S3. The value 104857600 represents 100 MB (104,857,600 bytes).
Purpose: Multipart upload is a technique that allows a large file to be divided into smaller parts and uploaded in parallel. This can improve upload performance and make it more efficient, especially for large files. The part size should be appropriate to balance upload efficiency and part handling.


spark.hadoop.fs.s3a.fast.upload

Configuration: .config("spark.hadoop.fs.s3a.fast.upload", "true")
Meaning: This configuration enables the fast upload mode for the s3a connector.
Purpose: When this option is enabled (true), the s3a connector uses an optimized upload strategy that improves file upload performance. This includes using in-memory buffers to speed up write operations before the data is transferred to S3.


spark.hadoop.fs.s3a.fast.upload.buffer

Configuration: .config("spark.hadoop.fs.s3a.fast.upload.buffer", "bytebuffer")
Meaning: This configuration specifies the type of buffer to be used for fast upload.
Purpose: By setting the value to bytebuffer, a ByteBuffer type buffer is used, which is an efficient implementation for handling binary data in memory. This can further enhance upload performance by minimizing memory management overhead.
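
The breakdown above uses the .config(...) form; the same properties can also be set once when the SparkSession is built, using the spark.hadoop. prefix so they land in the underlying Hadoop configuration, instead of calling hadoopConfiguration().set at write time. A minimal sketch (the app name is illustrative):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("etl-s3-demo")  # illustrative name
    .config("spark.hadoop.fs.s3a.multipart.size", "104857600")       # 100 MB parts
    .config("spark.hadoop.fs.s3a.fast.upload", "true")               # enable fast upload
    .config("spark.hadoop.fs.s3a.fast.upload.buffer", "bytebuffer")  # in-memory buffers
    .getOrCreate()
)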



AWS CLI Setup

To integrate with AWS S3, follow these steps:

  • Create an IAM User:

Go to the IAM console.

Create a new user.

Attach the required S3 access policies.

Keep the access key and secret key handy (you will need them to configure the AWS CLI in your PyCharm environment).

  • Set Bucket Policy:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": "*",
            "Action": [
                "s3:GetObject",
                "s3:PutObject",
                "s3:PutObjectAcl"
            ],
            "Resource": "arn:aws:s3:::spark-streaming-data/*"
        }
    ]
}
        

  • Install AWS CLI:

Download the AWS CLI installer for your OS here.

Add the AWS CLI to PATH in your environment variables.

# check installation
aws --version

# establish the connection with the recently created user credentials.
# Type in: access key ID, secret access key, region {us-east-1}, output format {json}

aws configure
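
Once configured, you can verify that the credentials are picked up before running the pipeline:

# confirm which IAM identity the CLI is using
aws sts get-caller-identity

# list the buckets visible to that identity
aws s3 ls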
        


Disclaimer

This project is a product of my ongoing learning journey and may contain code errors or conceptual mistakes. Any errors are solely attributable to me as I continue to actively share my learning experiences. I appreciate any feedback and suggestions for improvement. Thank you for your understanding.

