Source code for pyFANTOM.solvers.CUDA._solvers

import cupy as cp
from ..commons import Solver
from sksparse.cholmod import cholesky
from ...stiffness.CUDA._FEA import StiffnessKernel as CuStiffnessKernel
from ...stiffness.CUDA._FEA import GeneralStiffnessKernel as CuGeneralKernel
from ...stiffness.CUDA._FEA import UniformStiffnessKernel as CuUniformKernel
from cupyx.scipy.sparse.linalg import cg, gmres, spsolve, splu
import time
import logging
from ...core.CUDA._mgm import (
    apply_restriction_cuda,
    apply_prolongation_cuda,
    get_restricted_l0_cuda,
    get_restricted_l1p_cuda
)
from ...geom.CUDA._mesh import CuStructuredMesh2D as StructuredMesh2D
from ...geom.CUDA._mesh import CuStructuredMesh3D as StructuredMesh3D
from typing import Union
loger = logging.getLogger(__name__)


[docs] class CG(Solver): """ CUDA-accelerated Conjugate Gradient iterative solver. GPU version of the CG solver using CuPy. Efficiently solves large sparse symmetric positive definite linear systems Kx=b on GPU with optional matrix-free operation. Parameters ---------- kernel : CuStiffnessKernel CUDA stiffness kernel (matrix operator) maxiter : int, optional Maximum number of CG iterations (default: 1000) tol : float, optional Convergence tolerance for relative residual (default: 1e-5) matrix_free : bool, optional Whether to use matrix-free mode. If None, automatically determined: - True for StructuredStiffnessKernel (recommended) - False for General/UniformStiffnessKernel (faster) Notes ----- - Matrix-free mode: Lower memory, slower per-iteration - Assembled matrix mode: Higher memory, faster per-iteration - For structured meshes, matrix-free is preferred - For general meshes, assembled matrix is faster Examples -------- >>> from pyFANTOM.CUDA import StructuredMesh2D, StructuredStiffnessKernel, CG >>> mesh = StructuredMesh2D(nx=256, ny=256, lx=1.0, ly=1.0) >>> kernel = StructuredStiffnessKernel(mesh=mesh) >>> solver = CG(kernel=kernel, maxiter=1000, tol=1e-6) """
[docs] def __init__(self, kernel: CuStiffnessKernel, maxiter=1000, tol=1e-5, matrix_free=None): super().__init__() self.kernel = kernel self.last_x0 = None self.tol = tol self.maxiter = maxiter if isinstance(kernel, CuGeneralKernel) or isinstance(kernel, CuUniformKernel): if matrix_free is None: matrix_free = False loger.warning("Matrix free is set to False for general and uniform kernels. This is recommended for speed. Will use more memory.") elif matrix_free: loger.warning("Matrix free is set to True for general and uniform kernels. This is not recommended for speed. Will use less memory however.") elif matrix_free is None: matrix_free = True self.matrix_free = matrix_free if not self.matrix_free: self.K = None
[docs] def solve(self, rhs, rho=None, use_last=True): """ Solve linear system K(rho) @ U = rhs on GPU. Parameters ---------- rhs : cp.ndarray Right-hand side force vector on GPU, shape (n_dof,) rho : cp.ndarray, optional Design variables (densities) on GPU, shape (n_elements,) use_last : bool, optional Use previous solution as initial guess (warm-starting), default: True Returns ------- U : cp.ndarray Solution vector on GPU, shape (n_dof,) residual : float Relative residual: ||K@U - rhs|| / ||rhs|| Raises ------ ValueError If rho is not provided and kernel.has_rho is False Notes ----- - All operations performed on GPU using CuPy - Warm-starting (use_last=True) accelerates convergence - Matrix-free mode uses kernel.dot() directly - Assembled mode constructs explicit CSR matrix first """ if self.kernel.has_rho or (not rho is None): if rho is not None: self.kernel.set_rho(rho) if use_last: if self.matrix_free: out = cg(self.kernel, rhs, x0=self.last_x0, tol = self.tol, maxiter=self.maxiter)[0] residual = cp.linalg.norm(rhs - self.kernel@out)/cp.linalg.norm(rhs) else: self.K = self.kernel.construct(self.kernel.rho) out = cg(self.K, rhs, x0=self.last_x0, tol = self.tol, maxiter=self.maxiter)[0] residual = cp.linalg.norm(rhs - self.K@out)/cp.linalg.norm(rhs) self.last_x0 = out else: if self.matrix_free: out = cg(self.kernel, rhs, tol = self.tol, maxiter=self.maxiter)[0] residual = cp.linalg.norm(rhs - self.kernel@out)/cp.linalg.norm(rhs) else: self.K = self.kernel.construct(self.kernel.rho) out = cg(self.K, rhs, tol = self.tol, maxiter=self.maxiter)[0] residual = cp.linalg.norm(rhs - self.K@out)/cp.linalg.norm(rhs) else: raise ValueError("Solver requires a density vector to be passed or set on the kernel.") return out, residual
[docs] class GMRES(Solver): """ CUDA-accelerated Generalized Minimal Residual iterative solver. GPU version of GMRES solver using CuPy. Solves non-symmetric linear systems Kx=b on GPU. More general than CG but typically slower and more memory-intensive. Parameters ---------- kernel : CuStiffnessKernel CUDA stiffness kernel (matrix operator) maxiter : int, optional Maximum number of GMRES iterations (default: 1000) tol : float, optional Convergence tolerance for relative residual (default: 1e-5) matrix_free : bool, optional Whether to use matrix-free mode. If None, automatically determined: - True for StructuredStiffnessKernel - False for General/UniformStiffnessKernel (recommended) Notes ----- - GMRES handles non-symmetric systems (rare in FEA) - CG is preferred for symmetric positive definite systems - Higher memory usage than CG due to Krylov subspace storage Examples -------- >>> from pyFANTOM.CUDA import StructuredMesh2D, StructuredStiffnessKernel, GMRES >>> solver = GMRES(kernel=kernel, maxiter=1000, tol=1e-6) """
[docs] def __init__(self, kernel: CuStiffnessKernel, maxiter=1000, tol=1e-5, matrix_free=None): super().__init__() self.kernel = kernel self.last_x0 = None self.tol = tol self.maxiter = maxiter if isinstance(kernel, CuGeneralKernel) or isinstance(kernel, CuUniformKernel): if matrix_free is None: matrix_free = False loger.warning("Matrix free is set to False for general and uniform kernels. This is recommended for speed. Will use more memory.") elif matrix_free: loger.warning("Matrix free is set to True for general and uniform kernels. This is not recommended for speed. Will use less memory however.") elif matrix_free is None: matrix_free = True self.matrix_free = matrix_free if not self.matrix_free: self.K = None
[docs] def solve(self, rhs, rho=None, use_last=True): """ Solve linear system K(rho) @ U = rhs on GPU using GMRES. Parameters ---------- rhs : cp.ndarray Right-hand side force vector on GPU, shape (n_dof,) rho : cp.ndarray, optional Design variables (densities) on GPU, shape (n_elements,) use_last : bool, optional Use previous solution as initial guess (warm-starting), default: True Returns ------- U : cp.ndarray Solution vector on GPU, shape (n_dof,) residual : float Relative residual: ||K@U - rhs|| / ||rhs|| Raises ------ ValueError If rho is not provided and kernel.has_rho is False Notes ----- - All operations performed on GPU using CuPy - Warm-starting (use_last=True) accelerates convergence - Matrix-free mode uses kernel.dot() directly - Assembled mode constructs explicit CSR matrix first - GMRES handles non-symmetric systems (rare in FEA) - CG is preferred for symmetric positive definite systems """ if self.kernel.has_rho or (not rho is None): if rho is not None: self.kernel.set_rho(rho) if use_last: if self.matrix_free: out = gmres(self.kernel, rhs, x0=self.last_x0, tol = self.tol, maxiter=self.maxiter)[0] residual = cp.linalg.norm(rhs - self.kernel@out)/cp.linalg.norm(rhs) else: self.K = self.kernel.construct(self.kernel.rho) out = gmres(self.K, rhs, x0=self.last_x0, tol = self.tol, maxiter=self.maxiter)[0] residual = cp.linalg.norm(rhs - self.K@out)/cp.linalg.norm(rhs) self.last_x0 = out else: if self.matrix_free: out = gmres(self.kernel, rhs, tol = self.tol, maxiter=self.maxiter)[0] residual = cp.linalg.norm(rhs - self.kernel@out)/cp.linalg.norm(rhs) else: self.K = self.kernel.construct(self.kernel.rho) out = gmres(self.K, rhs, tol = self.tol, maxiter=self.maxiter)[0] residual = cp.linalg.norm(rhs - self.K@out)/cp.linalg.norm(rhs) else: raise ValueError("Solver requires a density vector to be passed or set on the kernel.") return out, residual
[docs] class SPSOLVE(Solver): """ CUDA sparse direct solver using CuPy's spsolve. GPU-accelerated direct sparse LU factorization solver. Fast for small to medium problems but memory-intensive. Limited to < 3M DOF due to GPU memory constraints. Parameters ---------- kernel : CuStiffnessKernel CUDA stiffness kernel (must construct explicit matrix) Raises ------ ValueError If kernel.shape[0] > 3e6 (3 million DOF limit) Notes ----- - Direct solver: No iterations, exact solution (within numerical precision) - High memory usage: Stores factorization on GPU - Use iterative solvers (CG, MultiGrid) for large problems - Automatically handles constrained DOF Examples -------- >>> from pyFANTOM.CUDA import StructuredMesh2D, StructuredStiffnessKernel, SPSOLVE >>> mesh = StructuredMesh2D(nx=64, ny=64, lx=1.0, ly=1.0) >>> kernel = StructuredStiffnessKernel(mesh=mesh) >>> solver = SPSOLVE(kernel=kernel) # OK for small problems """
[docs] def __init__(self, kernel: CuStiffnessKernel): super().__init__() if kernel.shape[0] > 3e6: raise ValueError("Currently we do not allow spsolve for problem size bigger than 3M degrees of freedom. You can override this by passing a dummy kernel and overriding the kernel attribute.") self.kernel = kernel
[docs] def solve(self, rhs, rho=None, **kwargs): """ Solve the linear system Kx = rhs using sparse direct solver. Parameters ---------- rhs : cp.ndarray Right-hand side vector rho : cp.ndarray, optional Design variable densities. If None, uses kernel's stored rho **kwargs Additional keyword arguments (unused, for API compatibility) Returns ------- U : cp.ndarray Solution vector residual : float Relative residual norm ||rhs - K@U|| / ||rhs|| Notes ----- - Direct solver: No warm start needed - Constructs and factorizes K every call (expensive) - Consider iterative solvers for repeated solves in optimization """ if not rho is None: K = self.kernel.construct(rho) else: if not self.kernel.has_rho: raise ValueError("Solver requires a density vector to be passed or set on the kernel.") K = self.kernel.construct(self.kernel.rho) out = cp.copy(rhs) if self.kernel.has_con: K = K[:,self.kernel.non_con_map][self.kernel.non_con_map,:] rhs_ = rhs[self.kernel.non_con_map] out[self.kernel.non_con_map] = spsolve(K, rhs_) residual = cp.linalg.norm(rhs - self.kernel@out)/cp.linalg.norm(rhs) return out, residual
[docs] class MultiGrid(Solver): """ CUDA-accelerated Geometric Multigrid solver for structured meshes. GPU implementation of geometric multigrid with Jacobi smoothing and coarse grid correction. Most memory-efficient solver for large 3D problems on GPU. Uses restriction/prolongation operators on nested grids for O(n) complexity per iteration. Parameters ---------- mesh : StructuredMesh2D or StructuredMesh3D CUDA structured mesh (required for geometric coarsening) kernel : CuStiffnessKernel CUDA stiffness kernel (must be StructuredStiffnessKernel) maxiter : int, optional Maximum outer iterations (preconditioned CG) (default: 1000) tol : float, optional Convergence tolerance for relative residual (default: 1e-5) n_smooth : int, optional Number of Jacobi smoothing iterations per level (default: 3) omega : float, optional Jacobi relaxation parameter (default: 0.5) n_level : int, optional Number of multigrid levels (default: 3) cycle : str, optional Multigrid cycle type: 'V' or 'W' (default: 'W') w_level : int or list, optional Levels to apply W-cycle at (default: 1) coarse_solver : str, optional Coarse grid solver: 'cholmod', 'cg', 'gmres', 'splu', 'spsolve' (default: 'cholmod') matrix_free : bool, optional Whether to use matrix-free operators (default: False, not yet implemented) low_level_tol : float, optional Tolerance for coarse grid iterative solver (default: 1e-8) low_level_maxiter : int, optional Max iterations for coarse grid iterative solver (default: 5000) min_omega : float, optional Minimum omega for adaptive relaxation (default: 0.4) omega_boost : float, optional Omega boost factor per level (default: 1.06) Notes ----- - Only works with structured meshes (uniform grid coarsening) - Memory efficient: No explicit matrix storage - Preconditioned CG with multigrid preconditioner - Adaptive omega based on residual - CHOLMOD coarse solver requires CPU transfer (fast for small coarse grids) Raises ------ ValueError If matrix_free=True with splu/spsolve coarse solver If coarse_solver not in ['cg', 'gmres', 'splu', 'spsolve', 'cholmod'] Examples -------- >>> from pyFANTOM.CUDA import StructuredMesh3D, StructuredStiffnessKernel, MultiGrid >>> mesh = StructuredMesh3D(nx=128, ny=128, nz=128, lx=1.0, ly=1.0, lz=1.0) >>> kernel = StructuredStiffnessKernel(mesh=mesh) >>> solver = MultiGrid(mesh=mesh, kernel=kernel, n_level=4, cycle='W') """
[docs] def __init__(self, mesh: Union[StructuredMesh2D,StructuredMesh3D], kernel: CuStiffnessKernel, maxiter=1000, tol=1e-5, n_smooth=3, omega=0.5 , n_level = 3, cycle='W', w_level=1, coarse_solver='cholmod', matrix_free=False, low_level_tol = 1e-8, low_level_maxiter=5000, min_omega=0.4, omega_boost=1.06): super().__init__() self.kernel = kernel self.mesh = mesh self.last_x0 = None self.tol = tol self.maxiter = maxiter self.n_smooth = n_smooth self.omega = omega self.max_omega = omega self.min_omega = max(omega/2, min_omega) self.d_omega = (self.max_omega - self.min_omega) self.omega_boost = omega_boost self.n_level = n_level self.cycle = cycle self.w_level = w_level self.matrix_free = matrix_free self.dof = self.mesh.dof self.coarse_solver = coarse_solver self.low_level_tol = low_level_tol self.low_level_maxiter = low_level_maxiter self.factor = None if isinstance(self.w_level, int): self.w_level = [self.w_level] if self.coarse_solver in ['splu', 'spsolve'] and self.matrix_free: raise ValueError("Matrix free is not supported with splu solver use cg instead.") if self.coarse_solver not in ['cg', 'gmres', 'splu', 'spsolve', 'cholmod']: raise ValueError("Coarse solver not recognized.") if not self.matrix_free: self.ptr = None self.PRs = [] else: raise NotImplementedError("Matrix free is not implemented yet.")
[docs] def reset(self): """Reset cached multigrid hierarchy (forces rebuild on next solve).""" self.ptr = None
def _jacobi_smoother(self, x, b, A, D_inv, n_step): for _ in range(n_step): x += self.omega * D_inv * (b - A @ x) return x def _setup(self): self.levels = [] D = 1/self.kernel.diagonal() self.levels.append((self.kernel, D)) for i in range(self.n_level): if i == 0 and not self.ptr is None: op = get_restricted_l0_cuda(self.mesh, self.kernel, Cp = self.ptr[0]) D = 1/op.diagonal() self.levels.append((op, D)) elif i == 0: self.ptr = [] if self.mesh.nel.shape[0] == 2: nnz = cp.ones((self.mesh.nel[0]//2+1)*(self.mesh.nel[1]//2+1)*self.dof, dtype=cp.int32) * 18 Cp = cp.zeros((self.mesh.nel[0]//2+1)*(self.mesh.nel[1]//2+1)*self.dof + 1, dtype=cp.int32) Cp[1:] = cp.cumsum(nnz) else: nnz = cp.ones((self.mesh.nel[0]//2+1)*(self.mesh.nel[1]//2+1)*(self.mesh.nel[2]//2+1)*self.dof, dtype=cp.int32) * 81 Cp = cp.zeros((self.mesh.nel[0]//2+1)*(self.mesh.nel[1]//2+1)*(self.mesh.nel[2]//2+1)*self.dof + 1, dtype=cp.int32) Cp[1:] = cp.cumsum(nnz) self.ptr.append(cp.copy(Cp)) op = get_restricted_l0_cuda(self.mesh, self.kernel, Cp = Cp) D = 1/op.diagonal() self.levels.append((op, D)) elif len(self.ptr) <= i: if self.mesh.nel.shape[0] == 2: nnz = cp.ones((self.mesh.nel[0]//(2**(i+1))+1)*(self.mesh.nel[1]//(2**(i+1))+1)*self.dof, dtype=cp.int32) * 18 Cp = cp.zeros((self.mesh.nel[0]//(2**(i+1))+1)*(self.mesh.nel[1]//(2**(i+1))+1)*self.dof + 1, dtype=cp.int32) Cp[1:] = cp.cumsum(nnz) else: nnz = cp.ones((self.mesh.nel[0]//(2**(i+1))+1)*(self.mesh.nel[1]//(2**(i+1))+1)*(self.mesh.nel[2]//(2**(i+1))+1)*self.dof, dtype=cp.int32) * 81 Cp = cp.zeros((self.mesh.nel[0]//(2**(i+1))+1)*(self.mesh.nel[1]//(2**(i+1))+1)*(self.mesh.nel[2]//(2**(i+1))+1)*self.dof + 1, dtype=cp.int32) Cp[1:] = cp.cumsum(nnz) self.ptr.append(cp.copy(Cp)) op = get_restricted_l1p_cuda(self.levels[-1][0], self.mesh.nel//(2**i), self.dof, Cp = Cp) D = 1/op.diagonal() # if i == self.n_level-1: # op.sum_duplicates() self.levels.append((op, D)) else: op = get_restricted_l1p_cuda(self.levels[-1][0], self.mesh.nel//(2**i), self.dof, Cp = self.ptr[i]) D = 1/op.diagonal() # if i == self.n_level-1: # op.sum_duplicates() self.levels.append((op, D)) cp.get_default_memory_pool().free_all_blocks() def _coarse_solver(self, K): if self.coarse_solver == 'cg': return lambda rhs: cg(K, rhs, tol = self.low_level_tol, maxiter=self.low_level_maxiter)[0] elif self.coarse_solver == 'gmres': lambda rhs: gmres(K, rhs, tol = self.low_level_tol, maxiter=self.low_level_maxiter)[0] elif self.coarse_solver == 'splu': SOLVER = splu(K) return lambda rhs: SOLVER.solve(rhs) elif self.coarse_solver == 'spsolve': return lambda rhs: spsolve(K, rhs) elif self.coarse_solver == 'cholmod': K = K.tocsc().get() if self.factor is None: self.factor = cholesky(K, beta=1e-6) else: self.factor.cholesky_inplace(K, beta=1e-6) return lambda rhs: cp.array(self.factor(rhs.get())) else: raise ValueError("Coarse solver not recognized.") def _multi_grid(self, x, b, level): if level == self.n_level: return self.coarse_solve(b) # presmooth A, D = self.levels[level] self.omega = self.omega * (1.06)**(level) x = self._jacobi_smoother(x, b, A, D, self.n_smooth) self.omega = self.omega / (1.06)**(level) # residual r = b - A@x nel = (self.mesh.nel // 2**level).astype(cp.int32) # restrict coarse_residual = apply_restriction_cuda(r,nel,self.dof) # coarse_residual = self.PRs[level][1] @ r # go to next level coarse_u = coarse_residual*0.0 e = self._multi_grid(coarse_u, coarse_residual, level+1) # prolongate e = apply_prolongation_cuda(e,nel,self.dof) # e = self.PRs[level][0] @ e self.omega = self.omega * (1.06)**(level) e = self._jacobi_smoother(e, r, A, D, self.n_smooth) self.omega = self.omega / (1.06)**(level) x += e if self.cycle == 'w' and level in self.w_level: r = b - A@x # restrict coarse_residual = apply_restriction_cuda(r,nel,self.dof) # coarse_residual = self.PRs[level][1] @ r # go to next level coarse_u = coarse_residual*0.0 e = self._multi_grid(coarse_u, coarse_residual, level+1) # prolongate e = apply_prolongation_cuda(e,nel,self.dof) # e = self.PRs[level][0] @ e self.omega = self.omega * (1.06)**(level) e = self._jacobi_smoother(e, r, A, D, self.n_smooth) self.omega = self.omega / (1.06)**(level) x += e return x
[docs] def solve(self, rhs, rho=None, use_last=True): """ Solve linear system K(rho) @ U = rhs on GPU using multigrid-preconditioned CG. Parameters ---------- rhs : cp.ndarray Right-hand side force vector on GPU, shape (n_dof,) rho : cp.ndarray, optional Design variables (densities) on GPU, shape (n_elements,) use_last : bool, optional Use previous solution as initial guess (warm-starting), default: True Returns ------- U : cp.ndarray Solution vector on GPU, shape (n_dof,) residual : float Relative residual: ||K@U - rhs|| / ||rhs|| Raises ------ ValueError If rho is not provided and kernel.has_rho is False Notes ----- - All operations performed on GPU using CuPy - First call builds multigrid hierarchy (expensive, ~1-5 seconds) - Subsequent calls reuse hierarchy (fast, ~0.1-1 second) - Uses preconditioned CG with multigrid V/W-cycle preconditioner - Adaptive omega adjusts relaxation based on residual reduction - Frees GPU memory after solve to prevent accumulation - Most memory-efficient solver for large 3D problems (>10M DOF) """ if not (self.kernel.has_rho or (not rho is None)): raise ValueError("Solver requires a density vector to be passed or set on the kernel.") if not rho is None: rho = self.kernel.set_rho(rho) if use_last and self.last_x0 is not None: x = self.last_x0 else: x = cp.zeros_like(rhs) if self.matrix_free: self._mat_free_setup() else: self._setup() v_cycle = self._mat_free_multi_grid if self.matrix_free else self._multi_grid self.coarse_solve = self._coarse_solver(self.levels[-1][0]) self.omega = self.min_omega r = rhs - self.kernel.dot(x) z = v_cycle(cp.zeros_like(r), r, 0) p = z.copy() rho_old = cp.dot(r, z) norm_b = cp.linalg.norm(rhs) for i in range(self.maxiter): q = self.kernel.dot(p) alpha = rho_old / cp.dot(p, q) x += alpha * p r -= alpha * q norm_r = cp.linalg.norm(r) R = norm_r / norm_b if R < self.tol: break self.omega = self.min_omega + self.d_omega/2*cp.exp((-cp.clip(R,self.tol,1e-1)+self.tol)*500) z = v_cycle(cp.zeros_like(r), r,0) rho_new = cp.dot(r, z) beta = rho_new / rho_old p = z + beta * p rho_old = rho_new residual = norm_r / norm_b self.last_x0 = x del self.levels, self.coarse_solve cp._default_memory_pool.free_all_blocks() return x, residual