(I'd appreciate your likes and comments). Additionally, it will result in my lengthiest blog post to date. However, regardless of the length, it's a significant achievement that I'm eager to share with you. I'll refrain from delving into unnecessary details and get straight to the main points to prevent this post from turning into a 100-minute read :). As always, I'll strive to simplify everything to ensure even those who aren't tech-savvy can easily follow along.

Why? Everything has a why, and this project is no exception: I had been learning new skills (DevOps for data engineering) and I needed to apply them in an end-to-end project. Of course, this project is not the best one out there, but it helps me to iterate quickly and make mistakes. (And it reflects the reality of Modern Data Engineering, with beautiful tool icons everywhere.)

End Goal

The end goal of this project is to have a fully functional data platform/pipeline that refreshes our analytics tables/dashboards daily.

The whole infrastructure

As you can see in the image, there are a few things we needed to set up to run this project:
- The first: Where will the data come from? RDS PostgreSQL + a Python script on an EC2 instance to generate new data every 2 hours.
- The second: Where will the data be stored for analytics? Snowflake.
- The third: How are we going to link the two? Airbyte (Cloud).
- The fourth: How are we going to make the data suitable for analytics and dashboarding? DBT + Airflow.

Let's dive into each part step by step.

Where does the data come from?

I was really tired of flat files, truly tired!! So I thought it would be a good idea to create a script that generates dummy data on a schedule (every 2 hours) and stores it in a PostgreSQL database (RDS). I will not delve deep into this part, as it is not our main focus. So here's a brief overview of the data model of our source system:
This data model is intentionally basic and "dirty." I wanted something unclean so that I would have plenty to address in the transformation phase.

Now that we had the data model and the Python script ready to go, we needed to create an RDS PostgreSQL database where our raw tables would be stored. To do this, we leveraged Terraform and followed these steps:
- Create a VPC.
- Create two private subnets where the RDS instance will be hosted, along with a security group to allow traffic to port 5432.
- Create one EC2 instance in a public subnet where our Python script will run (this will also act as an SSH tunnel to connect to the database with Airbyte, as the database is not publicly accessible for security reasons), plus a security group to allow SSH.

Here is how the RDS will look at the end:
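To give a rough idea, here is a trimmed-down Terraform sketch of the RDS part (resource names, sizes and the referenced VPC/subnet/security-group resources are placeholders, not the project's exact code):

```hcl
# Illustrative sketch only: the VPC, subnets and EC2 security group referenced
# here are assumed to be defined elsewhere in the Terraform project.
resource "aws_db_subnet_group" "rds" {
  name       = "rds-private-subnets"
  subnet_ids = [aws_subnet.private_a.id, aws_subnet.private_b.id]
}

resource "aws_security_group" "rds" {
  name   = "rds-sg"
  vpc_id = aws_vpc.main.id

  ingress {
    from_port       = 5432
    to_port         = 5432
    protocol        = "tcp"
    security_groups = [aws_security_group.ec2.id] # only the EC2 host can reach Postgres
  }
}

resource "aws_db_instance" "postgres" {
  identifier             = "source-postgres"
  engine                 = "postgres"
  instance_class         = "db.t3.micro"
  allocated_storage      = 20
  username               = var.db_username   # placeholder variables
  password               = var.db_password
  db_subnet_group_name   = aws_db_subnet_group.rds.name
  vpc_security_group_ids = [aws_security_group.rds.id]
  publicly_accessible    = false
  skip_final_snapshot    = true
}
```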
As for the EC2 instance where the Python script will be hosted, here is how it looks:
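Again in sketch form (the AMI, key pair and names below are placeholders), the Terraform for this host boils down to something like:

```hcl
# Illustrative sketch: AMI ID, subnet and key pair are placeholders.
resource "aws_instance" "ingestion_host" {
  ami                         = var.ami_id          # e.g. an Ubuntu AMI
  instance_type               = "t3.micro"
  subnet_id                   = aws_subnet.public_a.id
  vpc_security_group_ids      = [aws_security_group.ec2.id]
  associate_public_ip_address = true
  key_name                    = var.key_name

  # Install Docker at boot so the data-generator container can run here
  user_data = <<-EOF
              #!/bin/bash
              apt-get update -y
              apt-get install -y docker.io
              usermod -aG docker ubuntu
              systemctl enable --now docker
              EOF

  tags = {
    Name = "data-generator"
  }
}
```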
You are probably asking yourself what this long piece of code does: it simply creates an EC2 instance, specifying the AMI, security group, subnet, and user_data to install Docker.

Okay, we have the resources created on AWS. Now, how do we deploy our code to that EC2 instance? Simply by using a CI/CD pipeline with GitHub Actions:
What does this code do?
- Connect to the EC2 instance using SSH.
- Copy the contents of the directory containing the Python script to that EC2 instance.
- Run the container.
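As a rough sketch, such a workflow might look like the following (the secret names, the ubuntu SSH user and the data_generator directory are assumptions, and it assumes the user_data above put ubuntu in the docker group):

```yaml
# Illustrative sketch of a deploy workflow; names and paths are placeholders.
name: deploy-data-generator

on:
  push:
    branches: [main]

jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Set up the SSH key
        run: |
          mkdir -p ~/.ssh
          echo "${{ secrets.EC2_SSH_KEY }}" > ~/.ssh/id_rsa
          chmod 600 ~/.ssh/id_rsa

      - name: Copy the script directory to the EC2 host
        run: |
          scp -o StrictHostKeyChecking=no -r data_generator \
            ubuntu@${{ secrets.EC2_HOST }}:~/

      - name: (Re)build and run the container
        run: |
          ssh -o StrictHostKeyChecking=no ubuntu@${{ secrets.EC2_HOST }} '
            cd ~/data_generator &&
            docker build -t data-generator . &&
            (docker rm -f data-generator || true) &&
            docker run -d --name data-generator data-generator
          '
```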
Where will the data be stored for analytics?

Now we have data that is updated every two hours in RDS; what is the target system? One answer: SNOWFLAKE.

There is not too much to talk about in this section, apart from how I structured the database:
- RAW: the database that stores raw data coming from Airbyte (schema: postgres_airbyte).
- ANALYTICS: the production database (schemas: staging, intermediate, marts(finance)).
- DBT_DEV: the dev database (it has the same schemas as the production database).
- DATA_ENGINEER: a role that allows usage of the RAW database and ownership of ANALYTICS and DBT_DEV.
- AIRBYTE_ROLE: used by Airbyte to write to the RAW database (postgres_airbyte schema).

If you ask me how to create all this in Snowflake, you are not in tech at all. (Google it.)

How to link the two systems / How to ingest data?

Initially, I had contemplated writing a custom
systems / How to ingest data? Initially, I had contemplated writing a custom
Python script on AWS Lambda for this task. However, it would have taken a
significant amount of time to implement, especially considering the need to
incorporate Change Data Capture (CDC) to capture only new data from the source
system. So I decided to go with a no-code, modern tool at the core of the
“MODERN DATA STACK”. I opted for Airbyte, particularly Airbyte Cloud. While I
could have gone with the open-source version and installed it with Docker on an
EC2 instance, the cloud version offered a 14-day free trial, which I couldn’t
pass up. Syncing data from a source system to a target system is straightforward
with Airbyte. I don’t even need to explain how to do it. For the sync mode, I
chose ‘Incremental’, where Airbyte pulls only new data from the source system,
and ‘Append + Deduped’, which is self-explanatory: Airbyte ensures the
uniqueness of each row based on a specified column (typically a primary key).
In less than 10 minutes, we had our ingestion pipeline up and running, THANKS TO
THE MODERN DATA STACK!

P.S.: Airbyte syncs our data every day at 5 p.m. (remember this, it will be useful later).

How to make data suitable for analytics/dashboarding?

As you saw in the source system data model, we can't really use those tables for analytics and dashboarding. That's where the KIMBALL APPROACH came into the picture. Take a look at this beautiful and simple schema:
P.S.: our end goal is to analyze subscription plan metrics.

Now that we have our desired data model, how are we going to create those tables? That's where DBT came into the picture. Here is how we structured our DBT project:
- models/staging: These are models stored in the staging schema. They undergo simple type casting and quick transformations. Here is an example of a model we made (stg_bank):
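As a sketch of what such a staging model can look like (the raw column names below are assumptions; only the postgres_airbyte source schema comes from the setup above, and it presumes a matching sources.yml entry):

```sql
-- Minimal sketch of a staging model; bank_id / bank_name / created_at are assumed columns.
with source as (

    select * from {{ source('postgres_airbyte', 'bank') }}

),

renamed as (

    select
        cast(bank_id as integer)      as bank_id,
        trim(bank_name)               as bank_name,
        cast(created_at as timestamp) as created_at
    from source

)

select * from renamed
```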
Pretty simple, right?
- models/intermediate: This is where we construct our fact and dimension tables. Here is an example (int_date):
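A date dimension is usually generated rather than loaded; on Snowflake, a sketch of an int_date-style model could look like this (the date range and columns are illustrative):

```sql
-- Illustrative sketch of a date dimension generated directly in Snowflake.
with date_spine as (

    select
        dateadd(
            day,
            row_number() over (order by seq4()) - 1,
            '2020-01-01'::date
        ) as date_day
    from table(generator(rowcount => 3650))   -- roughly ten years of days

)

select
    date_day,
    extract(year from date_day)  as calendar_year,
    extract(month from date_day) as calendar_month,
    extract(day from date_day)   as day_of_month,
    dayname(date_day)            as day_name,
    dayofweekiso(date_day)       as iso_day_of_week
from date_spine
```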
I also built some unit tests just to …. test :)
- models/marts: Here, we calculate various metrics, such as total net revenue by
subscription plan.
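For illustration, a mart that computes total net revenue by subscription plan could look roughly like this (fct_payments, dim_subscription_plan and their columns are assumed names, standing in for the project's real fact and dimension models):

```sql
-- Illustrative sketch: model and column names are assumptions, not the exact ones used.
with payments as (

    select * from {{ ref('fct_payments') }}

),

plans as (

    select * from {{ ref('dim_subscription_plan') }}

)

select
    plans.plan_name,
    sum(payments.net_amount) as total_net_revenue
from payments
inner join plans
    on payments.plan_id = plans.plan_id
group by plans.plan_name
```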
WE ARE DONE WITH DBT (for now). Let's move to Airflow.

Airflow + Cosmos + DBT: a love story

The DBT part was done; we needed to move to the next step: schedule those DBT models to run daily and refresh the tables so that data analysts/scientists can run their analyses. Here is where Airflow came into the picture, especially the COSMOS library, which allowed us to easily run DBT models with Airflow. As I said, Airbyte was ingesting data every day at 5 p.m., so we needed to schedule the Airflow DAG to run after 5 p.m. (every morning, we need to have fresh tables!!). Here is how we defined our DAG using Cosmos:
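In sketch form (the paths, profile name and dbt executable location below are placeholders), a Cosmos DAG like this looks roughly as follows:

```python
# Illustrative sketch: paths, profile and DAG names are placeholders.
from datetime import datetime
from pathlib import Path

from cosmos import DbtDag, ExecutionConfig, ProfileConfig, ProjectConfig

profile_config = ProfileConfig(
    profile_name="analytics",          # assumed dbt profile name
    target_name="prod",
    profiles_yml_filepath=Path("/opt/dbt/profiles.yml"),
)

daily_dbt_refresh = DbtDag(
    dag_id="daily_dbt_refresh",
    project_config=ProjectConfig("/opt/dbt/analytics"),   # path to the dbt project
    profile_config=profile_config,
    execution_config=ExecutionConfig(
        dbt_executable_path="/opt/dbt_venv/bin/dbt",
    ),
    # Airbyte finishes its daily sync at 5 p.m., so run the models after that
    schedule_interval="0 18 * * *",
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
```

Cosmos parses the DBT project and turns each model (and its tests) into its own Airflow task, which is what makes the DAG graph so readable.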
The next step was to deploy our Airflow code to an EC2 instance (which we also created using TERRAFORM).

CONGRATS: our DAG is now running successfully (check this beautiful Airflow UI with all your models), and the tables will be refreshed daily.
Dashboard

"Keep It Simple": that's what I did with this dashboard. Straight to the point. No fancy visuals or charts.

Wrap Up

I intentionally kept this article very simple and understandable. I avoided delving too deeply into technical details so you could gain an overview of a typical data engineering workflow. If you found it helpful, feel free to save it to your list! THANKS!!!