Source code for pyFANTOM.solvers.CPU._solvers

from sksparse.cholmod import cholesky
import numpy as np
from ..commons import Solver
from ...stiffness.CPU._FEA import StiffnessKernel
from ...stiffness.CPU import GeneralStiffnessKernel, UniformStiffnessKernel
from scipy.sparse.linalg import gmres as sp_gmres, cg as sp_cg, bicgstab as sp_bicgstab
from scipy.sparse.linalg import splu, spsolve
from scipy.linalg import get_lapack_funcs
from scipy.sparse.linalg import aslinearoperator
from ...core.CPU._mgm import apply_restriction, apply_prolongation, get_restricted_l0, get_restricted_l1p
from ...geom.CPU._mesh import StructuredMesh2D, StructuredMesh3D
import logging
from typing import Union
logger = logging.getLogger(__name__)

[docs] class CHOLMOD(Solver): """ Direct sparse Cholesky solver using CHOLMOD (scikit-sparse). Most efficient direct solver for symmetric positive definite systems. Uses supernodal Cholesky factorization with reusable symbolic factorization. Recommended for problems with <3M DOF on CPU. Parameters ---------- kernel : StiffnessKernel Stiffness assembly kernel (must have shape[0] <= 3M DOF) Attributes ---------- factor : cholmod.Factor CHOLMOD factorization object (reused across solves) factorized : bool True after first factorization Methods ------- solve(rhs, rho=None, **kwargs) Solve K(rho) @ U = rhs using Cholesky factorization reset() Clear factorization to force reinitialization initialize() Compute symbolic factorization Notes ----- - Fastest solver for small-medium problems (<500k DOF) - Requires MKL-optimized scikit-sparse for best performance - Reuses symbolic factorization across iterations (only numerical refactorization) - Memory intensive: ~O(n^1.5) for 2D, ~O(n^2) for 3D - For >3M DOF, use iterative solvers (CG, MultiGrid) Examples -------- >>> from pyFANTOM.CPU import StructuredStiffnessKernel, CHOLMOD >>> solver = CHOLMOD(kernel=kernel) >>> U, residual = solver.solve(rhs=F, rho=rho) >>> print(f"Residual: {residual:.2e}") """
[docs] def __init__(self, kernel: StiffnessKernel): super().__init__() if kernel.shape[0] > 3e6: raise ValueError("Currently we do not allow CHOLMOD for problem size bigger than 3M degrees of freedom. You can override this by passing a dummy kernel and overriding the kernel attribute.") self.kernel = kernel self.factor = None self.factorized = False self.n_desvars = len(kernel.elements)
[docs] def reset(self): self.factor = None self.factorized = False
[docs] def initialize(self): rho_temp = np.ones(self.n_desvars, dtype=self.kernel.nodes.dtype) K = self.kernel.construct(rho_temp) K = self.kernel.construct(rho_temp) # Twice to get correct pointers if self.kernel.has_cons: K = K[:,self.kernel.non_con_map][self.kernel.non_con_map,:].tocsc() self.factor = cholesky(K) self.factorized = True
[docs] def solve(self, rhs, rho=None, **kwargs): if not self.factorized: self.initialize() if not rho is None: K = self.kernel.construct(rho) else: if not self.kernel.has_rho: raise ValueError("Solver requires a density vector to be passed or set on the kernel.") K = self.kernel.construct(self.kernel.rho) out = np.copy(rhs) if self.kernel.has_cons: K = K[:,self.kernel.non_con_map][self.kernel.non_con_map,:].tocsc() rhs_ = rhs[self.kernel.non_con_map] self.factor.cholesky_inplace(K) out[self.kernel.non_con_map] = self.factor(rhs_) residual = np.linalg.norm(rhs - self.kernel@out)/np.linalg.norm(rhs) return out, residual
def cg(A, b, x0=None, rtol=1e-5, maxiter=1000): tol = rtol normb = (b*b).sum()**0.5 if x0 is None: x = b.copy() else: x = x0 r = b - A.matvec(x) rho_prev, p = None, None for iteration in range(maxiter): if (r*r).sum()**0.5/normb < tol: return x, iteration z = r rho_cur = (r*z).sum() if iteration > 0: beta = rho_cur / rho_prev p *= beta p += z else: # First spin p = np.empty_like(r) p[:] = z[:] q = A.matvec(p) alpha = rho_cur / (p*q).sum() x += alpha*p r -= alpha*q rho_prev = rho_cur return x, iteration def bicgstab(A, b, x0=None, rtol=1e-5, maxiter=1000): rhotol = np.finfo(b.dtype.char).eps**2 omegatol = rhotol matvec = A.matvec tol = rtol normb = (b*b).sum()**0.5 if x0 is None: x = np.zeros_like(b) else: x = x0 # Dummy values to initialize vars, silence linter warnings rho_prev, omega, alpha, p, v = None, None, None, None, None r = b - matvec(x) if x.any() else b.copy() rtilde = r.copy() for iteration in range(maxiter): if (r*r).sum()**0.5/normb < tol: # Are we done? return x, 0 rho = (rtilde*r).sum() if np.abs(rho) < rhotol: # rho breakdown return x, -10 if iteration > 0: if np.abs(omega) < omegatol: # omega breakdown return x, -11 beta = (rho / rho_prev) * (alpha / omega) p -= omega*v p *= beta p += r else: # First spin s = np.empty_like(r) p = r.copy() phat = p v = matvec(phat) rv = (rtilde* v).sum() if rv == 0: return x, -11 alpha = rho / rv r -= alpha*v s[:] = r[:] if (s*s).sum()**0.5/normb < tol: x += alpha*phat return x, 0 shat = s t = matvec(shat) omega = (t*s).sum() / (t*t).sum() x += alpha*phat x += omega*shat r -= omega*t rho_prev = rho return x, maxiter def gmres(A, b, x0=None, rtol=1e-5, maxiter=1000): Mb_nrm2 = (b*b).sum()**0.5 bnrm2 = Mb_nrm2 n = len(b) # ==================================================== # =========== Tolerance control from gh-8400 ========= # ==================================================== # Tolerance passed to GMRESREVCOM applies to the inner # iteration and deals with the left-preconditioned # residual. ptol_max_factor = 1. ptol = Mb_nrm2 * min(ptol_max_factor, rtol / bnrm2) presid = 0. # ==================================================== lartg = get_lapack_funcs('lartg', dtype=x.dtype) if x0 is None: x = np.zeros_like(b) else: x = x0 eps = np.finfo(x.dtype.char).eps restart = 20 # allocate internal variables v = np.empty([restart+1, n], dtype=x.dtype) h = np.zeros([restart, restart+1], dtype=x.dtype) givens = np.zeros([restart, 2], dtype=x.dtype) # legacy iteration count inner_iter = 0 for iteration in range(maxiter): if iteration == 0: r = b - A.matvec(x) if (r*r).sum()**0.5/bnrm2 < rtol: return x, 0 v[0, :] = r tmp = (v[0, :]*v[0, :]).sum()**0.5 v[0, :] *= (1 / tmp) # RHS of the Hessenberg problem S = np.zeros(restart+1, dtype=x.dtype) S[0] = tmp breakdown = False for col in range(restart): av = A.matvec(v[col, :]) w = av # Modified Gram-Schmidt h0 = (w*w).sum()**0.5 for k in range(col+1): tmp = (v[k, :]* w).sum() h[col, k] = tmp w -= tmp*v[k, :] h1 = (w*w).sum()**0.5 h[col, col + 1] = h1 v[col + 1, :] = w[:] # Exact solution indicator if h1 <= eps*h0: h[col, col + 1] = 0 breakdown = True else: v[col + 1, :] *= (1 / h1) # apply past Givens rotations to current h column for k in range(col): c, s = givens[k, 0], givens[k, 1] n0, n1 = h[col, [k, k+1]] h[col, [k, k + 1]] = [c*n0 + s*n1, -s.conj()*n0 + c*n1] # get and apply current rotation to h and S c, s, mag = lartg(h[col, col], h[col, col+1]) givens[col, :] = [c, s] h[col, [col, col+1]] = mag, 0 # S[col+1] component is always 0 tmp = -np.conjugate(s)*S[col] S[[col, col + 1]] = [c*S[col], tmp] presid = np.abs(tmp) inner_iter += 1 if presid <= ptol or breakdown: break # Solve h(col, col) upper triangular system and allow pseudo-solve # singular cases as in (but without the f2py copies): # y = trsv(h[:col+1, :col+1].T, S[:col+1]) if h[col, col] == 0: S[col] = 0 y = np.zeros([col+1], dtype=x.dtype) y[:] = S[:col+1] for k in range(col, 0, -1): if y[k] != 0: y[k] /= h[k, k] tmp = y[k] y[:k] -= tmp*h[k, :k] if y[0] != 0: y[0] /= h[0, 0] x += y @ v[:col+1, :] r = b - A.matvec(x) rnorm = (r*r).sum()**0.5 if rnorm/bnrm2 <= rtol: break elif breakdown: # Reached breakdown (= exact solution), but the external # tolerance check failed. Bail out with failure. break elif presid <= ptol: # Inner loop passed but outer didn't ptol_max_factor = max(eps, 0.25 * ptol_max_factor) else: ptol_max_factor = min(1.0, 1.5 * ptol_max_factor) ptol = presid * min(ptol_max_factor, rtol) info = 0 if (rnorm/bnrm2 <= rtol) else maxiter return x, info
[docs] class CG(Solver): """ Conjugate Gradient iterative solver. Memory-efficient Krylov subspace solver for symmetric positive definite systems. Supports both matrix-free and explicit matrix modes. Best for well-conditioned problems. Parameters ---------- kernel : StiffnessKernel Stiffness assembly kernel maxiter : int, optional Maximum iterations (default: 1000) tol : float, optional Relative convergence tolerance (default: 1e-5) matrix_free : bool, optional Use matrix-free operations (True) or explicit matrix (False). Auto-detected: True for StructuredStiffnessKernel, False for General/Uniform Attributes ---------- last_x0 : ndarray Last solution (used as initial guess for warm-starting) Methods ------- solve(rhs, rho=None, use_last=True) Solve K(rho) @ U = rhs iteratively Notes ----- - Matrix-free mode: O(n) memory, slower per iteration - Explicit mode: O(nnz) memory, faster per iteration for unstructured meshes - Warm-starting (use_last=True) accelerates successive solves - Convergence depends on condition number: κ(K) - For ill-conditioned problems, consider preconditioners or MultiGrid Examples -------- >>> solver = CG(kernel=kernel, maxiter=500, tol=1e-6) >>> U, residual = solver.solve(rhs=F, rho=rho, use_last=True) """
[docs] def __init__(self, kernel: StiffnessKernel, maxiter=1000, tol=1e-5, matrix_free=None): super().__init__() self.kernel = kernel self.last_x0 = None self.tol = tol self.maxiter = maxiter if isinstance(kernel, GeneralStiffnessKernel) or isinstance(kernel, UniformStiffnessKernel): if matrix_free is None: matrix_free = False logger.warning("Matrix free is set to False for general and uniform kernels. This is recommended for speed. Will use more memory.") elif matrix_free: logger.warning("Matrix free is set to True for general and uniform kernels. This is not recommended for speed. Will use less memory however.") elif matrix_free is None: matrix_free = True self.matrix_free = matrix_free if not self.matrix_free: self.K = None
[docs] def solve(self, rhs, rho=None, use_last=True): if self.kernel.has_rho or (not rho is None): if rho is not None: self.kernel.set_rho(rho) if use_last: if self.matrix_free: out = cg(self.kernel, rhs, x0=self.last_x0, rtol = self.tol, maxiter=self.maxiter)[0] else: self.K = aslinearoperator(self.kernel.construct(self.kernel.rho)) out = cg(self.K, rhs, x0=self.last_x0, rtol = self.tol, maxiter=self.maxiter)[0] self.last_x0 = out else: if self.matrix_free: out = cg(self.kernel, rhs, rtol = self.tol, maxiter=self.maxiter)[0] else: self.K = aslinearoperator(self.kernel.construct(self.kernel.rho)) out = cg(self.K, rhs, rtol = self.tol, maxiter=self.maxiter)[0] else: raise ValueError("Solver requires a density vector to be passed or set on the kernel.") r = rhs - self.kernel@out residual = (r*r).sum()**0.5/(rhs*rhs).sum()**0.5 return out, residual
[docs] class BiCGSTAB(Solver): """ Biconjugate Gradient Stabilized iterative solver. More robust than CG for non-symmetric or poorly conditioned systems. Rarely needed for standard elasticity problems but useful for coupled physics or unusual BCs. Parameters ---------- kernel : StiffnessKernel Stiffness assembly kernel maxiter : int, optional Maximum iterations (default: 1000) tol : float, optional Relative convergence tolerance (default: 1e-5) matrix_free : bool, optional Use matrix-free operations or explicit matrix Notes ----- - More expensive per iteration than CG (~2x work) - Better convergence for indefinite systems - Supports warm-starting like CG - For elasticity, CG is usually sufficient and faster Examples -------- >>> solver = BiCGSTAB(kernel=kernel, tol=1e-5) >>> U, residual = solver.solve(rhs=F, rho=rho) """
[docs] def __init__(self, kernel: StiffnessKernel, maxiter=1000, tol=1e-5, matrix_free=None): super().__init__() self.kernel = kernel self.last_x0 = None self.tol = tol self.maxiter = maxiter if isinstance(kernel, GeneralStiffnessKernel) or isinstance(kernel, UniformStiffnessKernel): if matrix_free is None: matrix_free = False logger.warning("Matrix free is set to False for general and uniform kernels. This is recommended for speed. Will use more memory.") elif matrix_free: logger.warning("Matrix free is set to True for general and uniform kernels. This is not recommended for speed. Will use less memory however.") elif matrix_free is None: matrix_free = True self.matrix_free = matrix_free if not self.matrix_free: self.K = None
[docs] def solve(self, rhs, rho=None, use_last=True): if self.kernel.has_rho or (not rho is None): if rho is not None: self.kernel.set_rho(rho) if use_last: if self.matrix_free: out = bicgstab(self.kernel, rhs, x0=self.last_x0, rtol = self.tol, maxiter=self.maxiter)[0] else: self.K = self.kernel.construct(self.kernel.rho) out = sp_bicgstab(self.K, rhs, x0=self.last_x0, rtol = self.tol, maxiter=self.maxiter)[0] self.last_x0 = out else: if self.matrix_free: out = bicgstab(self.kernel, rhs, rtol = self.tol, maxiter=self.maxiter)[0] else: self.K = self.kernel.construct(self.kernel.rho) out = sp_bicgstab(self.K, rhs, rtol = self.tol, maxiter=self.maxiter)[0] else: raise ValueError("Solver requires a density vector to be passed or set on the kernel.") r = rhs - self.kernel@out residual = (r*r).sum()**0.5/(rhs*rhs).sum()**0.5 return out, residual
[docs] class GMRES(Solver): """ Generalized Minimal Residual iterative solver. Most robust Krylov solver for general linear systems. Higher memory usage due to Arnoldi process. Use when CG/BiCGSTAB fail to converge. Parameters ---------- kernel : StiffnessKernel Stiffness assembly kernel maxiter : int, optional Maximum iterations (default: 1000) tol : float, optional Relative convergence tolerance (default: 1e-5) matrix_free : bool, optional Use matrix-free operations or explicit matrix Notes ----- - Memory: O(n * restart), where restart is typically 20 - More robust than CG for indefinite or non-symmetric systems - Slower per iteration than CG - For topology optimization, CG is usually preferred Examples -------- >>> solver = GMRES(kernel=kernel, maxiter=500) >>> U, residual = solver.solve(rhs=F, rho=rho) """
[docs] def __init__(self, kernel: StiffnessKernel, maxiter=1000, tol=1e-5, matrix_free=None): super().__init__() self.kernel = kernel self.last_x0 = None self.tol = tol self.maxiter = maxiter if isinstance(kernel, GeneralStiffnessKernel) or isinstance(kernel, UniformStiffnessKernel): if matrix_free is None: matrix_free = False logger.warning("Matrix free is set to False for general and uniform kernels. This is recommended for speed. Will use more memory.") elif matrix_free: logger.warning("Matrix free is set to True for general and uniform kernels. This is not recommended for speed. Will use less memory however.") elif matrix_free is None: matrix_free = True self.matrix_free = matrix_free if not self.matrix_free: self.K = None
[docs] def solve(self, rhs, rho=None, use_last=True): if self.kernel.has_rho or (not rho is None): if rho is not None: self.kernel.set_rho(rho) if use_last: if self.matrix_free: out = gmres(self.kernel, rhs, rtol = self.tol, maxiter=self.maxiter)[0] else: self.K = self.kernel.construct(self.kernel.rho) out = sp_gmres(self.K, rhs, x0=self.last_x0, rtol = self.tol, maxiter=self.maxiter)[0] self.last_x0 = out else: if self.matrix_free: out = gmres(self.kernel, rhs, rtol = self.tol, maxiter=self.maxiter)[0] else: self.K = self.kernel.construct(self.kernel.rho) out = sp_gmres(self.K, rhs, rtol = self.tol, maxiter=self.maxiter)[0] else: raise ValueError("Solver requires a density vector to be passed or set on the kernel.") residual = np.linalg.norm(rhs - self.kernel@out)/np.linalg.norm(rhs) self.last_x0 = out return out, residual
[docs] class SPLU(Solver): """ Direct sparse LU solver using SuperLU. General-purpose direct solver using LU factorization. Works for non-symmetric systems but less efficient than CHOLMOD for SPD matrices. Limited to <3M DOF. Parameters ---------- kernel : StiffnessKernel Stiffness assembly kernel (must have shape[0] <= 3M DOF) Notes ----- - More general than CHOLMOD (handles non-symmetric matrices) - Less efficient than CHOLMOD for elasticity (SPD systems) - No factorization reuse (refactorizes every solve) - Memory intensive - For elasticity, prefer CHOLMOD over SPLU Examples -------- >>> solver = SPLU(kernel=kernel) >>> U, residual = solver.solve(rhs=F, rho=rho) """
[docs] def __init__(self, kernel: StiffnessKernel): super().__init__() if kernel.shape[0] > 3e6: raise ValueError("Currently we do not allow SuperLU for problem size bigger than 3M degrees of freedom. You can override this by passing a dummy kernel and overriding the kernel attribute.") self.kernel = kernel
[docs] def solve(self, rhs, rho=None, **kwargs): if not rho is None: K = self.kernel.construct(rho) else: if not self.kernel.has_rho: raise ValueError("Solver requires a density vector to be passed or set on the kernel.") K = self.kernel.construct(self.kernel.rho) out = np.zeros_like(rhs) if self.kernel.has_cons: K = K[:,self.kernel.non_con_map][self.kernel.non_con_map,:] rhs_ = rhs[self.kernel.non_con_map] LU = splu(K) out[self.kernel.non_con_map] = LU.solve(rhs_) residual = np.linalg.norm(rhs - self.kernel@out)/np.linalg.norm(rhs) return out, residual
[docs] class SPSOLVE(Solver): """ Direct sparse solver using scipy.sparse.linalg.spsolve. Convenience wrapper around SciPy's spsolve (auto-selects LU or Cholesky). Simple to use but no factorization reuse. Limited to <3M DOF. Parameters ---------- kernel : StiffnessKernel Stiffness assembly kernel (must have shape[0] <= 3M DOF) Notes ----- - Auto-detects matrix symmetry and chooses solver - No factorization reuse (slow for repeated solves) - Simple API but inefficient for optimization - For production use, prefer CHOLMOD (better performance) Examples -------- >>> solver = SPSOLVE(kernel=kernel) >>> U, residual = solver.solve(rhs=F, rho=rho) """
[docs] def __init__(self, kernel: StiffnessKernel): super().__init__() if kernel.shape[0] > 3e6: raise ValueError("Currently we do not allow spsolve for problem size bigger than 3M degrees of freedom. You can override this by passing a dummy kernel and overriding the kernel attribute.") self.kernel = kernel
[docs] def solve(self, rhs, rho=None, **kwargs): if not rho is None: K = self.kernel.construct(rho) else: if not self.kernel.has_rho: raise ValueError("Solver requires a density vector to be passed or set on the kernel.") K = self.kernel.construct(self.kernel.rho) out = np.copy(rhs) if self.kernel.has_cons: K = K[:,self.kernel.non_con_map][self.kernel.non_con_map,:] rhs_ = rhs[self.kernel.non_con_map] out[self.kernel.non_con_map] = spsolve(K, rhs_) residual = np.linalg.norm(rhs - self.kernel@out)/np.linalg.norm(rhs) return out, residual
def opt_coef(k): # Quadrature nodes (x is a vector of length k) x = np.cos(np.arange(1, k+1) * np.pi / (k + 0.5)) # Quadrature weights w = (1 - x) / (k + 0.5) # 4th-kind Chebyshev polynomials evaluated at x. # W is a (k x k) matrix. W = np.zeros((k, k)) W[:, 0] = 1 if k >= 2: W[:, 1] = 2 * x + 1 for i in range(2, k): W[:, i] = 2 * x * W[:, i-1] - W[:, i-2] # Compute the roots of the optimal polynomial. r = opt_roots(k) # Transform nodes to [0, 1] lam = (1 - x) / 2 # Compute p as the product over the k entries. # In MATLAB: p = prod(1 - lambda'./r, 1)' gives a k-by-1 vector. # Here, lam is length k and r is length k, so we form a (k x k) array and take the product over axis 0. p = np.prod(1 - (lam[None, :] / r[:, None]), axis=0) # Compute alpha = W' * (w .* p) alpha = np.dot(W.T, w * p) # Compute beta = 1 - cumsum((2*[0:k-1]'+1) .* alpha) beta = 1 - np.cumsum((2 * np.arange(k) + 1) * alpha) return beta def opt_roots(k): def vars_func(r, x): # Here, r is a vector of length k and x is a vector (length n). # Compute p: for each entry in x, p[j] = prod(1 - x[j]/r[i]) for i=1,...,k. p = np.prod(1 - (x[None, :] / r[:, None]), axis=0) # Compute w = x/(1 - p^2) w_val = x / (1 - p**2) # f = sqrt(w) * p f_val = np.sqrt(w_val) * p # Compute q = sum_{i=1}^k 1/(x[j] - r[i]) for each j. q = np.sum(1 / (x[None, :] - r[:, None]), axis=0) # g = x * (1/(2*w) + q) g_val = x * (1/(2 * w_val) + q) # Compute ngp: # For each j: ngp[j] = sum_{i=1}^k ( p[j]^2 + r[i]/(x[j]-r[i]) )/(x[j]-r[i]) ngp_val = np.sum(((p**2)[None, :] + r[:, None] / (x[None, :] - r[:, None])) / (x[None, :] - r[:, None]), axis=0) return w_val, f_val, g_val, ngp_val # Initial guesses: r = 0.5 - 0.5 * np.cos(np.arange(1, k+1) * np.pi / (k + 0.5)) x = 0.5 - 0.5 * np.cos((0.5 + np.arange(1, k)) * np.pi / (k + 0.5)) tol = 128 * 1e-12 # Tolerance for convergence dr = r.copy() drsize = 1.0 # Outer loop: adjust r until convergence. while drsize > tol: dx = x.copy() dxsize = 1.0 # Inner loop: adjust x until convergence. while dxsize > tol: dxsize = np.linalg.norm(dx, ord=np.inf) _, _, g, ngp = vars_func(r, x) dx = g / ngp x = x + dx # Append 1 to x to form x1. x1 = np.concatenate([x, [1]]) w_val, f, _, _ = vars_func(r, x1) f0 = np.sqrt(0.5 / np.sum(1 / r)) # Compute J elementwise. J = (f0**3 / (r**2)) + (w_val * np.abs(f))[:,None] / (r * (x1[:, None] - r)) # Solve for dr elementwise. dr = -np.linalg.solve(J, f0 - np.abs(f)) drsize = np.linalg.norm(dr, ord=np.inf) r = r + dr return r
[docs] class MultiGrid(Solver): """ Geometric multigrid solver for structured meshes. Memory-efficient iterative solver using hierarchical grid refinement. Best choice for large 3D problems (>1M DOF) where direct solvers are impractical. Significantly faster than CG for well-conditioned elasticity problems. Parameters ---------- mesh : StructuredMesh2D or StructuredMesh3D Structured mesh (required for geometric coarsening) kernel : StiffnessKernel Stiffness assembly kernel maxiter : int, optional Maximum V/W-cycles (default: 1000) tol : float, optional Relative convergence tolerance (default: 1e-5) n_smooth : int, optional Smoothing iterations per level (default: 3) omega : float, optional Damping parameter for Jacobi smoother (default: 0.5) n_level : int, optional Number of multigrid levels (default: 3) cycle : str, optional Cycle type: 'V' or 'W' (default: 'W') w_level : int or list, optional Level(s) for W-cycle recursion (default: 1) coarse_solver : str, optional Coarsest level solver: 'cg', 'bicgstab', 'gmres', 'splu', 'spsolve', 'cholmod' (default: 'splu') matrix_free : bool, optional Use matrix-free operations (default: False, not yet implemented) low_level_tol : float, optional Coarse solver tolerance (default: 1e-8) low_level_maxiter : int, optional Coarse solver max iterations (default: 5000) Attributes ---------- cycle : str 'V' or 'W' cycle type n_level : int Number of multigrid levels PRs : list Prolongation/restriction operators Methods ------- solve(rhs, rho=None, use_last=True) Solve K(rho) @ U = rhs using multigrid cycles Notes ----- - **Best for large 3D problems**: Scales O(n) vs O(n^1.5) for CHOLMOD - **GPU-friendly**: Primary solver for CUDA backend - **W-cycle**: More expensive but better convergence than V-cycle - **Requires structured mesh**: Cannot be used with GeneralMesh - **Typical performance**: 5-10x faster than CG for 3D problems >500k DOF - **Coarse solver**: 'splu' for small problems, 'cg' for very large coarse grids Examples -------- >>> from pyFANTOM.CPU import StructuredMesh3D, StructuredStiffnessKernel, MultiGrid >>> mesh = StructuredMesh3D(nx=64, ny=64, nz=64, lx=1.0, ly=1.0, lz=1.0) >>> kernel = StructuredStiffnessKernel(mesh=mesh) >>> solver = MultiGrid(mesh=mesh, kernel=kernel, n_level=4, cycle='W') >>> U, residual = solver.solve(rhs=F, rho=rho) >>> print(f"Converged in {solver.maxiter} cycles, residual: {residual:.2e}") """
[docs] def __init__(self, mesh: Union[StructuredMesh2D,StructuredMesh3D], kernel: StiffnessKernel, maxiter=1000, tol=1e-5, n_smooth=3, omega=0.5 , n_level = 3, cycle='W', w_level=1, coarse_solver='cholmod', matrix_free=False, low_level_tol = 1e-8, low_level_maxiter=5000, min_omega=0.4, omega_boost=1.06): super().__init__() self.kernel = kernel self.mesh = mesh self.last_x0 = None self.tol = tol self.maxiter = maxiter self.n_smooth = n_smooth self.omega = omega self.max_omega = omega self.min_omega = max(omega/2, min_omega) self.d_omega = (self.max_omega - self.min_omega) self.omega_boost = omega_boost self.n_level = n_level self.cycle = cycle self.w_level = w_level self.matrix_free = matrix_free self.dof = self.mesh.dof self.coarse_solver = coarse_solver self.low_level_tol = low_level_tol self.low_level_maxiter = low_level_maxiter self.factor = None self.beta = opt_coef(self.n_smooth) if isinstance(self.w_level, int): self.w_level = [self.w_level] if self.coarse_solver in ['splu', 'spsolve'] and self.matrix_free: raise ValueError("Matrix free is not supported with splu solver use cg instead.") if self.coarse_solver not in ['cg', 'bicgstab', 'gmres', 'splu', 'spsolve', 'cholmod']: raise ValueError("Coarse solver not recognized.") if not self.matrix_free: self.ptr = None self.PRs = [] else: raise NotImplementedError("Matrix free is not implemented yet.")
[docs] def reset(self): self.ptr = None
def _estimate_rho(self, A, D_inv, num_iterations=10): return 0 """ Estimate the spectral radius (largest eigenvalue) of D_inv @ A using the power method. Parameters: A : The system matrix. D_inv : The inverse of the diagonal of A (assumed to be provided as a vector or as a diagonal matrix). num_iterations : Number of iterations for the power method (default is 10). Returns: rho_est : An estimate of the spectral radius of D_inv @ A. """ # Start with a random vector. x = np.random.rand(A.shape[0]) x /= np.linalg.norm(x) for _ in range(num_iterations): # Apply the operator: D_inv * (A @ x) # If D_inv is a vector, use elementwise multiplication. x = D_inv * (A @ x) # Normalize the vector to avoid overflow/underflow issues. x /= np.linalg.norm(x) # After convergence, the Rayleigh quotient gives the dominant eigenvalue. Ax = A @ x rho_est = np.dot(x, D_inv * Ax) return rho_est def _chebyshev_smoother(self, x, b, A, D_inv, n_steps, rho): """ Applies a Chebyshev polynomial smoother based on Chebyshev polynomials of the fourth kind. Instead of applying a fixed damped Jacobi update repeatedly, this smoother uses a three-term recurrence to compute a correction that more effectively reduces the high-frequency error components. Parameters: x : Current solution vector. b : Right-hand side vector. A : System matrix. D_inv : Inverse of the diagonal of A (or a suitable preconditioner). n_steps : Number of Chebyshev smoothing steps to perform. rho : Estimate of the spectral radius of (D_inv @ A). This scales the spectrum to [0,1]. Returns: x : Updated solution vector after applying the Chebyshev smoother. """ # Initialize the correction vector (z) to zero. z = np.zeros_like(x) # Loop through each Chebyshev smoothing step. for k in range(1, n_steps + 1): residual = b - A @ x # Compute the scaling factors based on the current step k. # factor2 scales the new correction term. factor2 = (8 * k - 4) / (2 * k + 1) if k == 1: # For the first step, z is simply the scaled correction. z = factor2 * (1 / rho) * (D_inv * residual) else: # For subsequent steps, combine the previous correction (scaled by factor1) # with the new term. factor1 = (2 * k - 3) / (2 * k + 1) z = factor1 * z + factor2 * (1 / rho) * (D_inv * residual) # Update the current solution with the computed correction. x = x + z return x def _jacobi_smoother(self, x, b, A, D_inv, n_step): for i in range(n_step): x += self.omega * D_inv * (b - A @ x) return x def _setup(self): self.levels = [] D = 1/self.kernel.diagonal() rho = self._estimate_rho(self.kernel, D) self.levels.append((self.kernel, D, rho)) for i in range(self.n_level): if i == 0 and not self.ptr is None: op = get_restricted_l0(self.mesh, self.kernel, Cp = self.ptr[0]) D = 1/op.diagonal() rho = self._estimate_rho(op, D) self.levels.append((op, D, rho)) elif i == 0: self.ptr = [] if self.mesh.nel.shape[0] == 2: nnz = np.ones((self.mesh.nel[0]//2+1)*(self.mesh.nel[1]//2+1)*self.dof, dtype=np.int32) * 18 Cp = np.zeros((self.mesh.nel[0]//2+1)*(self.mesh.nel[1]//2+1)*self.dof + 1, dtype=np.int32) Cp[1:] = np.cumsum(nnz) else: nnz = np.ones((self.mesh.nel[0]//2+1)*(self.mesh.nel[1]//2+1)*(self.mesh.nel[2]//2+1)*self.dof, dtype=np.int32) * 81 Cp = np.zeros((self.mesh.nel[0]//2+1)*(self.mesh.nel[1]//2+1)*(self.mesh.nel[2]//2+1)*self.dof + 1, dtype=np.int32) Cp[1:] = np.cumsum(nnz) self.ptr.append(np.copy(Cp)) op = get_restricted_l0(self.mesh, self.kernel, Cp = Cp) D = 1/op.diagonal() rho = self._estimate_rho(op, D) self.levels.append((op, D, rho)) elif len(self.ptr) <= i: if self.mesh.nel.shape[0] == 2: nnz = np.ones((self.mesh.nel[0]//(2**(i+1))+1)*(self.mesh.nel[1]//(2**(i+1))+1)*self.dof, dtype=np.int32) * 18 Cp = np.zeros((self.mesh.nel[0]//(2**(i+1))+1)*(self.mesh.nel[1]//(2**(i+1))+1)*self.dof + 1, dtype=np.int32) Cp[1:] = np.cumsum(nnz) else: nnz = np.ones((self.mesh.nel[0]//(2**(i+1))+1)*(self.mesh.nel[1]//(2**(i+1))+1)*(self.mesh.nel[2]//(2**(i+1))+1)*self.dof, dtype=np.int32) * 81 Cp = np.zeros((self.mesh.nel[0]//(2**(i+1))+1)*(self.mesh.nel[1]//(2**(i+1))+1)*(self.mesh.nel[2]//(2**(i+1))+1)*self.dof + 1, dtype=np.int32) Cp[1:] = np.cumsum(nnz) self.ptr.append(np.copy(Cp)) op = get_restricted_l1p(self.levels[-1][0], self.mesh.nel//(2**i), self.dof, Cp = Cp) D = 1/op.diagonal() rho = self._estimate_rho(op, D) self.levels.append((op, D, rho)) else: op = get_restricted_l1p(self.levels[-1][0], self.mesh.nel//(2**i), self.dof, Cp = self.ptr[i]) D = 1/op.diagonal() rho = self._estimate_rho(op, D) self.levels.append((op, D, rho)) def _coarse_solver(self, K): if self.coarse_solver == 'cg': return lambda rhs: cg(K, rhs, rtol = self.low_level_tol, maxiter=self.low_level_maxiter)[0] elif self.coarse_solver == 'bicgstab': return lambda rhs: bicgstab(K, rhs, rtol = self.low_level_tol, maxiter=self.low_level_maxiter)[0] elif self.coarse_solver == 'gmres': return lambda rhs: gmres(K, rhs, rtol = self.low_level_tol, maxiter=self.low_level_maxiter)[0] elif self.coarse_solver == 'splu': SOLVER = splu(K) return lambda rhs: SOLVER.solve(rhs) elif self.coarse_solver == 'cholmod': K = K.tocsc() if self.factor is None: self.factor = cholesky(K, beta=1e-6) else: self.factor.cholesky_inplace(K, beta=1e-6) return lambda rhs: np.array(self.factor(rhs)) elif self.coarse_solver == 'spsolve': return lambda rhs: spsolve(K, rhs) else: raise ValueError("Coarse solver not recognized.") def _multi_grid(self, x, b, level): if level == self.n_level: return self.coarse_solve(b) # presmooth A, D, rho = self.levels[level] self.omega = self.omega * (self.omega_boost)**(level) x = self._jacobi_smoother(x, b, A, D, self.n_smooth) # x = self._chebyshev_smoother(x, b, A, D, self.n_smooth, rho) self.omega = self.omega / (self.omega_boost)**(level) # residual r = b - A@x nel = (self.mesh.nel // 2**level).astype(np.int32) # restrict coarse_residual = apply_restriction(r,nel,self.dof) # coarse_residual = self.PRs[level][1] @ r # go to next level coarse_u = coarse_residual*0.0 e = self._multi_grid(coarse_u, coarse_residual, level+1) # prolongate e = apply_prolongation(e,nel,self.dof) # e = self.PRs[level][0] @ e self.omega = self.omega * (self.omega_boost)**(level) e = self._jacobi_smoother(e, r, A, D, self.n_smooth) # e = self._chebyshev_smoother(e, r, A, D, self.n_smooth, rho) self.omega = self.omega / (self.omega_boost)**(level) x += e if self.cycle == 'w' and level in self.w_level: r = b - A@x # restrict coarse_residual = apply_restriction(r,nel,self.dof) # coarse_residual = self.PRs[level][1] @ r # go to next level coarse_u = coarse_residual*0.0 e = self._multi_grid(coarse_u, coarse_residual, level+1) # prolongate e = apply_prolongation(e,nel,self.dof) # e = self.PRs[level][0] @ e self.omega = self.omega * (self.omega_boost)**(level) e = self._jacobi_smoother(e, r, A, D, self.n_smooth) # e = self._chebyshev_smoother(e, r, A, D, self.n_smooth, rho) self.omega = self.omega / (self.omega_boost)**(level) x += e return x
[docs] def solve(self, rhs, rho=None, use_last=True): if not (self.kernel.has_rho or (not rho is None)): raise ValueError("Solver requires a density vector to be passed or set on the kernel.") if not rho is None: rho = self.kernel.set_rho(rho) if use_last and self.last_x0 is not None: x = self.last_x0 else: x = np.copy(rhs) if self.matrix_free: self._mat_free_setup() else: self._setup() v_cycle = self._mat_free_multi_grid if self.matrix_free else self._multi_grid self.coarse_solve = self._coarse_solver(self.levels[-1][0]) self.omega = self.min_omega r = rhs - self.kernel.dot(x) z = v_cycle(np.zeros_like(r), r, 0) p = z.copy() rho_old = (r*z).sum() norm_b = (rhs*rhs).sum()**0.5 for i in range(self.maxiter): q = self.kernel.dot(p) alpha = rho_old / (p*q).sum() x += alpha * p r -= alpha * q norm_r = (r*r).sum()**0.5 R = norm_r / norm_b if R < self.tol: break self.omega = self.min_omega + self.d_omega/2*np.exp((-np.clip(R,self.tol,1e-1)+self.tol)*500) z = v_cycle(np.copy(r), r,0) rho_new = (r*z).sum() beta = rho_new / rho_old p = z + beta * p rho_old = rho_new residual = norm_r / norm_b self.last_x0 = x del self.levels, self.coarse_solve return x, residual