
Download the utility script here: utils.py
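If you just want to follow along, here is a minimal sketch of the one helper used below, gen_grid, reconstructed from how it is called in this notebook; the actual implementation in utils.py may differ in detail (in particular, how force_square adjusts the extent is an assumption):

import geopandas as gpd
from shapely.geometry import Polygon

def gen_grid(llx, lly, urx, ury, ncol=5, nrow=5, force_square=False, crs=None):
    # cell size from the bounding box and the requested number of columns/rows
    w = (urx - llx) / ncol
    h = (ury - lly) / nrow
    if force_square:
        w = h = max(w, h)  # assumption: square cells large enough to cover the extent
    boxes = []
    for i in range(nrow):
        for j in range(ncol):
            x0, y0 = llx + j * w, lly + i * h
            boxes.append(Polygon([(x0, y0), (x0 + w, y0), (x0 + w, y0 + h), (x0, y0 + h)]))
    # one row per cell, with an integer id used for joining later
    return gpd.GeoDataFrame({'box_id': range(len(boxes))}, geometry=boxes, crs=crs)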

import os
import numpy as np  # for numerical manipulation (calculations, etc.)
import pandas as pd  # for table/spreadsheet manipulation
os.environ['USE_PYGEOS'] = '0'
import geopandas as gpd  # for geospatial data / attribute table & geometry manipulation
from shapely import wkt  # for geometry data manipulation
import matplotlib.pyplot as plt  # the most basic and important library for making plots/figures in Python
import seaborn as sns  # convenient tools for making statistical plots

import random  # for generating random points
from shapely.geometry import Polygon, Point  # for creating quadrats/grids and points
from scipy import spatial  # for the kd-tree: https://docs.scipy.org/doc/scipy/reference/generated/scipy.spatial.KDTree.html 
# see kdtree.query and kdtree.query_ball_tree
from scipy import stats  # for calculating chi-squared


# pointpats is a library written by PySAL developer team
from pointpats import PointPattern, as_window
from pointpats import PoissonPointProcess as csr
import pointpats.quadrat_statistics as qs

import utils as util

from tqdm import tqdm  # for progress bars
subz = gpd.read_file('../resources/master-plan-2019-subzone_landuse_area.zip')

## drop some unneeded columns and keep only the important ones
keep_cols = [
    'SUBZONE_N', 'SUBZONE_C', 
    'PLN_AREA_N', 'PLN_AREA_C', 
    'REGION_N', 'REGION_C', 
    'geometry']

subz = subz[keep_cols].copy()  # overwriting the object by reusing the same name; this can be risky, but is fine here
subz.head()
hdb = pd.read_csv('../resources/hdb_data_2023.csv.xz', index_col=0)  # first (0) column contain 'index' of the rows
g_hdb = gpd.GeoDataFrame(data=hdb,
                         geometry=hdb['geometry'].apply(wkt.loads), 
                         crs=subz.crs)  # use the same CRS as the subzones; make sure the coordinates really are in that CRS before setting it
g_hdb.plot()
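A quick, informal way to confirm that the two layers really share a coordinate system before the spatial join below is to compare their extents; wildly different magnitudes usually signal a CRS mismatch (a minimal check, added here for illustration):

# the bounding boxes should be of comparable magnitude and overlap
print(subz.total_bounds)
print(g_hdb.total_bounds)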
g_hdb2 = g_hdb.sjoin(subz)  # save the spatial join result as a new geopandas object
# assigning it to a new name is safer (no overwriting)
g_hdb2.head()
g_hdb2['PLN_AREA_N'].unique()
gdf_1 = g_hdb2[g_hdb2['PLN_AREA_N']=='TAMPINES']
gdf_1.plot('resale_price')
np.percentile(gdf_1['resale_price'], 99)
fig, ax = plt.subplots(figsize=(8, 6))
sns.histplot(gdf_1['resale_price'], ax=ax)
ax.axvline(x=np.percentile(gdf_1['resale_price'], 90), c='k')

plt.tight_layout()
tmp = gdf_1[gdf_1['resale_price']>=np.percentile(gdf_1['resale_price'], 90)]
tmp = tmp.drop(columns=['index_right'])
tmp
tmp.plot()

This is the data we are going to analyze today: is it clustered?

Quadrat Statistics

Using pointpats


pts = list(zip(tmp['X'], tmp['Y']))
print(pts)
pp = PointPattern(pts)
print(pp.summary())
# pp.plot(window=True)  # optional: plot the point pattern with its window
q_r = qs.QStatistic(pp, shape="rectangle", nx=7, ny=5)
ax = q_r.plot()
#ax.set_aspect('equal')
print(q_r.mr.rectangle_width, q_r.mr.rectangle_height)
print('QS =', q_r.chi2) #chi-squared test statistic for the observed point pattern
print('dof =', q_r.df) #degree of freedom
print('p-value =', q_r.chi2_pvalue) # analytical pvalue

How are these values calculated?

Let's compute them manually.
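Under complete spatial randomness (CSR), each of the $k$ cells is expected to contain $E = n/k$ points, and the quadrat test statistic is

$$\chi^2 = \sum_{i=1}^{k} \frac{(O_i - E)^2}{E}, \qquad \mathrm{dof} = k - 1,$$

where $O_i$ is the observed count in cell $i$. This is exactly what `scipy.stats.chisquare` computes when no expected frequencies are passed: it defaults to a uniform expectation equal to the mean count.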


llx, lly, urx, ury = tmp.total_bounds

grid = util.gen_grid(llx, lly, urx, ury, ncol=7, nrow=5, force_square=False, crs=tmp.crs)

fig, ax = plt.subplots()
grid.plot(fc='none', ec='k', ax=ax)
tmp.plot(ax=ax)
plt.tight_layout()
tmp2 = tmp.sjoin(grid)
tmp3 = tmp2.groupby('box_id')[['resale_price']].count().rename(columns={'resale_price': 'count'})
tmp3
grid_m = grid.merge(tmp3, left_on='box_id', right_index=True, how='left').fillna(0)  
# how="left" will keep all cells in the grid, and those empty boxes will have a NA value because they were missing from tmp3, and all NA will be replaced with zeros

fig, ax = plt.subplots(figsize=(10, 7))
grid_m.plot('count', cmap='Blues', ax=ax)
tmp2.plot(ax=ax, c='red')
plt.tight_layout()
chi2, chi2_pvalue = stats.chisquare(grid_m['count'].tolist())
dof = len(grid_m) - 1
chi2, chi2_pvalue, dof  # same as pointpats's result
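As a cross-check, the same statistic can be computed directly from the formula above (a small illustrative snippet):

obs = grid_m['count'].to_numpy()
expected = obs.sum() / len(obs)  # E = n/k under CSR
chi2_manual = ((obs - expected)**2 / expected).sum()
chi2_manual  # matches the chi2 from stats.chisquare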

def calculate_QS(point_gdf, llx=None, lly=None, urx=None, ury=None, nrow=5, ncol=5, force_square=True):
    first_col = point_gdf.columns[0]  # just for the counting and renaming purpose below

    # if no bounding box is specified, fall back to the points' bounds
    if any([llx is None, lly is None, urx is None, ury is None]):
        llx, lly, urx, ury = point_gdf.total_bounds
    
    # generate grid
    grid = util.gen_grid(llx, lly, urx, ury, ncol=ncol, nrow=nrow, force_square=force_square, crs=point_gdf.crs)

    # count points in each cell and merge back to grid
    tmp2 = point_gdf.sjoin(grid)
    tmp3 = tmp2.groupby('box_id')[[first_col]].count().rename(columns={first_col: 'count'})
    grid_m = grid.merge(tmp3, left_on='box_id', right_index=True, how='left').fillna(0)
    
    # calculate chi2 with the count values
    chi2, chi2_pvalue = stats.chisquare(grid_m['count'].tolist())
    dof = len(grid_m) - 1
    return grid_m, chi2, chi2_pvalue, dof
fig, ax = plt.subplots(figsize=(10, 7))
g, c2, pval, dof = calculate_QS(tmp, ncol=7, nrow=6, force_square=True)
g.plot('count', cmap='Blues', ax=ax)
tmp2.plot(ax=ax, c='red')
print(round(c2, 3), round(pval, 3), dof)
plt.tight_layout()
fig, ax = plt.subplots(figsize=(10, 7))
g, c2, pval, dof = calculate_QS(tmp, ncol=20, nrow=10, force_square=True)
g.plot('count', cmap='Blues', ax=ax)
tmp2.plot(ax=ax, c='red')
print(round(c2, 3), round(pval, 3), dof)
plt.tight_layout()
fig, ax = plt.subplots(figsize=(10, 7))
g, c2, pval, dof = calculate_QS(tmp, ncol=7, nrow=6, force_square=True)  # re-run with the 7x6 grid so g and c2 hold the values used below
g.plot('count', cmap='Blues', ax=ax)
tmp2.plot(ax=ax, c='red')
print(round(c2, 3), round(pval, 3), dof)
plt.tight_layout()
# distribution of observed cell counts minus the expected (mean) count per cell
sns.histplot(g['count'] - g['count'].sum()/len(g))
def random_one_set(point_gdf):
    N = len(point_gdf)
    llx, lly, urx, ury = point_gdf.total_bounds
    length_w = urx - llx
    length_h = ury - lly

    rxs = []
    rys = []
    pts = []
    for _ in range(N):
        rx = random.random() * length_w + llx
        ry = random.random() * length_h + lly
        rxs.append(rx)
        rys.append(ry)
        pts.append(Point(rx, ry))
    pt_df = pd.DataFrame.from_dict({'x': rxs, 'y': rys})
    pt_gdf = gpd.GeoDataFrame(pt_df, geometry=pts, crs=point_gdf.crs)
    return pt_gdf
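As an aside, for large point sets the Python loop above can be replaced by a vectorized draw; a sketch (it uses numpy's generator rather than the random module, so results are not seed-compatible with random_one_set):

def random_one_set_np(point_gdf, rng=None):
    # vectorized variant of random_one_set (illustrative sketch)
    rng = np.random.default_rng() if rng is None else rng
    n = len(point_gdf)
    llx, lly, urx, ury = point_gdf.total_bounds
    rxs = rng.uniform(llx, urx, n)
    rys = rng.uniform(lly, ury, n)
    return gpd.GeoDataFrame({'x': rxs, 'y': rys},
                            geometry=gpd.points_from_xy(rxs, rys),
                            crs=point_gdf.crs)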


def calculate_QS_random(point_gdf, M=1, ncol=5, nrow=5, force_square=True, seed=None):
    if seed is not None:
        random.seed(seed)
    llx, lly, urx, ury = point_gdf.total_bounds
    length_w = urx - llx
    length_h = ury - lly
    grid = util.gen_grid(llx, lly, urx, ury, ncol=ncol, nrow=nrow, force_square=force_square, crs=point_gdf.crs)
    
    grids = []
    chi2s = []
    for sim in tqdm(range(M)):
        pt_gdf = random_one_set(point_gdf)
        pt_gdf2 = pt_gdf.sjoin(grid)
        pt_gdf3 = pt_gdf2.groupby('box_id')[['x']].count().rename(columns={'x': 'count'})
        grid_m = grid.merge(pt_gdf3, left_on='box_id', right_index=True, how='left').fillna(0)
        grid_m = grid_m[['box_id', 'count']]
        grid_m['sim'] = sim
        grids.append(grid_m)
        chi2, chi2_pvalue = stats.chisquare(grid_m['count'].tolist())
        chi2s.append((chi2, chi2_pvalue))
    grids = pd.concat(grids)
    grids = grids.pivot(index='box_id', columns='sim', values='count').astype(int)
    tests = pd.DataFrame(chi2s, columns=['chi2', 'p-val'])
    return grids, tests
M = 1000
r_df, c_df = calculate_QS_random(tmp, M=M, ncol=7, nrow=6, force_square=True)
#r_df
c_df.round(3)
c_df[c_df['p-val']<=.05]
r_df

# if we claim that the observed chi2 (`c2`) is higher than those of the randomized patterns (`c_df['chi2']`),
# what is the risk that we are wrong? calculate the pseudo-p-value

above_chi2 = c_df['chi2'] > c2
larger_chi2 = sum(above_chi2)
chi2_r_pvalue = (larger_chi2 + 1.0) / (M + 1.0)
round(chi2_r_pvalue, 4)  # pseudo-p-value
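The +1 in both numerator and denominator counts the observed pattern itself as one possible outcome, which keeps the pseudo-p-value from ever being exactly 0. A worked example of the arithmetic:

# illustrative: if 0 of M = 1000 simulations exceed the observed chi2,
# the smallest attainable pseudo-p-value is
(0 + 1.0) / (1000 + 1.0)  # ~0.000999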
sns.kdeplot(c_df['chi2'])
#r_df.iloc[:, :10]
sns.boxplot(r_df.iloc[:, :100])
fig, ax = plt.subplots(figsize=(10, 7))

for col in r_df.columns:
    vs = r_df[col] #- g['count'].sum()/len(g)
    sns.kdeplot(vs, ax=ax, alpha=.3)
sns.kdeplot(g['count'], ax=ax, c='k')  # optionally center: g['count'] - g['count'].sum()/len(g)
plt.tight_layout()

Nearest Neighbor Analysis

point_data = tmp

pts = list(zip(point_data['X'], point_data['Y']))

# for Monte Carlo simulation, we need the coordinate range
llx, lly, urx, ury = point_data.total_bounds
length_w = urx - llx
length_h = ury - lly

# generate KD-Tree, a tree data structure
pts_tree = spatial.KDTree(pts)
distance, neighbors = pts_tree.query(pts, k=2)  # the first nearest is the point itself (distance 0), so we need to skip it
distance = [d[1] for d in distance]  # keep only the distance to the actual nearest neighbor
#neighbors = [n[1] for n in neighbors]
#distance
sns.histplot(distance)
observe_dist = np.mean(distance)
print(observe_dist)

Run simulations for random patterns




M = 1000

MC_dist = []
for _ in tqdm(range(M)):
    rand = random_one_set(point_data)
    rxs = rand['x']
    rys = rand['y']
    r_pts = list(zip(rxs, rys))
    r_pts_tree = spatial.KDTree(r_pts)
    r_distance, neighbors = r_pts_tree.query(r_pts, k=2)
    r_distance = [d[1] for d in r_distance]
    sim_dist_mean = np.mean(r_distance)
    MC_dist.append(sim_dist_mean)
sns.histplot(MC_dist)
fig, ax = plt.subplots(figsize=(10, 7))

sns.histplot(MC_dist, ax=ax)
ax.axvline(x=observe_dist, c='r')
upper = np.percentile(MC_dist, 97.5)  # upper bound of the 95% envelope (5%/2 in each tail)
lower = np.percentile(MC_dist, 2.5)  # lower bound of the 95% envelope
ax.axvline(upper, ls='--', c='grey')
ax.axvline(lower, ls='--', c='grey')
plt.tight_layout()
mu = np.mean(MC_dist)
std = np.std(MC_dist)

z = (observe_dist - mu) / (std)  # z-score from random simulation
z
n = len(tmp)
A = length_w * length_h
Do = observe_dist
De = 0.5/np.sqrt(n/A)
SE = 0.26136 / np.sqrt(n**2 / A)

z_score = (Do - De) / SE
z_score  # z-score from the equation
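These are the standard Clark-Evans quantities: De is the expected mean nearest-neighbor distance under CSR and SE its standard error. A related summary not computed above is the nearest-neighbor ratio R = Do/De: values below 1 indicate clustering, near 1 randomness, and above 1 dispersion (an illustrative addition):

R = Do / De  # nearest-neighbor ratio
R  # R < 1: clustered; R ~ 1: random; R > 1: dispersed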

The two results are very similar: the z-score from the MC simulation and the one from the analytical equation.

print('for MC simulation')

# For a two-sided test
p_value_two_sided = 2 * stats.norm.sf(abs(z))
print(f"Two-sided p-value for Z={z}: {p_value_two_sided}")

# For a right-tailed test (p-value for values greater than z_score)
p_value_right_tail = stats.norm.sf(z)  # for demo: use when testing whether the observed value is greater than expected under the null
print(f"One-sided p-value (right-tailed) for Z={z}: {p_value_right_tail}")

# For a left-tailed test (p-value for values less than z_score)
p_value_left_tail = stats.norm.cdf(z)
print(f"One-sided p-value (left-tailed) for Z={z}: {p_value_left_tail}")
print('for equation-based')
# For a two-sided test
p_value_two_sided = 2 * stats.norm.sf(abs(z_score))
print(f"Two-sided p-value for Z={z_score}: {p_value_two_sided}")

# For a right-tailed test (p-value for values greater than z_score)
p_value_right_tail = stats.norm.sf(z_score)  # for demo: use when testing whether the observed value is greater than expected under the null
print(f"One-sided p-value (right-tailed) for Z={z_score}: {p_value_right_tail}")

# For a left-tailed test (p-value for values less than z_score)
p_value_left_tail = stats.norm.cdf(z_score)
print(f"One-sided p-value (left-tailed) for Z={z_score}: {p_value_left_tail}")

A significant left-tailed test indicates that the observed mean nearest-neighbor distance is shorter than expected under randomness, i.e., the points are clustered.

Ripley’s K-function

point_data = tmp

pts = list(zip(point_data['X'], point_data['Y']))

# for monte carlo, need the coordinate range 
llx, lly, urx, ury = point_data.total_bounds
length_w = urx - llx
length_h = ury - lly

n = len(point_data)  # number of points, used in the K/L-function below

# generate KD-Tree, a tree data structure
pts_tree = spatial.KDTree(pts)

min_l = min(length_w, length_h)
print(min_l, length_w, length_h)

area_size = length_w * length_h

n_sep = 10
l_sep = min_l / n_sep
radius_list = [r*l_sep for r in range(1, n_sep+1)]
print(radius_list)
tre = spatial.KDTree(pts)

n_pairs = []
lfuncs = []
for r in radius_list:
    #pairs = tre.query_pairs(r)
    # find all neighbors from one tree to another (here they are identical)
    n_neighbors = tre.query_ball_tree(tre, r)  
    # because the two point sets (trees) are identical, the first "neighbor" of each point is the point itself
    # the next line counts the neighbors, subtracting 1 to exclude the point itself
    n_neighbors2 = [len(nb)-1 for nb in n_neighbors]
    #print(r, n_neighbors2)
    # only keep the total sum
    n_pairs.append(sum(n_neighbors2))
    l_val = np.sqrt((area_size * sum(n_neighbors2)) / (np.pi * n * (n - 1)))
    lfuncs.append(l_val)
print(n_pairs)
plt.plot(radius_list, n_pairs)
plt.plot(radius_list, lfuncs)
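The loop above implements the standard estimators. With $A$ the study-area size and $n$ the number of points,

$$\hat{K}(r) = \frac{A}{n(n-1)} \sum_{i} \sum_{j \neq i} \mathbf{1}(d_{ij} \le r), \qquad \hat{L}(r) = \sqrt{\frac{\hat{K}(r)}{\pi}},$$

so `sum(n_neighbors2)` is the double sum of within-radius pairs. Under CSR, $\hat{L}(r) \approx r$, which is why the L-function is easier to read than K: clustering shows up as the observed curve rising above the random envelope.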

Run simulations for random patterns



M = 1000

#MC_pairs = []
#MC_lfuncs = []
MC_lfunc_dfs = []
for sim in tqdm(range(M)):
    rand = random_one_set(point_data)
    rxs = rand['x']
    rys = rand['y']
    r_pts = list(zip(rxs, rys))
    r_pts_tree = spatial.KDTree(r_pts)

    this_pairs = []
    this_lfuncs = []
    for r in radius_list:
        n_neighbors = r_pts_tree.query_ball_tree(r_pts_tree, r)  # query the random pattern against itself (not against the observed tree)
        n_neighbors2 = [len(nb)-1 for nb in n_neighbors]
        this_pairs.append(sum(n_neighbors2))
        l_val = np.sqrt((area_size * sum(n_neighbors2)) / (np.pi * n * (n - 1)))
        this_lfuncs.append(l_val)
    #MC_pairs.append(this_pairs)
    #MC_lfuncs.append(this_lfuncs)
    tmp_res = pd.DataFrame.from_dict({
        'sim': sim, 
        'radius': radius_list, 
        'found_neighbors': this_pairs, 
        'L-value': this_lfuncs 
    })
    MC_lfunc_dfs.append(tmp_res)
MC_lfunc_dfs = pd.concat(MC_lfunc_dfs)
MC_lfunc_dfs
MC_lfunc_dfs2 = MC_lfunc_dfs.pivot(index='radius', columns='sim', values='L-value')
MC_lfunc_dfs2.iloc[:, :10]
for col in MC_lfunc_dfs2:
    vs = MC_lfunc_dfs2[col]
    plt.plot(radius_list, vs, c='grey', alpha=.4)
plt.plot(radius_list, lfuncs, c='r', marker='.')
rad_mean = MC_lfunc_dfs2.mean(axis=1)
rad_mean
rad_std = MC_lfunc_dfs2.std(axis=1)
rad_std
fig, ax = plt.subplots(figsize=(8, 6))

ax.plot(radius_list, lfuncs, c='r', marker='.')
ax.plot(radius_list, rad_mean, c='grey', ls='--')
ax.plot(radius_list, rad_mean+rad_std*2, c='grey', ls=':', alpha=.8)
ax.plot(radius_list, rad_mean-rad_std*2, c='grey', ls=':', alpha=.8)

plt.tight_layout()

The points are clustered within roughly 1,500 m, and near-random beyond that distance.