Skip to content

Spatial Aggregation

Once the Nighttime Light (NTL) data has been retrieved and preprocessed, the final step in many workflows is to aggregate the pixel-level data into administrative boundaries or custom vector shapes. This reduces the data dimensionality and allows for longitudinal analysis at the regional level.

The toolkit provides built-in spatial aggregation directly within the NTLPipeline object. This makes it incredibly easy to track how the data evolves throughout the preprocessing stages by aggregating the intermediate steps as well as the final dataset.

Usage Example

The typical workflow involves running your preprocessing pipeline with cache_intermediates=True, loading your vector data using GeoPandas, and applying the aggregate() function.

import geopandas as gpd
from blackmarble_toolkit.pipeline import NTLPipeline

pipeline = NTLPipeline(steps)

processed_ds = pipeline.run(ds=raw_ds, cache_intermediates=True) # (1)! 

gdf = gpd.read_file("path/to/regions.geojson") # (2)! 

# Create a numeric ID for the vector shapes so rasterization works
gdf['numeric_id'] = range(len(gdf)) # (3)! 

aggregated_results = pipeline.aggregate( 
    gdf=gdf, # (4)! 
    geo_id_col="numeric_id", # (5)! 
    compute=True # (6)! 
)
  1. Run the pipeline, caching the intermediate results
  2. Load your vector shapes (e.g., administrative boundaries)
  3. Under the hood, spatial aggregation requires rasterization which ONLY supports numeric IDs.
  4. The GeoDataFrame containing the regions
  5. The specific numeric column identifier
  6. Force parallel execution of the Dask graph so all datasets are materialized immediately

String IDs and Mapping

Because the underlying rasterization tool (make_geocube) requires numeric identifiers to build spatial masks, you cannot pass string columns (like names of cities) directly to geo_id_col. The best practice is to assign an integer index, perform the aggregation, and optionally rename the IDs back to strings after the results are returned!

Intermediate vs. Final Results

When you call pipeline.run(), you can control whether you want to aggregate just the final output or all the intermediate stages of your pipeline:

  • cache_intermediates=True: The pipeline saves a snapshot of the dataset after each transformation step. Calling pipeline.aggregate() will then return a list of datasets (one for the raw data, and one for every step applied). This is extremely useful if you want to perform downstream comparisons to analyze how each individual filter or correction affects your aggregated metrics.
  • cache_intermediates=False: (Default behavior) The pipeline only keeps the final, fully-processed dataset in memory. Calling pipeline.aggregate() will return a list containing just one dataset—the final result. This is more memory-efficient and faster if you only care about the end product.

Lazy Execution vs. Materialization

Because the toolkit leverages Dask under the hood, the steps built during pipeline.run() and pipeline.aggregate() are lazily evaluated by default. This means no heavy data processing or downloading occurs until you actually attempt to use the data (e.g., by plotting it).

You can control this behavior during aggregation using the compute parameter:

  • compute=True: The pipeline explicitly triggers the entire Dask computation graph. Because pipeline.aggregate() returns a list of datasets, this parameter is highly recommended: it evaluates all datasets simultaneously in a single, highly optimized parallel operation and loads the final, condensed timeseries directly into your local memory.
  • compute=False: (Default behavior) The pipeline returns a list of datasets that are still backed by lazy Dask arrays. No data is fetched or processed yet. Generally, you need to call .compute() on a lazy dataset to materialize it. Since you are receiving a list of datasets, you would either need to write a manual for loop to call .compute() on each one individually, or you can just use compute=True to handle it for you. Leaving it as False is only recommended if you intend to export the datasets (e.g., using .to_netcdf()) which will handle the computation and writing simultaneously.

How It Works

Under the hood, the pipeline uses geocube to rasterize your vector shapes onto the identical grid of your xarray dataset just once to avoid repeating expensive rasterizations. It then iterates through the cached steps:

  1. get_gdf_mask_for_ds: Creates a spatial mask aligning the GeoDataFrame to the grid of the xarray.Dataset.
  2. get_agg_per_shape: Groups the data by the geometry ID and performs a memory-safe aggregation using Dask.

By default, the pipeline computes the mean radiance value for every shape at each time step. You can also opt-in to calculate the valid pixel percentage (valid_pct) by setting is_valid_pct=True. This metric helps you identify shapes where too many pixels were masked out (e.g., due to heavy cloud cover or snow), ensuring you can filter out statistically unreliable aggregations downstream.

API Reference

For detailed documentation, refer to the Pipeline API Reference or the underlying Aggregation Methods API Reference.