Data Engineering Bootcamp 2024 (Week 2): Mage & GCP ETL Orchestration

Is Mage a realistic alternative to Airflow? Discover Mage AI for data ETLs, a next-gen data engineering tool for streaming and notebook-like pipelines.

Data Engineering Zoomcamp, Week 2.


Data Engineering Week 2: Mage workflow orchestration


Data pipeline orchestration with Mage

What is Mage? A replacement for Airflow?

Mage looks like a solid Airflow competitor. Its game-changing UI combines a drag-and-drop feel with organized, notebook-style chunks of code that are runnable and testable from the same interface.

Mage also provides seamless integration of real-time pipelines.

Hopefully, Mage also makes backfilling tasks smoother than it is in Airflow.

Their promise regarding backfilling, quoting their own words: “With Mage you can easily define a backfill for a specific date range. Whether you’re missing data or want to add new columns, Mage has got you covered. Simply specify the start and end dates, and let Mage do the rest. It will simulate multiple runs with different execution dates, fetching the data you need and transforming it as required. No more worrying about losing data or dealing with duplicate entries. Mage takes care of it all, making your data engineering tasks a lot easier”

Still, Airflow has dominated the data engineering market for many years. We have seen in the software world that, for a new tool to overtake a market leader, it needs to be at least 10x better to compensate for the cost of migration. Dagster, Prefect… Is that now the case for Mage?

My feeling is that the demand for real-time analytics will keep increasing over the next few years. That is when a new tool that makes data streaming easier could grab a bigger slice of the data engineering toolkit.


Mage core entities

To be honest, they are quite similar to Airflow's. In my opinion, this fact accelerates you through the initial flat part of a sigmoid-shaped learning curve.

  • Project: Like a repository on GitHub; this is where you write all your code.

  • Pipeline: Contains references to all the blocks of code you want to run, charts for visualizing data, and organizes the dependency between each block of code.

  • Block: A file with code that can be executed independently or within a pipeline.

  • Data product: Every block produces data after it’s been executed. These are called data products in Mage.

  • Trigger: A set of instructions that determine when or how a pipeline should run.

  • Run: Stores information about when it was started, its status, when it was completed, any runtime variables used in the execution of the pipeline or block, etc.


Mage SQLite orchestration data

By default, Mage uses SQLite to store orchestration data such as triggers, pipeline runs, and block runs. If you wish to use a different database for your Mage projects, you can change this by setting the MAGE_DATABASE_CONNECTION_URL environment variable. However, for most default setups and local development, SQLite serves as the primary database for managing orchestration data. Additionally, for project-level data, the mage_data directory holds project-level cache data and stores the result of Block runs within a SQLite database located at mage_data/[project_folder]/mage-ai.db.

The SQLite database structure used by Mage for project-level data is located in the mage_data directory. The SQLite database, mage-ai.db, stores project-level metadata, such as pipeline and block run data. A sample tree of tables in the tables schema includes:

└── tables
    ├── backfill
    ├── block_run
    ├── event_matcher
    ├── oauth2_access_token
    ├── oauth2_application
    ├── permission
    ├── pipeline_run
    ├── pipeline_schedule
    ├── pipeline_schedule_event_matcher_association
    ├── role
    ├── secret
    ├── sqlite_master
    ├── user
    └── user_role

You can connect to the SQLite DB like any other database. For Docker installs, the JDBC URL is jdbc:sqlite:/home/src/mage_data/[project_folder]/mage-ai.db
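
For a quick sanity check you can also open that file with Python's built-in sqlite3 module. A minimal sketch, assuming the default Docker paths above and a project named magic-zoomcamp (the column names are the ones I found in my version of Mage, so double-check them against yours):

import sqlite3

# Default location inside the Mage container for a Docker setup;
# adjust the project folder name to your own.
db_path = '/home/src/mage_data/magic-zoomcamp/mage-ai.db'

with sqlite3.connect(db_path) as conn:
    conn.row_factory = sqlite3.Row
    rows = conn.execute(
        """
        SELECT pipeline_uuid, status, execution_date
        FROM pipeline_run
        ORDER BY execution_date DESC
        LIMIT 10
        """
    ).fetchall()

print([dict(r) for r in rows])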


Load, transform and insert data into a dockerized Postgres database with Mage

Our starting point is spinning up two containers, one for the Mage image and the other for the Postgres database image. The docker-compose.yaml file would look like:

version: '3'
services:
  magic:
    image: mageai/mageai:latest
    command: mage start ${PROJECT_NAME}
    env_file:
      - .env
    build:
      context: .
      dockerfile: Dockerfile
    environment:
      USER_CODE_PATH: /home/src/${PROJECT_NAME}
      POSTGRES_DBNAME: ${POSTGRES_DBNAME}
      POSTGRES_SCHEMA: ${POSTGRES_SCHEMA}
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_HOST: ${POSTGRES_HOST}
      POSTGRES_PORT: ${POSTGRES_PORT}
    ports:
      - 6789:6789
    volumes:
      - .:/home/src/
    restart: on-failure:5
  postgres:
    image: postgres:14
    restart: on-failure
    container_name: ${PROJECT_NAME}-postgres
    env_file:
      - .env
    environment:
      POSTGRES_DB: ${POSTGRES_DBNAME}
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
    ports:
      - "${POSTGRES_PORT}:5432"

A Dockerfile like:

FROM mageai/mageai:latest

# PROJECT_NAME should match the project name used in docker-compose / .env
# so that USER_CODE_PATH resolves to the project folder.
ARG PROJECT_NAME=magic-zoomcamp
ARG USER_CODE_PATH=/home/src/${PROJECT_NAME}

COPY requirements.txt ${USER_CODE_PATH}/requirements.txt

RUN pip3 install -r ${USER_CODE_PATH}/requirements.txt

And a .env file as follows:

PROJECT_NAME=magic-zoomcamp
POSTGRES_DBNAME=postgres
POSTGRES_SCHEMA=magic
POSTGRES_USER=postgres
POSTGRES_PASSWORD=postgres
POSTGRES_HOST=postgres
POSTGRES_PORT=5432

After running docker-compose up, you can access the Mage UI at http://localhost:6789/ by default.

We create a new pipeline. You can rename it in the Edit tab.

Then we are ready to start creating blocks. Let's start with a basic ingest-transform-insert.

First of all, we need to configure the required credentials for the Postgres DB. In the io_config.yaml file we can create as many profiles as desired, each of them holding the config vars needed to run our pipelines with different configurations, for example in different environments. Let's create a dev profile and place there the env vars defined in the docker-compose.yaml file:

dev:
  POSTGRES_CONNECT_TIMEOUT: 10
  POSTGRES_DBNAME: {{ env_var('POSTGRES_DBNAME') }}
  POSTGRES_SCHEMA: {{ env_var('POSTGRES_SCHEMA') }} # Optional
  POSTGRES_USER: {{ env_var('POSTGRES_USER') }}
  POSTGRES_PASSWORD: {{ env_var('POSTGRES_PASSWORD') }}
  POSTGRES_HOST: {{ env_var('POSTGRES_HOST') }}
  POSTGRES_PORT: {{ env_var('POSTGRES_PORT') }}

The ingestion is a simple Python chunk of code that loads the data from a URL. As we will see, all code blocks are stored in a way that lets them be reused in any other pipeline, so we can reuse this block to retrieve the same data from different pipelines.
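
For instance, a loader block along these lines would do (a sketch: the URL points to one month of green taxi data in the DataTalksClub nyc-tlc-data GitHub releases, and the parsed columns assume the green taxi schema; adapt both to your dataset):

import pandas as pd

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@data_loader
def load_green_taxi_data(*args, **kwargs):
    # One month of green taxi trips, compressed CSV (illustrative URL).
    url = (
        'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/'
        'green/green_tripdata_2020-10.csv.gz'
    )
    return pd.read_csv(
        url,
        compression='gzip',
        parse_dates=['lpep_pickup_datetime', 'lpep_dropoff_datetime'],
    )


@test
def test_output(output, *args) -> None:
    assert output is not None, 'The output is undefined'
    assert len(output) > 0, 'The dataframe is empty'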

Let's create a simple transformation. We can simply filter out records that don't meet some specs, such as a passenger count of less than 1 or a trip duration of less than 10 seconds.
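
A transformer sketch along those lines (column names assume the green taxi schema; I also derive an lpep_pickup_date column here because the partitioned GCS export further down partitions by it):

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@transformer
def filter_trips(data, *args, **kwargs):
    # Trip duration in seconds, from the green taxi timestamp columns.
    duration = (
        data['lpep_dropoff_datetime'] - data['lpep_pickup_datetime']
    ).dt.total_seconds()

    # Keep only trips with at least one passenger and lasting 10+ seconds.
    data = data[(data['passenger_count'] >= 1) & (duration >= 10)]

    # Derived date column, used later as the GCS partition key.
    data = data.assign(lpep_pickup_date=data['lpep_pickup_datetime'].dt.date)
    return data


@test
def test_output(output, *args) -> None:
    assert (output['passenger_count'] >= 1).all(), 'Found trips with no passengers'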

Next, we export the data to PostgreSQL, using the previously created configuration profile for the database connection. There is a paramount concept when working with this kind of scheduled task, one that is not new and was also present in Airflow: idempotence. You want to make sure that the task's data output is the same if it is called multiple times, and that it replaces any existing data to avoid duplication.
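
A data_exporter sketch for that, based on Mage's Python > PostgreSQL block template and pointing at the dev profile defined above (the schema and table names are my own assumptions):

from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.postgres import Postgres
from pandas import DataFrame
from os import path

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data_to_postgres(df: DataFrame, **kwargs) -> None:
    schema_name = 'magic'      # matches POSTGRES_SCHEMA in .env
    table_name = 'green_taxi'  # assumed table name
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'dev'     # the profile created in io_config.yaml

    with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
        loader.export(
            df,
            schema_name,
            table_name,
            index=False,
            if_exists='replace',  # idempotent: re-runs overwrite instead of duplicating
        )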

Finally, we can verify that the data has been correctly written to PostgreSQL by querying the database and checking the output.
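
One way to run that check without leaving Mage is a small loader block that queries the freshly written table through the same profile (again a sketch, reusing the schema and table names assumed above):

from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.postgres import Postgres
from os import path

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader


@data_loader
def check_exported_rows(*args, **kwargs):
    query = 'SELECT COUNT(*) AS n_rows FROM magic.green_taxi'
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'dev'

    with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
        return loader.load(query)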


Export data to Google Cloud Storage

We are now reusing the load and transform blocks. We only need to configure the GCP connection and implement a new data_exporter code block linked at the bottom of the pipeline.

We are going to use the credentials JSON file exported from GCP, with the proper permissions granted. You can check how to do so here.

After that, for the export block, we need to choose Python > Google Cloud Storage.

from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.google_cloud_storage import GoogleCloudStorage
from pandas import DataFrame
from os import path

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data_to_google_cloud_storage(df: DataFrame, **kwargs) -> None:
    """
    Template for exporting data to a Google Cloud Storage bucket.
    Specify your configuration settings in 'io_config.yaml'.

    Docs: https://docs.mage.ai/design/data-loading#googlecloudstorage
    """
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    bucket_name = 'terraform-demo-20240115-demo-bucket'
    object_key = 'nyc_taxi_data_youtube.parquet'

    GoogleCloudStorage.with_config(ConfigFileLoader(config_path, config_profile)).export(
        df,
        bucket_name,
        object_key,
    )

As simple as that. We can check that everything went as expected by looking for the new object in the GCS bucket.

But this is not how you'd usually manage that export. You may want to partition your data in order to write and load smaller parquet files.

import os

import pyarrow as pa
import pyarrow.fs  # explicit import so pa.fs.GcsFileSystem is guaranteed to be available
import pyarrow.parquet as pq

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter

# Service account key mounted into the container (the project root is mounted at /home/src/).
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/home/src/keys/my_creds.json"

bucket_name = 'terraform-demo-20240115-demo-bucket'
project_id = "concise-quarter-411516"

table_name = "green_taxi"

root_path = f'{bucket_name}/{table_name}'

@data_exporter
def export_data(data, *args, **kwargs):
    """
    Exports data to some source.

    Args:
        data: The output from the upstream parent block
        args: The output from any additional upstream blocks (if applicable)

    Output (optional):
        Optionally return any object and it'll be logged and
        displayed when inspecting the block run.
    """

    # Convert the upstream pandas DataFrame into an Arrow table.
    table = pa.Table.from_pandas(data)

    # Authenticates through GOOGLE_APPLICATION_CREDENTIALS set above.
    gcs = pa.fs.GcsFileSystem()

    pq.write_to_dataset(
        table,
        root_path=root_path,
        partition_cols=["lpep_pickup_date"],  # the column must exist in the DataFrame
        filesystem=gcs,
        existing_data_behavior="delete_matching",
    )

It’s important to note that by setting the existing_data_behavior parameter to delete_matching, we are deleting the whole partition in case it exists. Otherwise, it will add the new files to the existing partition, creating duplicated files.


Parameterized ETL execution

The ease of setting up backfills in Mage is particularly beneficial for pipelines that are parameterized by execution date; it makes the process less frustrating and more efficient. On the best-practices side, Matt emphasizes the importance of creating idempotent pipelines, which ensure that rerunning a pipeline over the same data doesn't lead to data loss or duplication. While this approach is most useful for pipelines that depend on execution dates, it's a straightforward method for backfilling data in many scenarios.
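
As a sketch of what such a date-parameterized block can look like, the exporter below reads the execution_date that Mage injects into the block's kwargs and uses it to build a per-day object key in GCS; during a backfill, Mage runs it once per date in the configured range. The bucket name and key layout are assumptions, reusing the io_config.yaml profile from the earlier export:

from os import path

from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.google_cloud_storage import GoogleCloudStorage
from pandas import DataFrame

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_daily_partition(df: DataFrame, **kwargs) -> None:
    # Mage injects the run's execution date; a backfill simulates one run per date.
    execution_date = kwargs.get('execution_date')

    bucket_name = 'terraform-demo-20240115-demo-bucket'  # same bucket as above
    object_key = f'green_taxi/{execution_date.strftime("%Y/%m/%d")}/daily_trips.parquet'

    config_path = path.join(get_repo_path(), 'io_config.yaml')
    GoogleCloudStorage.with_config(ConfigFileLoader(config_path, 'default')).export(
        df,
        bucket_name,
        object_key,
    )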


Extra Mage documentation and video tutorials


Stay updated on the Data Engineering and Data Analytics Engineer career paths

This was the content I gathered for the second week of the DataTalks Data Engineering bootcamp. We have a Mage pipeline deployed and running periodically on GCP and storing data in Google Cloud Storage.

If you want to stay updated, check the homeworks along with explanations…

Carlos Vecina
Senior Data Scientist at Jobandtalent | AI & Data Science for Business