Patterns, Predictions & Puzzles

How I built a Polars plugin from scratch

2025-07-02

[Image: A cartoon polar bear wearing a helmet driving a red Formula 1 racing car at high speed]
SVG of a polar bear driving a Formula 1 race car, generated with Gemini 2.5 Pro

What is Polars?

Polars is a modern dataframe library for Python, written in Rust. It is arguably the fastest single-machine data processing library available today. Here is how the library's main developer, Ritchie Vink, describes its main features:

  • Lazy evaluation: optimizes the query plan before execution, achieving maximum performance
  • Minimizes materialization: copies data only when necessary, minimizing memory usage
  • Efficient data types
  • Written in a low-level language (Rust) for full control of performance, memory, threading and critical paths
  • Designed for effective parallelism: no need for pickling or serialization; it can process data in parallel at minimum cost
  • Designed for out-of-core processing: it can process data that doesn't fit in RAM, using data batches and saving to disk
  • Tight integration with IO: all readers in Polars read directly into memory with zero copy and no intermediate steps

Without any doubt, Polars is very fast and memory efficient. On several occasions I have been able to port my pandas code to Polars and achieve a 5x to 10x performance improvement. However, what initially attracted me to Polars was the expressiveness of its API. As someone used to dplyr in R, I always found pandas' syntax a bit clunky and unintuitive; I especially disliked its row and column indexing system. Polars won me over with its extremely expressive syntax and the ability to chain operations together in a very readable way.

Here is a quick example using the Polars lazy API, which reads the famous 'iris' dataset, filters some data out, then groups by the 'species' column and finally calculates the sum of all remaining columns:

import polars as pl

q = (
    pl.scan_csv("data/iris.csv")
    .filter(pl.col("sepal_length") > 5)
    .group_by("species")
    .agg(pl.all().sum())
)

df = q.collect()

I found this code very readable and easy to understand. But enough with the intro, let's get to why I felt the need to expand Polars' capabilities.

Motivation

People who work with financial data are familiar with the methodology of calculating capital gains for an investment based on a series of cash flows. In the case of a stock, those cash flows are all the purchases and sales of the security. Let's say we have the following transactions:

date        security  type      quantity  transaction value
2023-01-10  XYZ       buy            100               1000
2023-02-15  XYZ       buy             50                600
2023-03-20  XYZ       sell            30                450
2023-04-05  XYZ       dividend         0                 12
2023-05-05  XYZ       sell           120                960
2023-06-01  XYZ       buy            100                700

We want to calculate the capital gains we realized first from the sale in March 2023 and then from the sale in May 2023. To do that we have to calculate the cost of the securities we are holding at the time of the sale. There are several methods to do that, but I will use the 'average cost' method.

Using this method, the unit cost of the 30 shares we sold in March 2023 is (1000 + 600) / (100 + 50) = 10.67. The cost of the shares sold is 30 * 10.67 = 320, and the realized capital gain is 450 - 320 = 130. The same logic applies to the May 2023 sale: the 120 shares sold carry a cost of 120 * 10.67 = 1280, so against proceeds of 960 the realized result is a loss of 320.

This calculation is relatively easy to do in Excel, but it can get complex quickly if we have multiple securities, multiple transaction types (some of which do not impact the cost calculation like dividends), unsorted data, etc. It is much more scalable and maintainable to do it in code.

The only complication is that data manipulation libraries like pandas and Polars do not handle this type of recursive calculation well, where the result of a row depends on the results of previous rows as well as on the values in different columns. And this is exactly the type of calculation needed for capital gains: the unit cost in row 3 depends on the buy transactions in rows 1 and 2. Of course, we can always write a loop, but that solution comes with significant performance penalties.

Here is my initial implementation in Polars:

import polars as pl

def avg_cost_native(
    df: pl.DataFrame, type_col: str, qty_col: str, amount_col: str
) -> pl.DataFrame:
    "Assumes that data is sorted by date"
    total_cost = 0.0
    unit_cost = 0.0
    total_qty = 0
    out = []
    for row in df.iter_rows(named=True):
        cap_gains = 0.0
        cus = 0.0  # cost of units sold
        typ = row[type_col]  # type of transaction (ex. 'buy', 'sell', 'div', etc.)
        q = row[qty_col]  # quantity
        amt = row[amount_col] or 0.0  # total transaction amount
        # update running cost & qty
        if typ == "buy":
            total_cost += amt
            total_qty += q
        elif typ == "sell":
            cus = unit_cost * q
            total_cost -= cus
            total_qty -= q
            cap_gains = amt - cus
        unit_cost = (total_cost / total_qty) if total_qty > 0 else 0.0
        out.append(
            {
                **row,
                "cumul_qty": total_qty,
                "cumul_avg_cost": total_cost,
                "avg_unit_cost": unit_cost,
                "cost_units_sold": cus,
                "realized_gain": cap_gains,
            }
        )
    return pl.DataFrame(out)

The function loops over all rows of the dataframe using the iter_rows(named=True) method and calculates the average unit cost, as well as the realized capital gains for 'sell' transactions.

Using the example data from above, the function will return the following dataframe:

from datetime import date

data = {
    "date": [
        date(2023, 1, 10),
        date(2023, 2, 15),
        date(2023, 3, 20),
        date(2023, 4, 5),
        date(2023, 5, 5),
        date(2023, 6, 1),
    ],
    "security": ["XYZ", "XYZ", "XYZ", "XYZ", "XYZ", "XYZ"],
    "type": ["buy", "buy", "sell", "dividend", "sell", "buy"],
    "quantity": [100, 50, 30, 0, 120, 100],
    "transaction_value": [1000.0, 600.0, 450.0, 12, 960, 700.0],
}

df = pl.DataFrame(data)

from functools import partial

avg_cost_native_gr = partial(
    avg_cost_native,
    type_col="type",
    qty_col="quantity",
    amount_col="transaction_value",
)
res = df.sort("date").group_by("security").map_groups(avg_cost_native_gr)
print(res.drop(["security", "transaction_value"]))
┌────────────┬─────────┬──────────┬───────────┬───────────┬───────────┬───────────┬───────────┐
│ date       ┆ type    ┆ quantity ┆ cumul_qty ┆ cumul_avg ┆ avg_unit_ ┆ cost_unit ┆ realized_ │
│ (date)     ┆ (str)   ┆    (i64) ┆     (i64) ┆     _cost ┆      cost ┆    s_sold ┆      gain │
│            ┆         ┆          ┆           ┆     (f64) ┆     (f64) ┆     (f64) ┆     (f64) │
╞════════════╪═════════╪══════════╪═══════════╪═══════════╪═══════════╪═══════════╪═══════════╡
│ 2023-01-10 ┆ buy     ┆      100 ┆       100 ┆  1,000.00 ┆     10.00 ┆      0.00 ┆      0.00 │
│ 2023-02-15 ┆ buy     ┆       50 ┆       150 ┆  1,600.00 ┆     10.67 ┆      0.00 ┆      0.00 │
│ 2023-03-20 ┆ sell    ┆       30 ┆       120 ┆  1,280.00 ┆     10.67 ┆    320.00 ┆    130.00 │
│ 2023-04-05 ┆ dividend┆        0 ┆       120 ┆  1,280.00 ┆     10.67 ┆      0.00 ┆      0.00 │
│ 2023-05-05 ┆ sell    ┆      120 ┆         0 ┆      0.00 ┆      0.00 ┆  1,280.00 ┆   -320.00 │
│ 2023-06-01 ┆ buy     ┆      100 ┆       100 ┆    700.00 ┆      7.00 ┆      0.00 ┆      0.00 │
└────────────┴─────────┴──────────┴───────────┴───────────┴───────────┴───────────┴───────────┘

I had been using similar solutions for this type of problem for a while, but I always felt I was doing a disservice to this fast and elegant library by throttling its performance with a loop. Of course, in practice this does not matter much if your data is less than a few million rows. Nonetheless, I was yearning for a more elegant, more Polars-like solution. Then I found out about Polars plugins.

Polars plugins

As per the Polars documentation: "Expression plugins are the preferred way to create user defined functions." The plugin system allows you to compile a Rust function and then register it as an expression into the Polars library. Once the function is registered, it runs without any interference from Python and has the same benefits as the native Polars expressions:

  • Optimization
  • Parallelism
  • Rust performance

Of course, as someone who has written zero lines of Rust code, I was a bit intimidated by the prospect of writing a full plugin. This is where Marco Gorelli's excellent Polars plugins tutorial comes in as a great resource. The tutorial chapters are full of practical examples and in combination with the starting cookiecutter template, they got me 80% of the way to a working plugin. The remaining 20%, at least in my case, was trial and error and the power of an awesome Rust compiler.

Building polars-fin

Complete code of the plugin and installation instructions can be found in the polars-fin repo.

To start, I had to make sure I had a working Rust environment. I used the rustup curl one-liner to install Rust and Cargo. Then, using uv, I installed the cookiecutter template like so:

uvx cookiecutter https://github.com/MarcoGorelli/cookiecutter-polars-plugins

I then added polars, maturin and pytest to a 'dev' dependency group in the pyproject.toml file and ran uv sync to create the virtual environment and install the dependencies. That is all the required setup. To verify that everything works, I ran uv run run.py, which prints to the console a five-row dataframe with the results of applying the pig_latinnify function that ships with the cookiecutter template.

The two main files that we have to change in the newly-created repo are: __init__.py (under the directory bearing the name of the Python package we just created, in my case this is "polars-fin") and expressions.rs (under the src directory). The __init__.py file is used to register the plugin with Polars and the expressions.rs file is used to define the plugin's functionality in Rust.

On the Python side

The __init__.py file is where we define a Python function which serves as a connector between Polars and the underlying Rust code of the plugin. It is usually a very simple function that takes as input a Polars expression (or several expressions) and returns a new Polars expression.

This is the first important point: each Polars expression is a function that takes one or more Polars Series as input and returns a single new Series. In my case, however, I wanted to return several series: cumulative quantity, cumulative average cost, average unit cost, cost of units sold, and realized capital gains. Because I cannot return multiple series, my function outputs a single Polars Series of type Struct, which serves as a container for all the series I need. Here is the exact implementation:

def cap_gains(
    ttype: IntoExprColumn, qty: IntoExprColumn, amt: IntoExprColumn
) -> pl.Expr:
    qty = qty if isinstance(qty, pl.Expr) else pl.col(qty)
    qty = qty.cast(pl.Float64)
    ttype = ttype if isinstance(ttype, pl.Expr) else pl.col(ttype)
    ttype = ttype.str.to_lowercase()
    return register_plugin_function(
        args=[ttype, qty, amt],
        plugin_path=LIB,
        function_name="cap_gains",
        is_elementwise=False,
    )

A few more important points about this function:

  • Use the register_plugin_function utility function to register the new plugin with Polars.
  • The function_name parameter corresponds to the name of the function in the Rust code.
  • The is_elementwise parameter is set to False because our function needs to reset its logic for each group in a group_by or window function context.
  • The args parameter is a list of the arguments that we want to pass to the Rust function.
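For context, the plugin-registration boilerplate that surrounds this function in __init__.py comes from the cookiecutter template and looks roughly like the sketch below; treat the exact import paths as indicative rather than definitive, since they have shifted between Polars versions:

from pathlib import Path

import polars as pl
from polars.plugins import register_plugin_function
from polars._typing import IntoExprColumn

# Directory containing the compiled Rust library produced by maturin
LIB = Path(__file__).parent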

On the Rust side

As mentioned above, the expressions.rs file is where we define the plugin's functionality in Rust. The file is split into two parts: the avg_cost_output_schema helper function and the cap_gains main function. The most interesting parts of the implementation are outlined in the collapsible section below.

Rust implementation details

Rust is a compiled language, which means that each time I made a change that I wanted to test, I had to run maturin develop to compile the plugin.

1. The avg_cost_output_schema helper function

Polars has a lazy execution engine, which means it plans and optimizes queries before executing them. To do this, it needs to know the data type (or "schema") of the output that a function will produce without actually running the function's logic.

The avg_cost_output_schema function serves this exact purpose.

// Fn that returns output schema
fn avg_cost_output_schema(_input_fields: &[Field]) -> PolarsResult<Field> {
    let fields = vec![
        Field::new("cumul_qty".into(), DataType::Float64),
        Field::new("cumul_avg_cost".into(), DataType::Float64),
        // ... more fields
    ];
    Ok(Field::new("avg_cost".into(), DataType::Struct(fields)))
}

#[polars_expr(output_type_func=avg_cost_output_schema)]
fn cap_gains(inputs: &[Series]) -> PolarsResult<Series> {
    // ...
}
  • The #[polars_expr(output_type_func=...)] macro tells Polars to call avg_cost_output_schema during the query planning phase.
  • This function constructs and returns a Field, which describes the output structure. In this case, it declares that the cap_gains function will return a Struct named "avg_cost" containing five Float64 fields. This allows Polars to validate the query and optimize the execution plan accordingly.
2. The if let Statement in the for loop of cap_gains

The for loop iterates over columns that may contain null values. In Rust, optional values are represented by the Option enum, which can be either Some(value) or None. The if let construct is an idiomatic way to handle these Option types.

// type_col is a StringChunked, its iterator yields Option<&str>
for ((typ, qty_opt), amt_opt) in type_col.into_iter().zip(qty_col.into_iter()).zip(amt_col.into_iter()) {
    if let Some(transaction_type) = typ {
        match transaction_type {
            "buy" => { /* ... */ }
            "sell" => { /* ... */ }
            _ => {}
        }
    }
}
  • typ is of type Option<&str>.
  • The expression if let Some(transaction_type) = typ checks if typ is a Some.
  • If typ is a Some, the value inside Some is "unwrapped" and assigned to the new variable transaction_type. The code block inside the if is then executed.
  • If typ is None (representing a null value in the Polars Series), the condition is false, and the block is skipped entirely. This prevents the program from crashing on null data and cleanly ignores those rows.
3. Transforming Intermediary Vectors to Series

The core logic of the cap_gains function calculates results row by row. The most efficient way to collect these results within the loop is to push them into standard Rust Vec (vectors).

// 1. Initialize empty vectors
let mut cumul_qty_vec: Vec<f64> = Vec::with_capacity(type_col.len());
// ... other vectors

for /* ... */ {
    // ... calculations ...
    // 2. Push results into vectors in each iteration
    cumul_qty_vec.push(total_qty);
    // ...
}

// 3. Convert vectors to Polars Series
let s_cumul_qty = Series::new("cumul_qty".into(), &cumul_qty_vec);
// ... other series
  • Performance: Appending to a Vec is a fast, low-overhead operation in Rust.
  • Requirement: The final goal is to construct a Polars StructChunked, which requires a collection of Polars Series as input, not Rust vectors.
  • Transformation: After the loop has finished populating the vectors, they must be converted into the Series data structure that Polars understands. The Series::new() constructor takes a name and a reference to the vector (&cumul_qty_vec) to create the Polars-compatible column.
4. StructChunked Construction

The function's output is a single column where each value is a complex object (a struct). To build this, the individual Series created in the previous step are bundled together into a StructChunked.

// Create a slice of the Series
let fields = [
    s_cumul_qty,
    s_cumul_avg_cost,
    s_avg_unit_cost,
    s_cost_units_sold,
    s_realized_gain,
];

// Construct the StructChunked from the Series
StructChunked::from_series("avg_cost".into(), fields[0].len(), fields.iter())
    .map(|ca| ca.into_series())
  • StructChunked::from_series() is the constructor used to create the final structure.
  • It takes three arguments:
    1. The name of the struct column: "avg_cost".
    2. The length of the column, which can be taken from any of the component series (e.g., fields[0].len()).
    3. An iterator over the component Series that will become the fields of the struct (fields.iter()).
  • The constructor returns a Result, so .map(|ca| ca.into_series()) is used to handle it. If construction succeeds (Ok), it converts the StructChunked into a Series to match the function's required return type, PolarsResult<Series>.

Performance

The following chart shows the performance comparison between the plain Python loop function and the Rust-based plugin. To run the benchmark, I created a synthetic dataset with 1M transactions for 5 different securities. Then I compared the time to calculate the capital gains using the loop function shown at the beginning of the post vs. the cap_gains function implemented in polars-fin.
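Conceptually, the comparison looks something like the sketch below. This is a hypothetical reconstruction, not the actual benchmark code: the synthetic data, the timing approach and the reuse of the avg_cost_native_gr partial defined earlier are all my assumptions.

import time

import numpy as np
import polars as pl
import polars_fin as pf

# Hypothetical benchmark sketch: 1M synthetic transactions across 5 securities.
rng = np.random.default_rng(0)
n = 1_000_000
df = pl.DataFrame(
    {
        "security": rng.choice(["AAA", "BBB", "CCC", "DDD", "EEE"], size=n),
        "type": rng.choice(["buy", "sell", "dividend"], size=n),
        "quantity": rng.integers(1, 100, size=n),
        "transaction_value": rng.uniform(10.0, 1_000.0, size=n),
    }
).with_row_index("date")  # monotonically increasing stand-in for real dates

# time the plain Python loop (via map_groups), then the plugin
start = time.perf_counter()
df.sort("date").group_by("security").map_groups(avg_cost_native_gr)
t_loop = time.perf_counter() - start

start = time.perf_counter()
df.sort("date").with_columns(
    pf.cap_gains("type", "quantity", "transaction_value").over("security")
)
t_plugin = time.perf_counter() - start

print(f"loop: {t_loop:.1f}s  plugin: {t_plugin:.1f}s  speedup: {t_loop / t_plugin:.0f}x")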

The Rust plugin is 15x faster than the regular loop function.

Honestly, I did not expect such a big performance improvement considering that my Rust implementation is probably very crude and not optimized. However, the result proves the point that you don't need to be a Rust expert to build a functioning Polars plugin and achieve significant performance improvement.

Was it worth it?

For me the answer is yes for two reasons:

  1. I felt empowered by the knowledge that I can build a Rust-based plugin for Polars which, even though it is far from the most efficient or elegant implementation, still achieves a 15x performance improvement over the plain Python implementation.
  2. The polars-fin plugin considerably improves the readability and maintainability of my code. As the following example shows, it is very easy to reason about what the code does: first we sort the data by date, then we calculate the capital gains for each security in a new column called cap_gains.
import polars_fin as pf

(
    df.sort("date")
    .with_columns(
        pf.cap_gains("type", "quantity", "transaction_value")
        .over("security")
        .alias("cap_gains")
    )
)
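Because cap_gains returns a single Struct column, the individual fields can afterwards be expanded into regular columns, for example with unnest. A small sketch (the field names come from the output schema defined on the Rust side):

import polars_fin as pf

res = (
    df.sort("date")
    .with_columns(
        pf.cap_gains("type", "quantity", "transaction_value")
        .over("security")
        .alias("cap_gains")
    )
    .unnest("cap_gains")  # expands cumul_qty, cumul_avg_cost, avg_unit_cost, ...
)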

I don't know whether, with more experience, writing Rust-based plugins for Polars becomes quick and pleasurable, but I certainly feel inspired to have this tool at my disposal.

Building a Polars plugin from scratch was more accessible than I initially thought. Despite having no prior Rust experience, I was able to create a functioning plugin that delivers significant performance improvements.

Air Quality

2025-06-15

In light of the ongoing wildfires, there has been a lot of attention on air quality and on the impact of PM2.5 and O₃ pollutants on people's health. As this is an area I am not familiar with, I will use this post to explore the data and learn more about the topic.

Air quality worst offenders

According to the World Health Organization (WHO):

Together with the connected issue of climate change, the WHO recognises air pollution as the biggest global health threat in the current century. Every year, exposure to ambient air pollution is estimated to cause around 4.5 million premature deaths globally, and indoor air pollution causes a further 2.3 million. By comparison, the COVID-19 pandemic caused around 5 million deaths globally in 2020, and around 12 million deaths in 2021, according to the WHO. [source]

The air pollutants that pose the most significant risk to our health are PM2.5 (Particulate Matter ≤ 2.5 μm) and O₃ (Ground Level Ozone). For context, here are some details:

PM2.5 (Particulate Matter ≤ 2.5 µm)

  • Source: Combustion from cars, industry, and wood smoke. Also forms secondarily from gases like SO₂ and NOx.
  • Health Effects:
    • Penetrates deep into lungs and enters bloodstream
    • Triggers heart attacks and strokes
    • Worsens asthma and bronchitis
    • Linked to lung cancer and premature death

O₃ (Ground-Level Ozone)

  • Source: Not emitted directly. Forms from NOx + VOCs in sunlight.
  • Health Effects:
    • Irritates airways
    • Reduces lung function
    • Triggers asthma attacks and chronic respiratory diseases
    • Worsens existing heart and lung conditions

The two main questions that I am trying to answer are:

  1. What is the air quality, specifically PM2.5 and O₃ levels, in some of the major cities in North America and Europe?

  2. Has the air quality improved or worsened significantly over the last 5 years?

The data

After some research, I came across OpenAQ. This is a non-profit organization that provides air quality data from various sources around the world. They offer API access to their data, but even more conveniently, all their data is stored in a publicly accessible S3 bucket s3://openaq-data-archive/.... More details on their AWS bucket structure can be found here.

To download the data, I decided to parametrize the S3 URLs for each of the cities I am interested in. I used DuckDB's glob CSV reading functionality. I came up with this simple query that loads the data for a specific location_id:

FROM read_csv(
    "s3://openaq-data-archive/records/csv.gz/locationid={loc_id}/year=202[{from_yr}-{to_yr}]/month=*/*",
    union_by_name=True
);

The parameters loc_id, from_yr, and to_yr are used in my Python script to pass the location ID, start year, and end year for the data I want to download. I got the location IDs for some cities that I selected in a somewhat random and subjective way from this interactive map. What makes this query particularly robust is the union_by_name=True argument, which unifies the schema of files that have different or missing columns. If a file is missing a column, it is filled with NULL values.

Finally, I built a simple Python function which retrieves the data for each city from the S3 bucket and saves it locally in parquet files. I ended up with approximately 10MB of data for the 5 years of historical hourly sensor readings for the 22 cities I selected.
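A minimal sketch of what such a helper might look like is shown below. The function name, the .pl() conversion to a Polars DataFrame and the assumption that DuckDB's httpfs extension can read the public bucket anonymously are all illustrative, not the actual code from the project.

import duckdb

def fetch_city(loc_id: int, from_yr: int, to_yr: int, out_path: str) -> None:
    # Hypothetical helper: fill the parametrized S3 glob, read the CSVs with DuckDB
    # and save the result locally as a parquet file.
    url = (
        "s3://openaq-data-archive/records/csv.gz/"
        f"locationid={loc_id}/year=202[{from_yr}-{to_yr}]/month=*/*"
    )
    duckdb.sql(f"FROM read_csv('{url}', union_by_name=True);").pl().write_parquet(out_path)

# e.g. fetch_city(loc_id=1234, from_yr=0, to_yr=4, out_path="data/city_1234.parquet")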

For the data exploration, I used a self-contained Marimo notebook. It is part of the project's GitHub repo and can be started on its own like this:

marimo edit air-quality.py --sandbox

The --sandbox flag ensures that the notebook runs in a self-contained temporary Python environment. All package dependencies are declared in TOML inside the special # /// script ... comment block at the beginning of the file. I find this very practical and use it all the time.
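The metadata block at the top of the notebook file looks roughly like this (the dependency list here is illustrative, not necessarily the notebook's actual one):

# /// script
# requires-python = ">=3.12"
# dependencies = [
#     "marimo",
#     "polars",
#     "altair",
# ]
# ///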

Finally, the data is loaded using Polars' read_parquet().
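In the notebook that amounts to something like the following one-liner (the glob path is illustrative):

import polars as pl

# Load all locally saved parquet files into a single DataFrame
df = pl.read_parquet("data/*.parquet")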

To keep the length of this post manageable, the following exploration will focus only on PM2.5 particles pollution. The air_quality.py notebook contains similar analysis of O₃ (Ground Level Ozone) levels.

PM2.5

Before digging deeper into the PM2.5 pollution data, I wanted to create a simple heatmap showing the daily average PM2.5 reading by city. For the color coding, I relied on the WHO Air Quality Guidelines. Specifically, I used the part of the guideline stating that "... 24-hour average exposures should not exceed 15 µg/m3 more than 3 - 4 days per year...".

That is why the color scale turns red at about the 15 µg/m³ level. I achieved this by adjusting the domainMid parameter of the color scale in Altair:

...
heatmap = base.mark_rect().encode(
    alt.Color(col)
    .title(col_title)
    .scale(scheme="redblue", domainMid=domainMid, reverse=True)
)
...

Average PM2.5 levels in the selected cities over the last 3.5 years

The cities (on the y-axis) are grouped by continent with European locations on the top. The first thing that struck me was that Athens, Greece is an outlier. It has average daily PM2.5 readings significantly higher than the rest of the sample. Apart from that, two interesting patterns emerge.

The first pattern shows that in Europe, peak PM2.5 values occur mostly between November and January. The pattern is very pronounced in the 2022-2023 and 2024-2025 seasons. It's slightly less visible in 2023-2024. My takeaway is that the increased PM2.5 levels in those periods are mainly caused by heating emissions. Interestingly, this phenomenon is not observable in North America.

The second pattern shows the impact of North American wildfires, which were particularly severe in 2023 (June-July), January 2025 (Southern California), and lastly June 2025 (Central Canada). As a result, the observed PM2.5 levels during those periods were higher in almost all of the selected North American cities.

By using average daily and monthly readings, we lose visibility of how high the PM2.5 pollution went and for how long it stayed there. Going back to WHO's guideline, I wanted to understand how many times per year PM2.5 pollution exceeded 15 µg/m³ for 3 or more consecutive days. I also wanted to know the longest duration among those sequences. The following heatmap answers the first question:

As an example, there were 15 periods of 3 days or more in Vienna, Austria during 2021 when PM2.5 readings were higher than 15 µg/m³. Based on this metric, the European cities seem to be affected worse by PM2.5 pollution than the North American ones, with the exception of Los Angeles. Almost all of the examined cities on the old continent experienced a high count of 3-day-or-longer periods with significant PM2.5 pollution. Rome, Athens and Sofia stand out in 2024 with 20, 21 and 17 such periods respectively.

The final heatmap should be examined in conjunction with the previous one. It shows, for each year, the longest period of 3 days or more during which PM2.5 readings remained above 15 µg/m³. Looking at the same three European cities in 2024, the longest PM2.5 pollution streak was in Rome, where PM2.5 remained above the prescribed 15 µg/m³ threshold for 15 consecutive days.

It seems that the pollution situation in Sofia, Bulgaria is getting worse this year. The city has already seen 8 periods of 3 days or more with PM2.5 above prescribed levels, the longest of which lasted 20 days.

In terms of the practical implementation, the most interesting part of the pipeline is the following:

        ...
        .with_row_index()
        .with_columns(
            pl.when(pl.col("value") < thr)
            .then(0)
            .when(pl.col("value") >= thr)
            .then(1)
            .alias("above_thr")
        )
        .with_columns(
            pl.col("above_thr")
            .diff()
            .abs()
            .mul(pl.col("index"))
            .cum_max()
            .alias("streak_group")
        )
        .with_columns(
            pl.col("above_thr")
            .cum_sum()
            .over("city", "year", "streak_group")
            .alias("consec_above_thr")
        )
        ...

Taking the diff() of the 'above_thr' column gives the specific points in time when the signal changed from below the threshold to above, and vice versa. Multiplying by the 'index' column and taking the cum_max() then assigns a unique ID to each consecutive group of 'above or below the threshold' rows.
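To make the trick concrete, here is a minimal, self-contained sketch on toy data. The 15 µg/m³ threshold is the one from the WHO guideline; the toy values and the fill_null(0) on the first diff are my additions for illustration.

import polars as pl

toy = pl.DataFrame({"value": [10.0, 18.0, 20.0, 16.0, 9.0, 17.0, 19.0]})
thr = 15

streaks = (
    toy.with_row_index()
    .with_columns((pl.col("value") >= thr).cast(pl.Int32).alias("above_thr"))
    .with_columns(
        # diff() is 1 (or -1) exactly where the signal flips; multiplied by the row
        # index and cum_max'ed, it becomes a unique id per consecutive run
        pl.col("above_thr")
        .diff()
        .abs()
        .fill_null(0)
        .mul(pl.col("index"))
        .cum_max()
        .alias("streak_group")
    )
    .with_columns(
        pl.col("above_thr").cum_sum().over("streak_group").alias("consec_above_thr")
    )
)
print(streaks)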

Wrap up

This exploration revealed some concerning patterns in PM2.5 pollution across major cities in North America and Europe. European cities consistently show seasonal spikes during winter months, likely due to increased heating emissions. Meanwhile, North American cities face acute pollution events driven by wildfire activity.

Athens stands out as particularly problematic, with consistently high PM2.5 levels throughout the year. Cities like Rome and Sofia also show troubling trends, with extended periods of poor air quality that far exceed WHO recommendations.

All the code and data from this post are available in this repo.