Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 147 additions & 29 deletions deepxde/icbc/boundary_conditions.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,31 +47,94 @@ def __init__(self, geom, on_boundary, component):
)

def filter(self, X):
"""Extracts points from a set that satisfy the boundary condition.

Args:
X (np.ndarray): An array of points (coordinates).

Returns:
np.ndarray: A subset of ``X`` containing only points that lie
on the specific boundary defined by ``self.on_boundary``.
"""
return X[self.on_boundary(X, self.geom.on_boundary(X))]

def collocation_points(self, X):
"""Returns the points where the boundary condition error will be evaluated.

For standard BCs, this is identical to the filtered boundary points.
Subclasses like ``PeriodicBC`` may override this to include
paired points.

Args:
X (np.ndarray): The full set of available boundary points.

Returns:
np.ndarray: The subset of points designated for BC loss calculation.
"""
return self.filter(X)

def normal_derivative(self, X, inputs, outputs, beg, end):
r"""Computes the directional derivative along the outward normal vector.

This is used for Neumann and Robin boundary conditions to calculate
:math:`\frac{\partial \hat{y}}{\partial n} = \nabla \hat{y} \cdot \mathbf{n}`.

Args:
X (np.ndarray): The coordinates of the boundary points.
inputs (Tensor): The input tensor to the neural network.
outputs (Tensor): The output tensor from the neural network.
beg (int): The starting index of the points in the current batch.
end (int): The ending index of the points in the current batch.

Returns:
Tensor: A column vector representing the normal derivative at
each point in the range [beg, end].
"""
dydx = grad.jacobian(outputs, inputs, i=self.component, j=None)[beg:end]
n = self.boundary_normal(X, beg, end, None)
return bkd.sum(dydx * n, 1, keepdims=True)

@abstractmethod
def error(self, X, inputs, outputs, beg, end, aux_var=None):
"""Returns the loss."""
# aux_var is used in PI-DeepONet, where aux_var is the input function evaluated
# at x.
"""Calculates the residual (loss) for the boundary condition.

This method must be implemented by subclasses to define the
specific physics of the boundary (e.g., :math:`\hat{y} - y_{true}`).

Args:
X (np.ndarray): Boundary point coordinates.
inputs (Tensor): Neural network input tensor.
outputs (Tensor): Neural network output tensor.
beg (int): Start index for the point batch.
end (int): End index for the point batch.
aux_var (Tensor, optional): The input function evaluated at x. Only used in PI-DeepONet architectures.

Returns:
Tensor: The computed residual which the optimizer will
attempt to minimize toward zero.
"""
pass


class DirichletBC(BC):
"""Dirichlet boundary conditions: y(x) = func(x)."""
"""Dirichlet boundary conditions: $y(x) = f(x)$.

Enforces the value of the solution at the boundary.

Args:
geom: A ``dde.geometry.Geometry`` instance.
func: A function returning the boundary values.
Signature: ``(x) -> array_like (N, 1)``.
on_boundary: Function identifying the target boundary.
component (int): Output component to which this BC applies.
"""

def __init__(self, geom, func, on_boundary, component=0):
super().__init__(geom, on_boundary, component)
self.func = npfunc_range_autocache(utils.return_tensor(func))

def error(self, X, inputs, outputs, beg, end, aux_var=None):
"""Returns the Dirichlet BC residual: $\hat{y}(x) - f(x)$."""
values = self.func(X, beg, end, aux_var)
if bkd.ndim(values) == 2 and bkd.shape(values)[1] != 1:
raise RuntimeError(
Expand All @@ -82,32 +145,51 @@ def error(self, X, inputs, outputs, beg, end, aux_var=None):


class NeumannBC(BC):
"""Neumann boundary conditions: dy/dn(x) = func(x)."""
"""Neumann boundary conditions: dy/dn(x) = func(x).

Enforces the value of the derivative of the solution in the direction
of the outward normal.

Args:
geom: A ``dde.geometry.Geometry`` instance.
func: A function returning the boundary values.
Signature: ``(x) -> array_like (N, 1)``.
on_boundary: Function identifying the target boundary.
component (int): Output component to which this BC applies.
"""

def __init__(self, geom, func, on_boundary, component=0):
super().__init__(geom, on_boundary, component)
self.func = npfunc_range_autocache(utils.return_tensor(func))

def error(self, X, inputs, outputs, beg, end, aux_var=None):
r"""Computes the Neumann residual: $\frac{\partial \hat{y}}{\partial n} - f(x)$."""
values = self.func(X, beg, end, aux_var)
return self.normal_derivative(X, inputs, outputs, beg, end) - values


class RobinBC(BC):
"""Robin boundary conditions: dy/dn(x) = func(x, y)."""
"""Robin boundary condition: $\frac{\partial y}{\partial n}(x) = f(x, y)$.

A weighted combination of Dirichlet and Neumann conditions.
"""

def __init__(self, geom, func, on_boundary, component=0):
super().__init__(geom, on_boundary, component)
self.func = func

def error(self, X, inputs, outputs, beg, end, aux_var=None):
"""Computes the Robin residual: $\frac{\partial \hat{y}}{\partial n} - f(x, \hat{y})$."""
return self.normal_derivative(X, inputs, outputs, beg, end) - self.func(
X[beg:end], outputs[beg:end]
)


class PeriodicBC(BC):
"""Periodic boundary conditions on component_x."""
"""Periodic boundary condition: $y(x_{left}) = y(x_{right})$.

Setting derivative_order=1 enforces periodicity of the first derivative, setting derivative_order=0 enforces periodicity of the function values.
"""

def __init__(self, geom, component_x, on_boundary, derivative_order=0, component=0):
super().__init__(geom, on_boundary, component)
Expand All @@ -119,11 +201,24 @@ def __init__(self, geom, component_x, on_boundary, derivative_order=0, component
)

def collocation_points(self, X):
"""Generates pairs of points on opposing periodic boundaries.

Note that for Periodic Boundary Conditions, the collocation points are pairs rather than individual points on a single boundary.
This method identifies points on the first boundary and maps them to their corresponding points on the second boundary
using the geometry's periodic mapping function.

Returns:
np.ndarray: A vertical stack of points from boundary 1 and their
corresponding mapped points on boundary 2.
"""
X1 = self.filter(X)
X2 = self.geom.periodic_point(X1, self.component_x)
return np.vstack((X1, X2))

def error(self, X, inputs, outputs, beg, end, aux_var=None):
"""Computes the difference in first derivative (if self.derivative_order == 1)
or function value (if self.derivative_order == 0) between the two periodic edges.
"""
mid = beg + (end - beg) // 2
if self.derivative_order == 0:
yleft = outputs[beg:mid, self.component : self.component + 1]
Expand Down Expand Up @@ -159,6 +254,7 @@ def __init__(self, geom, func, on_boundary):
self.func = func

def error(self, X, inputs, outputs, beg, end, aux_var=None):
"""Computes the residual of the operator BC: func(inputs, outputs, X)."""
return self.func(inputs, outputs, X)[beg:end]


Expand Down Expand Up @@ -202,15 +298,18 @@ def __init__(self, points, values, component=0, batch_size=None, shuffle=True):
self.batch_indices = None

def __len__(self):
"""Returns the total number of points in the BC."""
return self.points.shape[0]

def collocation_points(self, X):
"""Retrieves points for the current training iteration."""
if self.batch_size is not None:
self.batch_indices = self.batch_sampler.get_next(self.batch_size)
return self.points[self.batch_indices]
return self.points

def error(self, X, inputs, outputs, beg, end, aux_var=None):
"""Computes the residual between network output and ground truth data."""
if self.batch_size is not None:
if isinstance(self.component, numbers.Number):
return (
Expand Down Expand Up @@ -272,45 +371,58 @@ def __init__(self, points, values, func, batch_size=None, shuffle=True):
self.batch_indices = None

def __len__(self):
"""Returns the total number of points in the BC."""

return self.points.shape[0]

def collocation_points(self, X):
"""Retrieves points for the current training iteration."""

if self.batch_size is not None:
self.batch_indices = self.batch_sampler.get_next(self.batch_size)
return self.points[self.batch_indices]
return self.points

def error(self, X, inputs, outputs, beg, end, aux_var=None):
"""Computes user-defined operator residual minus target values."""

if self.batch_size is not None:
return self.func(inputs, outputs, X)[beg:end] - self.values[self.batch_indices]
return self.func(inputs, outputs, X)[beg:end] - self.values


class Interface2DBC:
"""2D interface boundary condition.

This BC applies to the case with the following conditions:
(1) the network output has two elements, i.e., output = [y1, y2],
(2) the 2D geometry is ``dde.geometry.Rectangle`` or ``dde.geometry.Polygon``, which has two edges of the same length,
(3) uniform boundary points are used, i.e., in ``dde.data.PDE`` or ``dde.data.TimePDE``, ``train_distribution="uniform"``.
For a pair of points on the two edges, compute <output_1, d1> for the point on the first edge
and <output_2, d2> for the point on the second edge in the n/t direction ('n' for normal or 't' for tangent).
Here, <v1, v2> is the dot product between vectors v1 and v2;
and d1 and d2 are the n/t vectors of the first and second edges, respectively.
In the normal case, d1 and d2 are the outward normal vectors;
and in the tangent case, d1 and d2 are the outward normal vectors rotated 90 degrees clockwise.
The points on the two edges are paired as follows: the boundary points on one edge are sampled clockwise,
and the points on the other edge are sampled counterclockwise. Then, compare the sum with 'values',
i.e., the error is calculated as <output_1, d1> + <output_2, d2> - values,
where 'values' is the argument `func` evaluated on the first edge.
"""2D interface boundary condition for vector-valued outputs.

This boundary condition (BC) is designed for scenarios where specific jump conditions
or continuities are required across two matching edges of a geometry.

* **Network Output:** The model must have exactly two output elements, i.e., :math:`\\mathbf{y} = [y_1, y_2]`.
* **Geometry:** Must be a ``dde.geometry.Rectangle`` or ``dde.geometry.Polygon`` with two edges of identical length.
* **Sampling:** Use uniform boundary points (``train_distribution="uniform"``) in ``dde.data.PDE`` or ``dde.data.TimePDE``.

For a pair of points :math:`x_1` and :math:`x_2` located on the two specified edges, the BC computes the
dot product of the output and the direction vector :math:`\mathbf{d}`:

.. math:: \langle \mathbf{y}_1, \mathbf{d}_1 \\rangle + \langle \mathbf{y}_2, \mathbf{d}_2 \\rangle = \\text{values}

Where:

* :math:`\mathbf{d}_1, \mathbf{d}_2`: The unit vectors based on the ``direction`` argument.
* **Normal Case:** :math:`\mathbf{d}` represents the outward normal vectors.
* **Tangent Case:** :math:`\mathbf{d}` represents the outward normal vectors rotated 90° clockwise.
* **Point Pairing:** Points on the first edge are sampled clockwise; points on the second edge are sampled counter-clockwise to ensure they are correctly mapped spatially.

Args:
geom: a ``dde.geometry.Rectangle`` or ``dde.geometry.Polygon`` instance.
func: the target discontinuity between edges, evaluated on the first edge,
e.g., ``func=lambda x: 0`` means no discontinuity is wanted.
on_boundary1: First edge func. (x, Geometry.on_boundary(x)) -> True/False.
on_boundary2: Second edge func. (x, Geometry.on_boundary(x)) -> True/False.
direction (string): "normal" or "tangent".
geom: A ``dde.geometry.Rectangle`` or ``dde.geometry.Polygon`` instance.
func (callable): The target discontinuity (jump) between edges, evaluated on the
first edge. For example, ``lambda x: 0`` enforces continuity.
on_boundary1 (callable): Function identifying the first edge.
Signature: ``(x, on_boundary) -> bool``.
on_boundary2 (callable): Function identifying the second edge.
Signature: ``(x, on_boundary) -> bool``.
direction (str): The vector component to constrain. Options are ``"normal"``
or ``"tangent"``.
"""

def __init__(self, geom, func, on_boundary1, on_boundary2, direction="normal"):
Expand All @@ -329,6 +441,11 @@ def __init__(self, geom, func, on_boundary1, on_boundary2, direction="normal"):
)

def collocation_points(self, X):
"""Pairs points on two matching edges, ensuring spatial alignment.

Reverses the order of points on the second edge for polygons to
correctly match point-to-point across the interface if dde.geometry.Polygon is used.
"""
on_boundary = self.geom.on_boundary(X)
X1 = X[self.on_boundary1(X, on_boundary)]
X2 = X[self.on_boundary2(X, on_boundary)]
Expand All @@ -338,6 +455,7 @@ def collocation_points(self, X):
return np.vstack((X1, X2))

def error(self, X, inputs, outputs, beg, end, aux_var=None):
"""Computes the jump residual based on projected vector components."""
mid = beg + (end - beg) // 2
if not mid - beg == end - mid:
raise RuntimeError(
Expand Down