Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 9 additions & 84 deletions gwlearn/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import numpy as np
import pandas as pd
from joblib import dump, load
from libpysal import graph
from libpysal import graph, kernels
from scipy.spatial import KDTree
from sklearn.base import BaseEstimator, ClassifierMixin, RegressorMixin
from sklearn.model_selection import train_test_split
Expand All @@ -19,58 +19,6 @@
__all__ = ["BaseClassifier", "BaseRegressor"]


def _triangular(distances: np.ndarray, bandwidth: np.ndarray | float) -> np.ndarray:
u = np.clip(distances / bandwidth, 0, 1)
return 1 - u


def _parabolic(distances: np.ndarray, bandwidth: np.ndarray | float) -> np.ndarray:
u = np.clip(distances / bandwidth, 0, 1)
return 1 - u**2


def _gaussian(distances: np.ndarray, bandwidth: np.ndarray | float) -> np.ndarray:
u = distances / bandwidth
return np.exp(-((u / 2) ** 2))


def _bisquare(distances: np.ndarray, bandwidth: np.ndarray | float) -> np.ndarray:
u = np.clip(distances / bandwidth, 0, 1)
return (1 - u**2) ** 2


def _cosine(distances: np.ndarray, bandwidth: np.ndarray | float) -> np.ndarray:
u = np.clip(distances / bandwidth, 0, 1)
return np.cos(np.pi / 2 * u)


def _exponential(distances: np.ndarray, bandwidth: np.ndarray | float) -> np.ndarray:
u = distances / bandwidth
return np.exp(-u)


def _boxcar(distances: np.ndarray, bandwidth: np.ndarray | float) -> np.ndarray:
r = (distances < bandwidth).astype(int)
return r


def _tricube(distances: np.ndarray, bandwidth: np.ndarray | float) -> np.ndarray:
u = np.clip(distances / bandwidth, 0, 1)
return (1 - u**3) ** 3


# Registry of the kernels that can be selected by name. The gaussian and
# exponential entries are commented out, so those two cannot be looked up here.
_kernel_functions = dict(
    triangular=_triangular,
    parabolic=_parabolic,
    # gaussian=_gaussian,
    bisquare=_bisquare,
    tricube=_tricube,
    cosine=_cosine,
    boxcar=_boxcar,
    # exponential=_exponential,
)


class _BaseModel(BaseEstimator):
"""Base class for geographically weighted models"""

Expand Down Expand Up @@ -141,33 +89,22 @@ def _build_weights(self) -> graph.Graph:
f"got {self.bandwidth}."
)

kernel = (
_kernel_functions[self.kernel]
if isinstance(self.kernel, str)
else self.kernel
)

if self.fixed: # fixed distance
weights = graph.Graph.build_kernel(
self.geometry,
kernel=kernel,
kernel=self.kernel,
bandwidth=self.bandwidth,
coplanar=self.coplanar,
decay=True,
)
else: # adaptive KNN
weights = graph.Graph.build_kernel(
self.geometry,
kernel="identity",
kernel=self.kernel,
k=self.bandwidth - 1 if self.include_focal else self.bandwidth,
bandwidth="adaptive",
coplanar=self.coplanar,
)
# post-process identity weights by the selected kernel
# and kernel bandwidth derived from each neighborhood
# the epsilon comes from MGWR to avoid division by zero
bandwidth = weights._adjacency.groupby(level=0).transform("max") * 1.0000001
weights = graph.Graph(
adjacency=kernel(weights._adjacency, bandwidth),
is_sorted=True,
decay=True,
)
if self.include_focal:
weights = weights.assign_self_weight(1)
Expand Down Expand Up @@ -459,22 +396,17 @@ def _prepare_prediction_neighborhoods(
if not self.fixed and not isinstance(bw, Integral):
raise ValueError("Adaptive bandwidth (fixed=False) must be an integer.")

kernel = (
_kernel_functions[self.kernel]
if isinstance(self.kernel, str)
else self.kernel
)

if self.fixed:
input_ids, indices_array = self.geometry.sindex.query(
geometry, predicate="dwithin", distance=self.bandwidth
)
local_ids = self._local_models.index[indices_array.flatten()].to_numpy()
distance = kernel(
distance = kernels.kernel(
self.geometry.iloc[indices_array].distance(
geometry.iloc[input_ids], align=False
),
bw,
kernel=self.kernel,
)
else:
training_coords = self.geometry.get_coordinates()
Expand All @@ -493,7 +425,7 @@ def _prepare_prediction_neighborhoods(
kernel_bandwidth = (
pd.Series(distances).groupby(input_ids).transform("max") + 1e-6
) # can't have 0
distance = kernel(distances, kernel_bandwidth)
distance = kernels.kernel(distances, kernel_bandwidth, kernel=self.kernel)

split_indices = np.where(np.diff(input_ids))[0] + 1
local_model_ids = np.split(local_ids, split_indices)
Expand Down Expand Up @@ -582,13 +514,6 @@ def _validate_fit_inputs(
if not self.fixed and not isinstance(bw, Integral):
raise ValueError("Adaptive bandwidth (fixed=False) must be an integer.")

if isinstance(self.kernel, str):
if self.kernel not in _kernel_functions:
raise ValueError(
f"Invalid kernel '{self.kernel}'. "
f"Supported kernels are: {list(_kernel_functions.keys())} "
"or a callable."
)
elif not callable(self.kernel):
raise ValueError(
"kernel must be either a valid string or a callable function."
Expand Down
32 changes: 28 additions & 4 deletions gwlearn/tests/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from sklearn.metrics import accuracy_score, mean_absolute_error
from sklearn.model_selection import GridSearchCV

from gwlearn.base import BaseClassifier, BaseRegressor, _kernel_functions
from gwlearn.base import BaseClassifier, BaseRegressor


def test_init_default_parameters():
Expand Down Expand Up @@ -91,7 +91,7 @@ def test_init_keep_models_path():
def test_init_kernel_assignment():
"""Test BaseClassifier initialization with various kernel options."""
# Test with each predefined kernel
for kernel_name in _kernel_functions:
for kernel_name in ["bisquare", "tricube", "triangular"]:
clf = BaseClassifier(
LogisticRegression,
bandwidth=100,
Expand Down Expand Up @@ -255,7 +255,19 @@ def test_fit_with_keep_models_path(sample_data):
assert len(model_files) > 0


@pytest.mark.parametrize("kernel", _kernel_functions)
@pytest.mark.parametrize(
"kernel",
[
"triangular",
"parabolic",
# "gaussian",
"bisquare",
"tricube",
"cosine",
"boxcar",
# "exponential",
],
)
def test_fit_different_kernels(sample_data, kernel):
"""Test fitting with different kernel functions."""
X, y, geometry = sample_data
Expand Down Expand Up @@ -1437,7 +1449,19 @@ def test_regressor_fit_with_keep_models_path(sample_regression_data):
assert len(model_files) > 0


@pytest.mark.parametrize("kernel", _kernel_functions)
@pytest.mark.parametrize(
"kernel",
[
"triangular",
"parabolic",
# "gaussian",
"bisquare",
"tricube",
"cosine",
"boxcar",
# "exponential",
],
)
def test_regressor_fit_different_kernels(sample_regression_data, kernel):
"""Test fitting with different kernel functions."""
X, y, geometry = sample_regression_data
Expand Down
Loading