Source code for blocksnet.preprocessing.blocks_splitter

"""
Module to split blocks based on the distribution of buildings within them.
"""
import shapely
import geopandas as gpd
import pandas as pd
import numpy as np
from loguru import logger
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
from longsgis import voronoiDiagram4plg
from tqdm import tqdm
from ..models import BaseSchema


[docs]class BlocksSchema(BaseSchema): """ Schema for validating blocks GeoDataFrame. Attributes ---------- _geom_types : list List of allowed geometry types for the blocks, default is [shapely.Polygon]. """ _geom_types = [shapely.Polygon]
[docs]class BuildingsSchema(BaseSchema): """ Schema for validating buildings GeoDataFrame. Attributes ---------- _geom_types : list List of allowed geometry types for the buildings, default is [shapely.Point]. """ _geom_types = [shapely.Point]
[docs]class BlocksSplitter: """ Splits blocks based on the distribution of buildings within them. Parameters ---------- blocks : gpd.GeoDataFrame GeoDataFrame containing block data. Must contain the following columns: - index : int - geometry : Polygon buildings : gpd.GeoDataFrame GeoDataFrame containing building data. Must contain the following columns: - index : int - geometry : Point Raises ------ AssertionError If the Coordinate Reference Systems (CRS) of `blocks` and `buildings` do not match. Methods ------- run(n_clusters: int = 4, points_quantile: float = 0.98, area_quantile: float = 0.95) -> gpd.GeoDataFrame Splits blocks based on buildings distribution. Parameters ---------- n_clusters : int Number of clusters to form within each block, default is 4. points_quantile : float Quantile value to filter blocks by the number of points, default is 0.98. area_quantile : float Quantile value to filter blocks by area, default is 0.95. Returns ------- gpd.GeoDataFrame GeoDataFrame containing all the blocks. """ def __init__(self, blocks: gpd.GeoDataFrame, buildings: gpd.GeoDataFrame): blocks = BlocksSchema(blocks) buildings = BuildingsSchema(buildings) assert blocks.crs == buildings.crs, "Blocks CRS must match buildings CRS" self.blocks = blocks self.buildings = buildings
[docs] @staticmethod def _drop_index_columns(gdf) -> None: """ Drops index columns from a GeoDataFrame if they exist. Parameters ---------- gdf : gpd.GeoDataFrame GeoDataFrame from which to drop index columns. """ if "index_left" in gdf.columns: gdf.drop(columns=["index_left"], inplace=True) if "index_right" in gdf.columns: gdf.drop(columns=["index_right"], inplace=True)
[docs] @staticmethod def _split_block(block: shapely.Polygon, buildings: gpd.GeoDataFrame, n_clusters: int) -> gpd.GeoDataFrame: """ Splits a block into smaller regions based on building locations using Voronoi diagrams and K-Means clustering. Parameters ---------- block : shapely.Polygon The geometry of the block to be split. buildings : gpd.GeoDataFrame GeoDataFrame containing the buildings within the block. n_clusters : int Number of clusters to form within the block. Returns ------- gpd.GeoDataFrame GeoDataFrame containing the split regions. """ vd = voronoiDiagram4plg(buildings, block) vd = vd.explode(index_parts=True) X = vd["geometry"].apply(lambda geom: geom.bounds).tolist() X_flat = [coord for rect in X for coord in rect] X = np.array(X_flat).reshape(-1, 4) scaler = StandardScaler() X_scaled = scaler.fit_transform(X) kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10) vd["cluster"] = kmeans.fit_predict(X_scaled) vd = vd.dissolve(by="cluster") vd = vd.explode(index_parts=False) return vd.reset_index(drop=True)
[docs] def run(self, n_clusters: int = 4, points_quantile: float = 0.98, area_quantile: float = 0.95) -> gpd.GeoDataFrame: """ Splits blocks based on the distribution of buildings. Parameters ---------- n_clusters : int Number of clusters to form within each block, default is 4. points_quantile : float Quantile value to filter blocks by the number of points, default is 0.98. area_quantile : float Quantile value to filter blocks by area, default is 0.95. Returns ------- gpd.GeoDataFrame GeoDataFrame containing the split blocks. """ get_geom_coords = lambda geom: len(geom.exterior.coords) logger.info("Joining buildings and blocks to exclude duplicates") blocks = self.blocks.copy() buildings = self.buildings.copy() sjoin = buildings.sjoin(blocks) sjoin = sjoin.rename(columns={"index_right": "block_id"}) sjoin["building_id"] = sjoin.index # sjoin['intersection_area'] = sjoin.apply(lambda s : blocks.loc[s.block_id,'geometry'].intersection(s.geometry), axis=1).area # sjoin = sjoin.sort_values("intersection_area").drop_duplicates(subset="building_id", keep="last") logger.info("Choosing blocks to be splitted") quantile_num_points = blocks.geometry.apply(get_geom_coords).quantile(points_quantile) quantile_area = blocks.area.quantile(area_quantile) filtered_blocks = blocks[ (blocks.geometry.apply(get_geom_coords) > quantile_num_points) & (blocks.area > quantile_area) ] filtered_blocks = filtered_blocks[filtered_blocks.index.isin(sjoin.block_id)] sjoin = sjoin[sjoin.block_id.isin(filtered_blocks.index)] logger.info("Splitting filtered blocks") gdfs = [] for block_id, buildings_gdf in tqdm(sjoin.groupby("block_id")): try: block_geometry = blocks.loc[block_id, "geometry"] gdfs.append(self._split_block(block_geometry, buildings_gdf, n_clusters)) except: gdfs.append(filtered_blocks[filtered_blocks.index == block_id]) new_blocks = pd.concat(gdfs) old_blocks = blocks[~blocks.index.isin(filtered_blocks.index)] return BlocksSchema(pd.concat([new_blocks, old_blocks]).reset_index())