In this post, I'll share how I'm using Kedro to manage Machine Learning (ML) pipelines while efficiently storing and executing SQL queries within my Python Kedro project. By integrating Ibis via the Kedro Ibis table dataset, I'm able to keep all my SQL code within the Kedro project and execute it on the database side, specifically using Google BigQuery.
The Challenge
When managing ML pipelines, especially with large datasets stored on Google BigQuery or other databases, it's crucial to have an efficient and scalable way to preprocess data before training your models. Often, the best approach is to perform preprocessing on a database engine using Structured Query Language (SQL). A common challenge in this process is deciding where to store and execute SQL queries. Traditionally, there are two options:
Store SQL queries in the database: This involves creating views or stored procedures. While this keeps the execution close to the data, it splits your pipeline code across different platforms.
Store SQL queries in your project: This centralises all code within your project repository (e.g., Git), but requires a way to execute these queries on the database.
The Solution: Kedro, Ibis, and the table dataset
Why Kedro?
Kedro is a platform-agnostic open-source Python framework for creating reproducible, maintainable, and modular data science code. It helps in managing the entire lifecycle of your ML projects, from data ingestion to model training and deployment.
What is Kedro?
Kedro is an open-source Python toolbox that applies software engineering principles to data science code. It makes it easier for a team to work on data science code collaboratively, which reduces the time spent rewriting data science experiments so that they are fit for production.
Kedro was born at QuantumBlack to solve the challenges faced regularly in data science projects and promote teamwork through standardised team workflows. It is now hosted by the LF AI & Data Foundation as an incubating project.
I usually use Kedro on my data science and ML projects because it provides a well-organised project structure, modular and reusable code, and efficient pipeline management. It centralises data and configuration management, which simplifies deployment and maintenance of models. Overall, Kedro streamlines workflows, promotes MLOps best practices, makes collaboration and maintenance easier, and saves a lot of time by eliminating the need to write everything from scratch.
Why Ibis and Ibis table dataset?
Ibis is a Python library that provides a pandas-like interface for interacting with SQL-based databases. It allows you to write expressive queries using Python syntax and execute them on the database side, leveraging the performance and scalability of the database.
ibis.TableDataset allows you to use Ibis expressions as part of your Kedro pipelines. This means you can define your database logic within your Kedro project and execute it on backends like Google BigQuery.
By combining Ibis and the Kedro Ibis table dataset, you can easily integrate SQL processing into your Python Machine Learning pipeline while benefiting from version control for your pipeline. Additionally, you can switch between different databases without changing the data processing code, even if the databases use different versions of SQL.
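To make that concrete, here is a minimal sketch of how a pandas-like Ibis expression compiles to backend-specific SQL. The table and columns below are made up for illustration and are not part of the project:

import ibis

# Illustrative unbound table: this schema is invented for the example
t = ibis.table({"country_name": "string", "score": "float64"}, name="trends")

# A pandas-like expression: filter, group, aggregate
expr = (
    t.filter(t.score.notnull())
    .group_by("country_name")
    .aggregate(avg_score=t.score.mean())
)

# Render the SQL that Ibis would send to a BigQuery backend
print(ibis.to_sql(expr, dialect="bigquery"))

Nothing executes here; Ibis only builds and renders the query, which is exactly what makes it possible to push the work to the database later.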
Task description
I plan to use Google Trends data from the public datasets available in BigQuery to develop a model for predicting future trends.
First, I logged into my Google Cloud account and navigated to the BigQuery service. My focus was on two public tables: bigquery-public-data.google_trends.international_top_terms (containing the history of trend rankings) and bigquery-public-data.google_trends.international_top_rising_terms (containing additional information about rising trends). Both tables are quite large, with around 200 million rows each.
Before training the model, I decided to preprocess the data with several common steps: grouping with aggregates, filtering, transforming, and merging datasets. I wrote an SQL query to accomplish this:
SELECT
  trends.country_name,
  trends.month,
  trends.term AS google_trend,
  count_per_month,
  avg_score,
  avg_percent_gain
FROM (
  SELECT
    country_name,
    FORMAT_DATE('%Y-%m', week) AS month,
    term,
    COUNT(*) AS count_per_month,
    AVG(score) AS avg_score
  FROM
    `bigquery-public-data.google_trends.international_top_terms`
  WHERE
    score IS NOT NULL
  GROUP BY
    country_name, month, term
) AS trends
LEFT JOIN (
  SELECT
    country_name,
    FORMAT_DATE('%Y-%m', week) AS month,
    term,
    AVG(percent_gain) AS avg_percent_gain
  FROM
    `bigquery-public-data.google_trends.international_top_rising_terms`
  GROUP BY
    country_name, month, term
) AS rising_trends
ON
  trends.month = rising_trends.month
  AND trends.term = rising_trends.term
  AND trends.country_name = rising_trends.country_name
I could store that SQL query as a database view and use the view name in my Kedro Python project to access the preprocessed data. This approach allows you to preprocess data on the database engine, but it has a few drawbacks:
you need permissions to create database objects
you can't control the future of that view: there is no version control with Git, and someone could change or accidentally remove it.
Therefore, I decided to use Ibis with the table dataset to replicate the same SQL query in pure Python within my Kedro project.
Implementation Steps
1. I opened my IDE, installed the required packages, and created a new Kedro data science project:
pip install kedro ibis-framework[bigquery]
kedro new
I named my project kedro-ibis-bigquery, answered all the other questions with the default options, and received a kedro-ibis-bigquery folder containing an empty default Kedro project. This folder includes template folders and files for project configuration, data, and source code, plus project metadata in pyproject.toml and a description in README.md. Additionally, when creating the project, there were options to add default linting, documentation, and testing tools, which help maintain the project according to software development best practices.
2. I created a new empty Kedro pipeline using the following command:

kedro pipeline create data_processing
A Kedro pipeline is a sequence of nodes (Python functions) executed in the order of their data dependencies. In my DS and ML projects, I typically create several pipelines, such as data_processing, model_training, and model_evaluation. In Kedro, pipelines and nodes are stored in the src/kedro_ibis_bigquery/pipelines/pipeline_name/ folders.
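For reference, kedro pipeline create data_processing generates roughly this layout in recent Kedro versions (exact contents may vary by version):

src/kedro_ibis_bigquery/pipelines/data_processing/
├── __init__.py    # exposes create_pipeline
├── nodes.py       # node functions
└── pipeline.py    # pipeline definition

It also creates a matching parameters file under conf/base/.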
3. Then I navigated to the kedro-ibis-bigquery folder and edited a few files:
conf/base/catalog.yml
This is the Kedro Data Catalog, where I set all my dataset descriptions. I'm describing two BigQuery tables with the type ibis.TableDataset and a connection backend specification. The project_id: aesthetic-site-421415 is the name of my personal Google Cloud project (where query execution will be performed), and dataset_id is the database and schema name.
international_top_terms:
  type: ibis.TableDataset
  table_name: international_top_terms
  connection:
    backend: bigquery
    project_id: aesthetic-site-421415
    dataset_id: bigquery-public-data.google_trends

international_top_rising_terms:
  type: ibis.TableDataset
  table_name: international_top_rising_terms
  connection:
    backend: bigquery
    project_id: aesthetic-site-421415
    dataset_id: bigquery-public-data.google_trends

preprocessed_data:
  type: pandas.CSVDataset
  filepath: data/02_intermediate/preprocessed_data.csv
I want the preprocessed data to be stored in my local file system in CSV format, rather than being returned to the database, so I'm using pandas.CSVDataset for that.
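Under the hood, loading one of these catalog entries is roughly equivalent to connecting with Ibis's BigQuery backend yourself. This is a simplified sketch, not the dataset's exact implementation:

import ibis

# Roughly what ibis.TableDataset does on load (simplified): connect to
# the backend described in the catalog entry and return the named table
# as a lazy Ibis table expression.
con = ibis.bigquery.connect(
    project_id="aesthetic-site-421415",               # execution/billing project
    dataset_id="bigquery-public-data.google_trends",  # source dataset
)
itt = con.table("international_top_terms")            # no data pulled yet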
src/kedro_ibis_bigquery/pipelines/data_processing/nodes.py
Here I defined my data_processing function, which contains Python code written using the Ibis library syntax (similar to pandas in Python or dplyr in R). This function fully replicates the SQL query we saw earlier.
import ibis


def data_processing(itt, itrt):
    # Monthly aggregates of trend rankings: row count and average score
    # per country, month, and term (mirrors the first SQL subquery)
    trends = (
        itt.filter(itt.score.notnull())
        .mutate(month=itt.week.cast('string').left(7))
        .group_by([itt.country_name, 'month', itt.term])
        .aggregate(
            count_per_month=itt.count(),
            avg_score=itt.score.mean(),
        )
    )

    # Monthly average percent gain (mirrors the second SQL subquery)
    rising_trends = (
        itrt.mutate(month=itrt.week.cast('string').left(7))
        .group_by([itrt.country_name, 'month', itrt.term])
        .aggregate(avg_percent_gain=itrt.percent_gain.mean())
    )

    # Left join the two aggregates on country, month, and term
    result = trends.left_join(
        rising_trends,
        [
            trends.country_name == rising_trends.country_name,
            trends.month == rising_trends.month,
            trends.term == rising_trends.term,
        ],
    )[
        trends.country_name,
        trends.month,
        trends.term.name('google_trend'),
        trends.count_per_month,
        trends.avg_score,
        rising_trends.avg_percent_gain,
    ]

    # Materialise the result: the query runs on BigQuery at this point
    return result.to_pandas()
The result object is an Ibis table expression, not yet materialised data. I want it to be saved as a CSV later, so I converted it to a pandas DataFrame with to_pandas(), which is the point where the query actually executes.
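A quick aside on this laziness, using an in-memory DuckDB backend purely for illustration (the project itself targets BigQuery, and the demo table here is invented):

import ibis
import pandas as pd

# Ibis expressions stay lazy until explicitly materialised
con = ibis.duckdb.connect()  # in-memory database for the demo
t = con.create_table("demo", pd.DataFrame({"x": [1, 2, 3]}))
expr = t.aggregate(total=t.x.sum())  # nothing has executed yet
df = expr.to_pandas()                # the query runs here
print(df)                            # total = 6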
src/kedro_ibis_bigquery/pipelines/data_processing/pipeline.py
Here I created a Kedro pipeline consisting of one node, which wraps the previously described data_processing function. The node has two inputs from BigQuery and one output to a CSV, as described in the Data Catalog.
from kedro.pipeline import Pipeline, pipeline, node

from .nodes import data_processing


def create_pipeline(**kwargs) -> Pipeline:
    return pipeline([
        node(
            func=data_processing,
            inputs=["international_top_terms", "international_top_rising_terms"],
            outputs="preprocessed_data",
            name="preprocess_data",
        ),
    ])
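Note that nothing needs registering by hand: the pipeline_registry.py generated by kedro new discovers every create_pipeline automatically. This is roughly what recent Kedro versions generate (shown for context, not hand-written):

from kedro.framework.project import find_pipelines
from kedro.pipeline import Pipeline


def register_pipelines() -> dict[str, Pipeline]:
    # Auto-discover create_pipeline() in every pipelines/ subpackage
    pipelines = find_pipelines()
    # A plain `kedro run` executes the sum of all discovered pipelines
    pipelines["__default__"] = sum(pipelines.values())
    return pipelines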
4. Now I can run the kedro run command to execute my pipeline. This will preprocess the dataset, store it in my local directory, and let me continue developing my Kedro ML pipeline with the model training part. The data preprocessing is executed on the database engine, and all the query logic is versioned within my Python project.
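As a quick sanity check after the run (my own habit, not a required step), I can load the CSV the pipeline wrote, using the file path from the catalog entry:

import pandas as pd

# Inspect the preprocessed output written by the pipeline
df = pd.read_csv("data/02_intermediate/preprocessed_data.csv")
print(df.shape)
print(df.head())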
Benefits
Centralized Code Management: All pipeline code, including SQL queries, is stored and managed within your Kedro project.
Scalability: By executing SQL queries on Google BigQuery, you leverage its scalability and performance.
Reproducibility: Kedro ensures that your ML pipelines are reproducible, making it easier to share and collaborate with others.
Flexibility: Ibis allows you to easily switch between database engines and backends without needing to rewrite your data processing code, simply by changing the connection options in the catalog, as sketched after this list.
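For instance, pointing the same dataset at a local DuckDB file for development would hypothetically be just a catalog change (the file path below is illustrative):

international_top_terms:
  type: ibis.TableDataset
  table_name: international_top_terms
  connection:
    backend: duckdb                        # swapped from bigquery
    database: data/01_raw/local.duckdb     # illustrative local file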
Conclusion
By integrating Ibis with Kedro, I was able to create a scalable and maintainable solution for managing ML pipelines that include SQL query execution. This approach keeps all code centralised within the Kedro project, allowing for better version control and collaboration. If you're facing similar challenges, I highly recommend exploring the combination of Kedro, Ibis, and ibis.TableDataset.
Feel free to reach out if you have any questions or need further details on the implementation. You can contact me via Slack or Telegram.