Download the utility script here: utils.py
import os
import numpy as np # for numerical computation (arrays, statistics, etc.)
import pandas as pd # for table/spreadsheets manipulation
os.environ['USE_PYGEOS'] = '0'
import geopandas as gpd # for geospatial data / attribute table & geometry manipulation
from shapely import wkt # for geometry data manipulation
import matplotlib.pyplot as plt # the fundamental library for making plots/figures in Python
import seaborn as sns # convenient tools for making statistical plots
import random # for generating random points
from shapely.geometry import Polygon, Point # for creating quadrats/grids and points
from scipy import spatial # for the kd-tree: https://docs.scipy.org/doc/scipy/reference/generated/scipy.spatial.KDTree.html
# see kdtree.query and kdtree.query_ball_tree
from scipy import stats # for calculating chi-squared
# pointpats is a library written by PySAL developer team
from pointpats import PointPattern, as_window
from pointpats import PoissonPointProcess as csr
import pointpats.quadrat_statistics as qs
import utils as util
from tqdm import tqdm # for progress bars
subz = gpd.read_file('../resources/master-plan-2019-subzone_landuse_area.zip')
# drop the columns we don't need, keeping only the important ones
keep_cols = [
'SUBZONE_N', 'SUBZONE_C',
'PLN_AREA_N', 'PLN_AREA_C',
'REGION_N', 'REGION_C',
'geometry']
subz = subz[keep_cols].copy() # overwriting the object by reusing the same name; risky in general, but fine here
subz.head()
hdb = pd.read_csv('../resources/hdb_data_2023.csv.xz', index_col=0) # the first column (0) contains the row index
g_hdb = gpd.GeoDataFrame(data=hdb,
geometry=hdb['geometry'].apply(wkt.loads),
                         crs=subz.crs) # use the same CRS as the subzones; verify they actually match before setting it
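Because the CSV stores geometry as WKT text, the CRS has to be supplied manually. A quick sanity check (a minimal sketch using only the objects above) confirms the two layers now agree:
# verify both layers share the same CRS before joining or plotting them together
print(subz.crs)
print(g_hdb.crs == subz.crs) # should print True after the assignment above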
g_hdb.plot()
g_hdb2 = g_hdb.sjoin(subz) # save the spatial join result as a new GeoDataFrame
# assigning it to a new name is safer (no overwriting)
g_hdb2.head()
g_hdb2['PLN_AREA_N'].unique()
gdf_1 = g_hdb2[g_hdb2['PLN_AREA_N']=='TAMPINES']
gdf_1.plot('resale_price')
np.percentile(gdf_1['resale_price'], 99)
fig, ax = plt.subplots(figsize=(8, 6))
sns.histplot(gdf_1['resale_price'], ax=ax)
ax.axvline(x=np.percentile(gdf_1['resale_price'], 90), c='k')
plt.tight_layout()
tmp = gdf_1[gdf_1['resale_price']>=np.percentile(gdf_1['resale_price'], 90)]
tmp = tmp.drop(columns=['index_right'])
tmp
tmp.plot()
This is the data we are going to analyze today: is it clustered?
Quadrat statistics¶
Using pointpats¶
pts = list(zip(tmp['X'], tmp['Y']))
print(pts)
pp = PointPattern(pts)
print(pp.summary())
#pp.plot(window=True, title=k)
q_r = qs.QStatistic(pp, shape="rectangle", nx=7, ny=5)
ax = q_r.plot()
#ax.set_aspect('equal')
print(q_r.mr.rectangle_width, q_r.mr.rectangle_height)
print('QS =', q_r.chi2) # chi-squared test statistic for the observed point pattern
print('dof =', q_r.df) # degrees of freedom
print('p-value =', q_r.chi2_pvalue) # analytical p-value
How are these calculated?¶
Let's try to do them manually¶
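Under complete spatial randomness, each of the $k$ grid cells is expected to contain $n/k$ of the $n$ points, and the quadrat statistic is an ordinary chi-squared goodness-of-fit test against that uniform expectation:

$$\chi^2 = \sum_{i=1}^{k} \frac{(O_i - n/k)^2}{n/k}, \qquad \text{dof} = k - 1$$

where $O_i$ is the observed count in cell $i$.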
llx, lly, urx, ury = tmp.total_bounds
grid = util.gen_grid(llx, lly, urx, ury, ncol=7, nrow=5, force_square=False, crs=tmp.crs)
fig, ax = plt.subplots()
grid.plot(fc='none', ec='k', ax=ax)
tmp.plot(ax=ax)
plt.tight_layout()
tmp2 = tmp.sjoin(grid)
tmp3 = tmp2.groupby('box_id')[['resale_price']].count().rename(columns={'resale_price': 'count'})
tmp3
grid_m = grid.merge(tmp3, left_on='box_id', right_index=True, how='left').fillna(0)
# how="left" keeps every cell in the grid; cells missing from tmp3 get NA, and fillna(0) turns those NAs into zeros
fig, ax = plt.subplots(figsize=(10, 7))
grid_m.plot('count', cmap='Blues', ax=ax)
tmp2.plot(ax=ax, c='red')
plt.tight_layout()
chi2, chi2_pvalue = stats.chisquare(grid_m['count'].tolist())
dof = len(grid_m) - 1
chi2, chi2_pvalue, dof # same as pointpats's result
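As a cross-check, the same statistic can be computed directly from the formula above (a minimal sketch; when no expected frequencies are given, `stats.chisquare` uses the mean count as the expectation):
counts = np.asarray(grid_m['count'])
expected = counts.mean() # the uniform expectation n/k: total points divided by number of cells
chi2_manual = ((counts - expected)**2 / expected).sum()
print(np.isclose(chi2_manual, chi2)) # should print True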
def calculate_QS(point_gdf, llx=None, lly=None, urx=None, ury=None, nrow=5, ncol=5, force_square=True):
    first_col = point_gdf.columns[0] # just for the counting and renaming below
    # if no bounding box is specified, fall back to the points' bounds
    if any([llx is None, lly is None, urx is None, ury is None]):
        llx, lly, urx, ury = point_gdf.total_bounds
    # generate the grid
    grid = util.gen_grid(llx, lly, urx, ury, ncol=ncol, nrow=nrow, force_square=force_square, crs=point_gdf.crs)
    # count points in each cell and merge back to the grid
    tmp2 = point_gdf.sjoin(grid)
    tmp3 = tmp2.groupby('box_id')[[first_col]].count().rename(columns={first_col: 'count'})
    grid_m = grid.merge(tmp3, left_on='box_id', right_index=True, how='left').fillna(0)
    # calculate chi2 on the count values
    chi2, chi2_pvalue = stats.chisquare(grid_m['count'].tolist())
    dof = len(grid_m) - 1
    return grid_m, chi2, chi2_pvalue, dof
fig, ax = plt.subplots(figsize=(10, 7))
g, c2, pval, dof = calculate_QS(tmp, ncol=7, nrow=6, force_square=True)
g.plot('count', cmap='Blues', ax=ax)
tmp2.plot(ax=ax, c='red')
print(round(c2, 3), round(pval, 3), dof)
plt.tight_layout()
fig, ax = plt.subplots(figsize=(10, 7))
g, c2, pval, dof = calculate_QS(tmp, ncol=20, nrow=10, force_square=True)
g.plot('count', cmap='Blues', ax=ax)
tmp2.plot(ax=ax, c='red')
print(round(c2, 3), round(pval, 3), dof)
plt.tight_layout()
fig, ax = plt.subplots(figsize=(10, 7))
g, c2, pval, dof = calculate_QS(tmp, ncol=7, nrow=6, force_square=True)
g.plot('count', cmap='Blues', ax=ax)
tmp2.plot(ax=ax, c='red')
print(round(c2, 3), round(pval, 3), dof)
plt.tight_layout()
sns.histplot(g['count'] - g['count'].sum()/len(g))
def random_one_set(point_gdf):
N = len(point_gdf)
llx, lly, urx, ury = point_gdf.total_bounds
length_w = urx - llx
length_h = ury - lly
rxs = []
rys = []
pts = []
for _ in range(N):
rx = random.random() * length_w + llx
ry = random.random() * length_h + lly
rxs.append(rx)
rys.append(ry)
pts.append(Point(rx, ry))
pt_df = pd.DataFrame.from_dict({'x': rxs, 'y': rys})
pt_gdf = gpd.GeoDataFrame(pt_df, geometry=pts, crs=point_gdf.crs)
return pt_gdf
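For larger simulations, the same uniform sampling over the bounding box can be vectorized with numpy. A sketch (the helper name `random_one_set_np` is ours, not part of the original code; it produces the same distribution as `random_one_set`):
def random_one_set_np(point_gdf, rng=None):
    # vectorized CSR sample: x and y drawn uniformly over the bounding box
    rng = np.random.default_rng(rng)
    n_pts = len(point_gdf)
    llx, lly, urx, ury = point_gdf.total_bounds
    xs = rng.uniform(llx, urx, n_pts)
    ys = rng.uniform(lly, ury, n_pts)
    return gpd.GeoDataFrame({'x': xs, 'y': ys},
                            geometry=gpd.points_from_xy(xs, ys),
                            crs=point_gdf.crs)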
def calculate_QS_random(point_gdf, M=1, ncol=5, nrow=5, force_square=True, seed=None):
    if seed is not None:
        random.seed(seed)
llx, lly, urx, ury = point_gdf.total_bounds
length_w = urx - llx
length_h = ury - lly
grid = util.gen_grid(llx, lly, urx, ury, ncol=ncol, nrow=nrow, force_square=force_square, crs=point_gdf.crs)
grids = []
chi2s = []
for sim in tqdm(range(M)):
pt_gdf = random_one_set(point_gdf)
pt_gdf2 = pt_gdf.sjoin(grid)
pt_gdf3 = pt_gdf2.groupby('box_id')[['x']].count().rename(columns={'x': 'count'})
grid_m = grid.merge(pt_gdf3, left_on='box_id', right_index=True, how='left').fillna(0)
grid_m = grid_m[['box_id', 'count']]
grid_m['sim'] = sim
grids.append(grid_m)
chi2, chi2_pvalue = stats.chisquare(grid_m['count'].tolist())
chi2s.append((chi2, chi2_pvalue))
grids = pd.concat(grids)
grids = grids.pivot(index='box_id', columns='sim', values='count').astype(int)
tests = pd.DataFrame(chi2s, columns=['chi2', 'p-val'])
    return grids, tests
M = 1000
r_df, c_df = calculate_QS_random(tmp, M=M, ncol=7, nrow=6, force_square=True)
#r_df
c_df.round(3)
c_df[c_df['p-val']<=.05]
r_df
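The next cell computes a Monte-Carlo (pseudo) p-value: with $k$ the number of simulated patterns whose $\chi^2$ exceeds the observed value, the +1 in numerator and denominator counts the observed pattern itself as one realization, so the estimate can never be exactly zero:

$$\hat{p} = \frac{k + 1}{M + 1}$$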
# if we claim that the observed chi2 (`c2`) is higher than the randomized patterns' `c_df['chi2']`,
# what is the risk that we are wrong? calculate the pseudo-p-value
above_chi2 = c_df['chi2'] > c2
larger_chi2 = sum(above_chi2)
chi2_r_pvalue = (larger_chi2 + 1.0) / (M + 1.0)
round(chi2_r_pvalue, 4) # pseudo-p-value
sns.kdeplot(c_df['chi2'])
#r_df.iloc[:, :10]
sns.boxplot(r_df.iloc[:, :100])
fig, ax = plt.subplots(figsize=(10, 7))
for col in r_df.columns:
vs = r_df[col] #- g['count'].sum()/len(g)
sns.kdeplot(vs, ax=ax, alpha=.3)
sns.kdeplot(g['count'], ax=ax, c='k') # optionally subtract the mean: g['count'] - g['count'].sum()/len(g)
plt.tight_layout()
Nearest Neighbor Analysis¶
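The test statistic here is the observed mean nearest-neighbor distance

$$\bar{D}_O = \frac{1}{n}\sum_{i=1}^{n} d_i$$

where $d_i$ is the distance from point $i$ to its closest other point; the KD-tree below finds each $d_i$ efficiently.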
point_data = tmp
pts = list(zip(point_data['X'], point_data['Y']))
# for monte carlo, need the coordinate range
llx, lly, urx, ury = point_data.total_bounds
length_w = urx - llx
length_h = ury - lly
# generate KD-Tree, a tree data structure
pts_tree = spatial.KDTree(pts)
distance, neighbors = pts_tree.query(pts, k=2) # the first nearest neighbor is the point itself at distance 0, so skip it
distance = [d[1] for d in distance] # keep only the distance to the true nearest neighbor
#neighbors = [n[1] for n in neighbors]
#distance
sns.histplot(distance)
observe_dist = np.mean(distance)
print(observe_dist)
Run simulation for random patterns¶
M = 1000
MC_dist = []
for _ in tqdm(range(M)):
rand = random_one_set(point_data)
rxs = rand['x']
rys = rand['y']
r_pts = list(zip(rxs, rys))
r_pts_tree = spatial.KDTree(r_pts)
r_distance, neighbors = r_pts_tree.query(r_pts, k=2)
r_distance = [d[1] for d in r_distance]
sim_dist_mean = np.mean(r_distance)
MC_dist.append(sim_dist_mean)
sns.histplot(MC_dist)
fig, ax = plt.subplots(figsize=(10, 7))
sns.histplot(MC_dist, ax=ax)
ax.axvline(x=observe_dist, c='r')
upper = np.percentile(MC_dist, 97.5) # 5% /2
lower = np.percentile(MC_dist, 2.5) # 5% /2
ax.axvline(upper, ls='--', c='grey')
ax.axvline(lower, ls='--', c='grey')
plt.tight_layout()
mu = np.mean(MC_dist)
std = np.std(MC_dist)
z = (observe_dist - mu) / std # z-score from random simulation
z
n = len(tmp)
A = length_w * length_h
Do = observe_dist
De = 0.5/np.sqrt(n/A)
SE = 0.26136 / np.sqrt(n**2 / A)
z_score = (Do - De) / SE
z_score # z-score from the equation
The results are very similar between the two approaches: the MC simulation and the analytical equation.
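The equation-based quantities above are the standard Clark-Evans values for a random (Poisson) pattern of $n$ points in an area $A$:

$$D_E = \frac{0.5}{\sqrt{n/A}}, \qquad SE = \frac{0.26136}{\sqrt{n^2/A}}, \qquad Z = \frac{D_O - D_E}{SE}$$

The constant 0.26136 comes from the variance of the nearest-neighbor distance under a Poisson process.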
print('for MC simulation')
# For a two-sided test
p_value_two_sided = 2 * stats.norm.sf(abs(z))
print(f"Two-sided p-value for Z={z}: {p_value_two_sided}")
# For a right-tailed test (p-value for values greater than z_score)
p_value_right_tail = stats.norm.sf(z) # just for demo: use this when testing whether the observed value is greater than expected
print(f"One-sided p-value (right-tailed) for Z={z}: {p_value_right_tail}")
# For a left-tailed test (p-value for values less than z_score)
p_value_left_tail = stats.norm.cdf(z)
print(f"One-sided p-value (left-tailed) for Z={z}: {p_value_left_tail}")print('for equation-based')
# For a two-sided test
p_value_two_sided = 2 * stats.norm.sf(abs(z_score))
print(f"Two-sided p-value for Z={z_score}: {p_value_two_sided}")
# For a right-tailed test (p-value for values greater than z_score)
p_value_right_tail = stats.norm.sf(z_score) # just for demo: use this when testing whether the observed value is greater than expected
print(f"One-sided p-value (right-tailed) for Z={z_score}: {p_value_right_tail}")
# For a left-tailed test (p-value for values less than z_score)
p_value_left_tail = stats.norm.cdf(z_score)
print(f"One-sided p-value (left-tailed) for Z={z_score}: {p_value_left_tail}")significant left tail test indicates the observed nearest distance is shorter than random
Ripley’s K-function¶
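The code below estimates the K-function by counting, for each radius $r$, the pairs of points within distance $r$ of each other, and reports the variance-stabilized L-function:

$$\hat{K}(r) = \frac{A}{n(n-1)} \sum_{i} \sum_{j \ne i} \mathbb{1}(d_{ij} \le r), \qquad \hat{L}(r) = \sqrt{\hat{K}(r)/\pi}$$

Under complete spatial randomness $K(r) = \pi r^2$, so $L(r) \approx r$; values of $\hat{L}(r)$ above the random envelope indicate clustering at that scale.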
point_data = tmp
pts = list(zip(point_data['X'], point_data['Y']))
# for monte carlo, need the coordinate range
llx, lly, urx, ury = point_data.total_bounds
length_w = urx - llx
length_h = ury - lly
#n = len(point_data)
# generate KD-Tree, a tree data structure
pts_tree = spatial.KDTree(pts)
min_l = min(length_w, length_h)
print(min_l, length_w, length_h)
area_size = length_w * length_h
n_sep = 10
l_sep = min_l / n_sep
radius_list = [r*l_sep for r in range(1, n_sep+1)]
print(radius_list)
tre = pts_tree # reuse the KD-Tree built above instead of rebuilding it
n_pairs = []
lfuncs = []
for r in radius_list:
#pairs = tre.query_pairs(r)
# find all neighbors from one tree to another (here they are identical)
n_neighbors = tre.query_ball_tree(tre, r)
    # because the two trees are built from the same points, every point finds itself as a neighbor at distance 0
    # the next line counts each point's neighbors, minus 1 for the point itself
    n_neighbors2 = [len(nb)-1 for nb in n_neighbors]
#print(r, n_neighbors2)
# only keep the total sum
n_pairs.append(sum(n_neighbors2))
l_val = np.sqrt((area_size * sum(n_neighbors2)) / (np.pi * n * (n - 1)))
lfuncs.append(l_val)
print(n_pairs)
plt.plot(radius_list, n_pairs)
plt.plot(radius_list, lfuncs)
Run simulation for random pattern¶
M = 1000
#MC_pairs = []
#MC_lfuncs = []
MC_lfunc_dfs = []
for sim in tqdm(range(M)):
rand = random_one_set(point_data)
rxs = rand['x']
rys = rand['y']
r_pts = list(zip(rxs, rys))
r_pts_tree = spatial.KDTree(r_pts)
this_pairs = []
this_lfuncs = []
for r in radius_list:
        n_neighbors = r_pts_tree.query_ball_tree(r_pts_tree, r) # count neighbors among the random points themselves (both trees identical, as above)
n_neighbors2 = [len(nb)-1 for nb in n_neighbors]
this_pairs.append(sum(n_neighbors2))
l_val = np.sqrt((area_size * sum(n_neighbors2)) / (np.pi * n * (n - 1)))
this_lfuncs.append(l_val)
#MC_pairs.append(this_pairs)
#MC_lfuncs.append(this_lfuncs)
tmp_res = pd.DataFrame.from_dict({
'sim': sim,
'radius': radius_list,
'found_neighbors': this_pairs,
'L-value': this_lfuncs
})
MC_lfunc_dfs.append(tmp_res)
MC_lfunc_dfs = pd.concat(MC_lfunc_dfs)
MC_lfunc_dfs
MC_lfunc_dfs2 = MC_lfunc_dfs.pivot(index='radius', columns='sim', values='L-value')
MC_lfunc_dfs2.iloc[:, :10]
for col in MC_lfunc_dfs2:
vs = MC_lfunc_dfs2[col]
plt.plot(radius_list, vs, c='grey', alpha=.4)
plt.plot(radius_list, lfuncs, c='r', marker='.')
rad_mean = MC_lfunc_dfs2.mean(axis=1)
rad_mean
rad_std = MC_lfunc_dfs2.std(axis=1)
rad_std
fig, ax = plt.subplots(figsize=(8, 6))
ax.plot(radius_list, lfuncs, c='r', marker='.')
ax.plot(radius_list, rad_mean, c='grey', ls='--')
ax.plot(radius_list, rad_mean+rad_std*2, c='grey', ls=':', alpha=.8)
ax.plot(radius_list, rad_mean-rad_std*2, c='grey', ls=':', alpha=.8)
plt.tight_layout()
The points are clustered within roughly 1500 m, and close to random beyond that.