"""
The main purpose of this class is to represent all relevant
information for a static analysis. This will include
information defining the loading condition as well as various
other pieces of information.
.. note:: This class should be created using the
:meth:`pyTACS.createStaticProblem <tacs.pytacs.pyTACS.createStaticProblem>` method.
"""
# =============================================================================
# Imports
# =============================================================================
import copy
import os
import time
from collections import OrderedDict
import warnings
import numpy as np
import pyNastran.bdf as pn
import tacs.TACS
import tacs.elements
import tacs.solvers
from tacs.problems.base import TACSProblem
[docs]
class StaticProblem(TACSProblem):
# Default options for class
defaultOptions = {
"outputDir": [str, "./", "Output directory for F5 file writer."],
# Solution Options
"resetBeforeSolve": [
bool,
False,
"Reset the states before every solve, this can be useful for avoiding issues when testing derivatives against finite-difference/complex-step",
],
"linearSolver": [
str,
"GMRES",
"Krylov subspace method to use for linear solver. Currently only supports 'GMRES'",
],
"nonlinearSolver": [
str,
"Continuation",
"Convergence method to use for nonlinear solver. Currently only supports 'Continuation'",
],
"orderingType": [
int,
tacs.TACS.ND_ORDER,
"Ordering type to use for matrix partitioning.\n"
"\t Acceptable values are:\n"
f"\t\t tacs.TACS.NATURAL_ORDER = {tacs.TACS.NATURAL_ORDER}\n"
f"\t\t tacs.TACS.RCM_ORDER = {tacs.TACS.RCM_ORDER}\n"
f"\t\t tacs.TACS.ND_ORDER = {tacs.TACS.ND_ORDER}\n"
f"\t\t tacs.TACS.TACS_AMD_ORDER = {tacs.TACS.TACS_AMD_ORDER}\n"
f"\t\t tacs.TACS.MULTICOLOR_ORDER = {tacs.TACS.MULTICOLOR_ORDER}",
],
"PCFillLevel": [int, 1000, "Preconditioner fill level."],
"PCFillRatio": [float, 20.0, "Preconditioner fill ratio."],
"subSpaceSize": [int, 10, "Subspace size for Krylov solver."],
"nRestarts": [int, 15, "Max number of restarts for Krylov solver."],
"flexible": [
bool,
True,
"Flag for whether the preconditioner is flexible.",
],
"L2Convergence": [
float,
1e-12,
"Absolute convergence tolerance for linear solver based on l2 norm of residual.",
],
"L2ConvergenceRel": [
float,
1e-12,
"Relative convergence tolerance for linear solver based on l2 norm of residual.",
],
"RBEStiffnessScaleFactor": [
float,
1e3,
"Constraint matrix scaling factor used in RBE Lagrange multiplier stiffness matrix.",
],
"RBEArtificialStiffness": [
float,
1e-3,
"Artificial constant added to diagonals of RBE Lagrange multiplier stiffness matrix \n"
"\t to stabilize preconditioner.",
],
"useMonitor": [
bool,
False,
"Flag for whether to attach a debug monitor to the linear solver.",
],
"monitorFrequency": [
int,
10,
"Print frequency for sub iterations of linear solver.",
],
# Output Options
"writeSolution": [
bool,
True,
"Flag for suppressing all f5 file writing.",
],
"writeNLIterSolutions": [
bool,
False,
"Flag to save a solution file at every nonlinear solver iterations.",
],
"numberSolutions": [
bool,
True,
"Flag for attaching solution counter index to f5 files.",
],
"printTiming": [
bool,
False,
"Flag for printing out timing information for class procedures.",
],
"printLevel": [
int,
0,
"Print level for nonlinear solver.\n"
"\t Accepts:\n"
"\t\t 0 : No printing.\n"
"\t\t 1 : Print nonlinear iterations.\n",
],
}
def __init__(
self,
name,
assembler,
comm,
outputViewer=None,
meshLoader=None,
isNonlinear=False,
options=None,
):
"""
NOTE: This class should not be initialized directly by the user.
Use pyTACS.createStaticProblem instead.
Parameters
----------
name : str
Name of this tacs problem
assembler : tacs.TACS.Assembler
Cython object responsible for creating and setting tacs objects used to solve problem
comm : mpi4py.MPI.Intracomm
The comm object on which to create the pyTACS object.
outputViewer : tacs.TACS.TACSToFH5
Cython object used to write out f5 files that can be converted and used for postprocessing.
meshLoader : tacs.pymeshloader.pyMeshLoader
pyMeshLoader object used to create the assembler.
options : dict
Dictionary holding problem-specific option parameters (case-insensitive).
"""
# Problem name
self.name = name
# Set solvers to None, until we set them up later
self.linearSolver = None
self.nonlinearSolver = None
# Default setup for common problem class objects, sets up comm and options
TACSProblem.__init__(
self, assembler, comm, options, outputViewer, meshLoader, isNonlinear
)
# Create problem-specific variables
self._createVariables()
# Setup solver and solver history objects for nonlinear problems
if self.isNonlinear:
# TODO: I'd like to have an option to specify other NL solvers in future
if self.getOption("nonlinearSolver") == "Continuation":
# Give the nonlinear solvers their own linear solvers
newtonLinearSolver = tacs.TACS.KSM(
self.K,
self.PC,
self.getOption("subSpaceSize"),
self.getOption("nRestarts"),
self.getOption("flexible"),
)
newtonLinearSolver.setTolerances(
self.getOption("L2ConvergenceRel"), self.getOption("L2Convergence")
)
continuationLinearSolver = tacs.TACS.KSM(
self.K,
self.PC,
self.getOption("subSpaceSize"),
self.getOption("nRestarts"),
self.getOption("flexible"),
)
continuationLinearSolver.setTolerances(
self.getOption("L2ConvergenceRel"), self.getOption("L2Convergence")
)
# Create Newton solver, the inner solver for the continuation solver
newtonSolver = tacs.solvers.NewtonSolver(
assembler=self.assembler,
setStateFunc=self.setVariables,
resFunc=self.getResidual,
jacFunc=self.updateJacobian,
pcUpdateFunc=self.updatePreconditioner,
linearSolver=newtonLinearSolver,
comm=self.comm,
)
# And now create the continuation solver
self.nonlinearSolver = tacs.solvers.ContinuationSolver(
jacFunc=self.updateJacobian,
pcUpdateFunc=self.updatePreconditioner,
linearSolver=continuationLinearSolver,
setLambdaFunc=self.setLoadScale,
getLambdaFunc=self.getLoadScale,
innerSolver=newtonSolver,
comm=self.comm,
)
self.nonlinearSolver.setCallback(self._nonlinearCallback)
else:
raise self._TACSError(
"Unknown nonlinearSolver option. Valid options are "
"'Continuation'"
)
def _createVariables(self):
"""Internal to create the variable required by TACS"""
opt = self.getOption
# Generic residual vector
self.res = self.assembler.createVec()
self.rhs = self.assembler.createVec()
# Dictionaries to hold adjoint/sens vectors for each evalFunc
self.adjointList = OrderedDict()
self.dIduList = OrderedDict()
self.dvSensList = OrderedDict()
self.xptSensList = OrderedDict()
# Temporary vector for adjoint solve
self.phi = self.assembler.createVec()
self.adjRHS = self.assembler.createVec()
# Load vector
self.F = self.assembler.createVec()
self.F_array = self.F.getArray()
# State variable vector
self.u = self.assembler.createVec()
self.u_array = self.u.getArray()
# Vectors used to decompose residual into external and internal forces
self.externalForce = self.assembler.createVec()
self.internalForce = self.assembler.createVec()
# Auxiliary element object for applying tractions/pressure
self.auxElems = tacs.TACS.AuxElements()
# Counter for number of calls to `solve` method
self.callCounter = -1
# Norms
self.initNorm = 0.0
self.startNorm = 0.0
self.finalNorm = 0.0
# Load scaling factor
self._loadScale = 1.0
# Tangent Stiffness --- process the ordering option here:
ordering = opt("orderingType")
# True stiffness matrix
self.K = self.assembler.createSchurMat(ordering)
# Artificial stiffness for RBE numerical stabilization to stabilize PC
self.rbeArtificialStiffness = self.assembler.createSchurMat(ordering)
# Additional Vecs for updates
self.update = self.assembler.createVec()
# Setup PCScMat and KSM solver
self.alpha = 1.0
self.beta = 0.0
self.gamma = 0.0
# Computes stiffness matrix w/o art. terms
# Set artificial stiffness factors in rbe class to zero
tacs.elements.RBE2.setScalingParameters(opt("RBEStiffnessScaleFactor"), 0.0)
tacs.elements.RBE3.setScalingParameters(opt("RBEStiffnessScaleFactor"), 0.0)
self.assembler.assembleJacobian(
self.alpha,
self.beta,
self.gamma,
self.res,
self.K,
loadScale=self.loadScale,
)
# Now isolate art. terms
# Recompute stiffness with artificial terms included
tacs.elements.RBE2.setScalingParameters(
opt("RBEStiffnessScaleFactor"), opt("RBEArtificialStiffness")
)
tacs.elements.RBE3.setScalingParameters(
opt("RBEStiffnessScaleFactor"), opt("RBEArtificialStiffness")
)
self.assembler.assembleJacobian(
self.alpha, self.beta, self.gamma, None, self.rbeArtificialStiffness
)
# Subtract full stiffness w/o artificial terms from full stiffness w/ terms
# to isolate artificial stiffness terms
self.rbeArtificialStiffness.axpy(-1.0, self.K)
reorderSchur = 1
self.PC = tacs.TACS.Pc(
self.K,
lev_fill=opt("PCFillLevel"),
ratio_fill=opt("PCFillRatio"),
reorder=reorderSchur,
)
# Operator, fill level, fill ratio, msub, rtol, ataol
if opt("linearSolver").upper() == "GMRES":
self.linearSolver = tacs.TACS.KSM(
self.K,
self.PC,
opt("subSpaceSize"),
opt("nRestarts"),
opt("flexible"),
)
# TODO: Fix this
# elif opt('linearSolver').upper() == 'GCROT':
# self.KSM = tacs.TACS.GCROT(
# self.K, self.PC, opt('subSpaceSize'), opt('subSpaceSize'),
# opt('nRestarts'), opt('flexible'))
else:
raise self._TACSError(
"Unknown linearSolver option. Valid options are " "'GMRES' or 'GCROT'"
)
self.linearSolver.setTolerances(
self.getOption("L2ConvergenceRel"), self.getOption("L2Convergence")
)
if opt("useMonitor"):
self.linearSolver.setMonitor(
self.comm,
_descript=opt("linearSolver").upper(),
freq=opt("monitorFrequency"),
)
# Pass new matrix and preconditioner to nonlinear solver linear solvers
if self.nonlinearSolver is not None:
self.nonlinearSolver.linearSolver.setOperators(self.K, self.PC)
try:
self.nonlinearSolver.innerSolver.linearSolver.setOperators(
self.K, self.PC
)
except AttributeError:
pass
# Linear solver factor flag
self._jacobianUpdateRequired = True
self._preconditionerUpdateRequired = True
[docs]
def setOption(self, name, value):
"""
Set a solver option value. The name is not case sensitive.
Parameters
----------
name : str
Name of option to modify
value : depends on option
New option value to set
"""
# Updated deprecated option
if name.lower() == "ksmsolver":
name = "linearSolver"
warnings.warn(
"'KSMSolver' option will be deprecated starting in tacs 3.7.0. "
"Please use `linearSolver` option instead.",
DeprecationWarning,
)
# Default setOption for common problem class objects
TACSProblem.setOption(self, name, value)
createVariables = True
if self.linearSolver is not None:
# Update tolerances
if "l2convergence" in name.lower():
createVariables = False
self.linearSolver.setTolerances(
self.getOption("L2ConvergenceRel"),
self.getOption("L2Convergence"),
)
# No need to reset solver for output options
if name.lower() in [
"writesolution",
"writenlitersolutions",
"printtiming",
"numbersolutions",
"outputdir",
"printLevel",
]:
createVariables = False
# Reset solver for all other option changes
if createVariables:
self._createVariables()
@property
def loadScale(self):
"""This is a scaling factor applied to all forcing terms
Forcing terms includes both the user supplied force vector and the forcing terms coming from aux elements in
the TACS assembler (e.g inertial, centrifugal forces)
Returns
-------
float or complex
The current load scale
"""
return self._loadScale
@loadScale.setter
def loadScale(self, value):
"""Set the scaling applied to external loads
This function exists so that calling `problem.loadScale = value` has the same effect as calling::
`problem.setLoadScale(value)`
This is important in case we want to update other things when the load scale is changed in future
Parameters
----------
value : float or complex
Value to set the load scale to
"""
self.setLoadScale(value)
[docs]
def setLoadScale(self, value):
"""Set the scaling applied to external loads
Parameters
----------
value : float or complex
Value to set the load scale to
"""
if value != self._loadScale:
self._jacobianUpdateRequired = True
self._loadScale = value
[docs]
def getLoadScale(self):
"""Get the current load scale
This function is required by the continuation solver
Returns
-------
float
Current load scale
"""
return self.loadScale
[docs]
def addFunction(self, funcName, funcHandle, compIDs=None, **kwargs):
"""
Generic method to add a function for TACS. It is intended to
be reasonably generic since the user supplies the actual
function handle to use. See the :py:mod:`~tacs.functions` module
for supported TACS eval functions.
Parameters
----------
funcName : str
The user-supplied name for the function. This will
typically be a string that is meaningful to the user
funcHandle : tacs.TACS.Function
The function handle to use for creation. This must come
from the functions module in tacs.
compIDs: list
List of compIDs to select.
**kwargs:
Any keyword arguments to be passed to the TACS function during setup.
"""
success = TACSProblem.addFunction(self, funcName, funcHandle, compIDs, **kwargs)
if success:
# Create additional tacs BVecs to hold adjoint and sens info
self.adjointList[funcName] = self.assembler.createVec()
self.dIduList[funcName] = self.assembler.createVec()
self.dvSensList[funcName] = self.assembler.createDesignVec()
self.xptSensList[funcName] = self.assembler.createNodeVec()
return success
[docs]
def setDesignVars(self, x):
"""
Update the design variables used by tacs.
Parameters
----------
x : numpy.ndarray or dict or tacs.TACS.Vec
The variables (typically from the optimizer) to set. It
looks for variable in the ``self.varName`` attribute.
"""
TACSProblem.setDesignVars(self, x)
self._jacobianUpdateRequired = True
[docs]
def setNodes(self, Xpts):
"""
Set the mesh coordinates of the structure.
Parameters
----------
Xpts : numpy.ndarray
Structural coordinate in array of size (N * 3) where N is
the number of structural nodes on this processor.
"""
TACSProblem.setNodes(self, Xpts)
self._jacobianUpdateRequired = True
####### Load adding methods ########
[docs]
def addLoadToComponents(self, compIDs, F, averageLoad=False):
"""
This method is used to add a *FIXED TOTAL LOAD* on one or more
components, defined by COMPIDs. The purpose of this routine is to add loads that
remain fixed throughout an optimization. An example would be an engine load.
This routine determines all the unique nodes in the FE model that are part of the
requested components, then takes the total 'force' by F and divides by the
number of nodes. This average load is then applied to the nodes.
Parameters
----------
compIDs : list[int] or int
The components with added loads. Use pyTACS selectCompIDs method
to determine this.
F : numpy.ndarray 1d or 2d length (varsPerNodes) or (numCompIDs, varsPerNodes)
Vector(s) of 'force' to apply to each component. If only one force vector is provided,
force will be copied uniformly across all components.
averageLoad : bool
Flag to determine whether load should be split evenly across all components (True)
or copied and applied individually to each component (False). Defaults to False.
Notes
-----
The units of the entries of the 'force' vector F are not
necessarily physical forces and their interpretation depends
on the physics problem being solved and the dofs included
in the model.
A couple of examples of force vector components for common problems are listed below:
In Heat Conduction with varsPerNode = 1
F = [Qdot] # heat rate
In Elasticity with varsPerNode = 3,
F = [fx, fy, fz] # forces
In Elasticity with varsPerNode = 6,
F = [fx, fy, fz, mx, my, mz] # forces + moments
In Thermoelasticity with varsPerNode = 4,
F = [fx, fy, fz, Qdot] # forces + heat rate
In Thermoelasticity with varsPerNode = 7,
F = [fx, fy, fz, mx, my, mz, Qdot] # forces + moments + heat rate
"""
self._addLoadToComponents(self.F, compIDs, F, averageLoad)
[docs]
def addLoadToNodes(self, nodeIDs, F, nastranOrdering=False):
"""
This method is used to add a fixed point load of F to the
selected node IDs.
Parameters
----------
nodeIDs : list[int]
The nodes IDs with added loads.
F : Numpy 1d or 2d array length (varsPerNodes) or (numNodeIDs, varsPerNodes)
Array of force vectors, one for each node. If only one force vector is provided,
force will be copied uniformly across all nodes.
nastranOrdering : bool
Flag signaling whether nodeIDs are in TACS (default)
or NASTRAN (grid IDs in bdf file) ordering
Notes
-----
The units of the entries of the 'force' vector F are not
necessarily physical forces and their interpretation depends
on the physics problem being solved and the dofs included
in the model.
A couple of examples of force vector components for common problems are listed below:
In Heat Conduction with varsPerNode = 1
F = [Qdot] # heat rate
In Elasticity with varsPerNode = 3,
F = [fx, fy, fz] # forces
In Elasticity with varsPerNode = 6,
F = [fx, fy, fz, mx, my, mz] # forces + moments
In Thermoelasticity with varsPerNode = 4,
F = [fx, fy, fz, Qdot] # forces + heat rate
In Thermoelasticity with varsPerNode = 7,
F = [fx, fy, fz, mx, my, mz, Qdot] # forces + moments + heat rate
"""
self._addLoadToNodes(self.F, nodeIDs, F, nastranOrdering)
[docs]
def addLoadToRHS(self, Fapplied):
"""
This method is used to add a *FIXED TOTAL LOAD* directly to the
right hand side vector given the equation below:
K*u = f
Where:
- K : Stiffness matrix for problem
- u : State variables for problem
- f : Right-hand side vector to add loads to
Parameters
----------
Fapplied : numpy.ndarray or tacs.TACS.Vec
Distributed array containing loads to applied to RHS of the problem.
"""
self._addLoadToRHS(self.F, Fapplied)
[docs]
def addTractionToComponents(self, compIDs, tractions, faceIndex=0):
"""
This method is used to add a *FIXED TOTAL TRACTION* on one or more
components, defined by COMPIDs. The purpose of this routine is
to add loads that remain fixed throughout an optimization.
Parameters
----------
compIDs : list[int] or int
The components with added loads. Use pyTACS selectCompIDs method
to determine this.
tractions : Numpy array length 1 or compIDs
Array of traction vectors for each component
faceIndex : int
Indicates which face (side) of element to apply traction to.
Note: not required for certain elements (i.e. shells)
"""
self._addTractionToComponents(self.auxElems, compIDs, tractions, faceIndex)
[docs]
def addTractionToElements(
self, elemIDs, tractions, faceIndex=0, nastranOrdering=False
):
"""
This method is used to add a fixed traction to the
selected element IDs. Tractions can be specified on an
element by element basis (if tractions is a 2d array) or
set to a uniform value (if tractions is a 1d array)
Parameters
----------
elemIDs : list[int]
The global element ID numbers for which to apply the traction.
tractions : Numpy 1d or 2d array length varsPerNodes or (elemIDs, varsPerNodes)
Array of traction vectors for each element
faceIndex : int
Indicates which face (side) of element to apply traction to.
Note: not required for certain elements (i.e. shells)
nastranOrdering : bool
Flag signaling whether elemIDs are in TACS (default)
or NASTRAN ordering
"""
self._addTractionToElements(
self.auxElems, elemIDs, tractions, faceIndex, nastranOrdering
)
[docs]
def addPressureToComponents(self, compIDs, pressures, faceIndex=0):
"""
This method is used to add a *FIXED TOTAL PRESSURE* on one or more
components, defined by COMPIds. The purpose of this routine is
to add loads that remain fixed throughout an optimization. An example
would be a fuel load.
Parameters
----------
compIDs : list[int] or int
The components with added loads. Use pyTACS selectCompIDs method
to determine this.
pressures : Numpy array length 1 or compIDs
Array of pressure values for each component
faceIndex : int
Indicates which face (side) of element to apply pressure to.
Note: not required for certain elements (i.e. shells)
"""
self._addPressureToComponents(self.auxElems, compIDs, pressures, faceIndex)
[docs]
def addPressureToElements(
self, elemIDs, pressures, faceIndex=0, nastranOrdering=False
):
"""
This method is used to add a fixed presure to the
selected element IDs. Pressures can be specified on an
element by element basis (if pressures is an array) or
set to a uniform value (if pressures is a scalar)
Parameters
----------
elemIDs : list[int]
The global element ID numbers for which to apply the pressure.
pressures : Numpy array length 1 or elemIDs
Array of pressure values for each element
faceIndex : int
Indicates which face (side) of element to apply pressure to.
Note: not required for certain elements (i.e. shells)
nastranOrdering : bool
Flag signaling whether elemIDs are in TACS (default)
or NASTRAN ordering
"""
self._addPressureToElements(
self.auxElems, elemIDs, pressures, faceIndex, nastranOrdering
)
[docs]
def addInertialLoad(self, inertiaVector):
"""
This method is used to add a fixed inertial load due to
a uniform acceleration over the entire model.
This is most commonly used to model gravity loads on a model.
Parameters
----------
inertiaVector : numpy.ndarray
Acceleration vector used to define inertial load.
"""
self._addInertialLoad(self.auxElems, inertiaVector)
[docs]
def addCentrifugalLoad(self, omegaVector, rotCenter, firstOrder=False):
"""
This method is used to add a fixed centrifugal load due to a
uniform rotational velocity over the entire model.
This is most commonly used to model rotors, rolling aircraft, etc.
Parameters
----------
omegaVector : numpy.ndarray
Rotational velocity vector (rad/s) used to define centrifugal load.
rotCenter : numpy.ndarray
Location of center of rotation used to define centrifugal load.
firstOrder : bool, optional
Whether to use first order approximation for centrifugal load,
which computes the force in the displaced position. By default False
"""
self._addCentrifugalLoad(self.auxElems, omegaVector, rotCenter, firstOrder)
[docs]
def addLoadFromBDF(self, loadID, scale=1.0):
"""
This method is used to add a fixed load set defined in the BDF file to the problem.
Currently, only supports LOAD, FORCE, MOMENT, GRAV, RFORCE, PLOAD2, and PLOAD4.
Parameters
----------
loadID : int
Load identification number of load set in BDF file user wishes to add to problem.
scale : float
Factor to scale the BDF loads by before adding to problem.
"""
self._addLoadFromBDF(self.F, self.auxElems, loadID, scale)
####### Static solver methods ########
def _updateAssemblerVars(self):
"""
Make sure that the assembler is using
the input variables associated with this problem
"""
self.assembler.setDesignVars(self.x)
self.assembler.setNodes(self.Xpts)
self.assembler.setAuxElements(self.auxElems)
# Set state variables
self.assembler.setVariables(self.u)
# Zero any time derivative terms
self.assembler.zeroDotVariables()
self.assembler.zeroDDotVariables()
# Set artificial stiffness factors in rbe class to zero
tacs.elements.RBE2.setScalingParameters(
self.getOption("RBEStiffnessScaleFactor"), 0.0
)
tacs.elements.RBE3.setScalingParameters(
self.getOption("RBEStiffnessScaleFactor"), 0.0
)
def _initializeSolve(self):
"""
Initialize the solution of the structural system for the
loadCase. The stiffness matrix is assembled and factored.
"""
self.updateJacobian()
self.updatePreconditioner()
[docs]
def solve(self, Fext=None):
"""
Solution of the static problem for current load set. The
forces must already be set.
Parameters
----------
Fext : numpy.ndarray or tacs.TACS.Vec, optional
Distributed array containing additional loads (ex. aerodynamic forces for aerostructural coupling)
to applied to RHS of the static problem, by default None
"""
startTime = time.time()
self.callCounter += 1
# Set problem vars to assembler
if self.getOption("resetBeforeSolve"):
self.zeroVariables()
self._updateAssemblerVars()
# Check if we need to initialize
self._initializeSolve()
initSolveTime = time.time()
if self.isNonlinear:
hasConverged = self._solveNonlinear(Fext)
else:
hasConverged = self._solveLinear(Fext)
solveTime = time.time()
# Get updated residual
self.getResidual(self.res, Fext)
self.finalNorm = np.real(self.res.norm())
finalNormTime = time.time()
# If timing was was requested print it, if the solution is nonlinear
# print this information automatically if prinititerations was requested.
if self.getOption("printTiming"):
self._pp("+--------------------------------------------------+")
self._pp("|")
self._pp("| TACS Solve Times:")
self._pp("|")
self._pp(
"| %-30s: %10.3f sec"
% ("TACS Solve Init Time", initSolveTime - startTime)
)
self._pp(
"| %-30s: %10.3f sec" % ("TACS Solve Time", solveTime - initSolveTime)
)
self._pp("|")
self._pp(
"| %-30s: %10.3f sec"
% ("TACS Total Solution Time", finalNormTime - startTime)
)
self._pp("+--------------------------------------------------+")
return hasConverged
def _solveLinear(self, Fext=None):
"""Solve a linear StaticProblem for a given external force vector
Parameters
----------
Fext : numpy.ndarray or tacs.TACS.Vec, optional
Distributed array containing additional loads (ex. aerodynamic forces for aerostructural coupling)
to applied to RHS of the static problem, by default None
Returns
-------
bool
Flag indicating whether the solver converged
"""
# Get current residual
self.getResidual(self.res, Fext=Fext)
# Get rhs vector
self.K.mult(self.u, self.rhs)
self.rhs.axpy(-1.0, self.res)
# Set initnorm as the norm of rhs
self.initNorm = np.real(self.rhs.norm())
# Starting Norm for this computation
self.startNorm = np.real(self.res.norm())
# Solve Linear System for the update
hasConverged = self.linearSolver.solve(self.res, self.update)
# Convert from int to bool
hasConverged = bool(hasConverged)
if not hasConverged:
self._TACSWarning(
"Linear solver failed to converge. "
"This is likely a sign that the problem is ill-conditioned. "
"Check that the model is properly restrained."
)
self.update.scale(-1.0)
# Update State Variables
self.assembler.getVariables(self.u)
self.u.axpy(1.0, self.update)
self.assembler.setVariables(self.u)
return hasConverged
def _solveNonlinear(self, Fext=None):
"""Solve a nonlinear StaticProblem for a given external force vector
Parameters
----------
Fext : numpy.ndarray or tacs.TACS.Vec, optional
Distributed array containing additional loads (ex. aerodynamic forces for aerostructural coupling)
to applied to RHS of the static problem, by default None
Returns
-------
bool
Flag indicating whether the solver converged
"""
# Compute the internal and external force components of the residual at the current point
self.getForces(
externalForceVec=self.externalForce,
internalForceVec=self.internalForce,
Fext=Fext,
)
self.initNorm = np.real(self.externalForce.norm())
# We need to update the residual function handle used by the nonlinear solver based on the current external force vector
def resFunc(res):
self.getResidual(res, Fext=Fext)
self.nonlinearSolver.resFunc = resFunc
self.nonlinearSolver.setRefNorm(self.initNorm)
self.nonlinearSolver.solve(u0=self.u, result=self.u)
# Since the state has changed, we need to flag that the jacobian and preconditioner should be before the next primal or adjoint solve
self._preconditionerUpdateRequired = True
self._jacobianUpdateRequired = True
# Finally return a bool indicating whether the solve was successful
return self.nonlinearSolver.hasConverged
def _nonlinearCallback(self, solver, u, res, monitorVars):
"""Callback function to be called by the nonlinear solver at each iteration
Parameters
----------
solver : pyTACS solver object
The solver
u : tacs.TACS.Vec
Current state vector
res : tacs.TACS.Vec
Current residual vector
monitorVars : dict
Dictionary of variables to monitor, the values the solver should include can be
specified through the ``"nonlinearSolverMonitorVars"`` option.
"""
iteration = 0
if self.rank == 0:
# Figure out the iteration number
iteration = self.nonlinearSolver.history.getIter() - 1
if self.getOption("printLevel") > 0:
if iteration % 50 == 0:
self.nonlinearSolver.history.printHeader()
self.nonlinearSolver.history.printData()
self.comm.bcast(iteration, root=0)
if self.getOption("writeNLIterSolutions"):
self.writeSolution(
baseName=f"{self.name}-{self.callCounter:03d}-NLIter", number=iteration
)
[docs]
def updateJacobian(self, res=None):
"""Update the Jacobian (a.k.a stiffness) matrix
The Jacobian will only actually be updated if the
``_jacobianUpdateRequired`` flag is set to True.
Parameters
----------
res : tacs.TACS.Vec, optional
If provided, the residual is also computed and stored in this vector
"""
if self._jacobianUpdateRequired:
# Assemble residual and stiffness matrix (w/o artificial terms)
self.assembler.assembleJacobian(
self.alpha,
self.beta,
self.gamma,
res,
self.K,
loadScale=self._loadScale,
)
self._jacobianUpdateRequired = False
self._preconditionerUpdateRequired = True
[docs]
def updatePreconditioner(self):
"""Update the Jacobian (a.k.a stiffness) matrix preconditioner
By default, the static problem uses a full LU factorization of the
Jacobian matrix as a preconditioner, which means that this step is
typically the most expensive operation in the solution process.
The preconditioner will only actually be updated if the
``_preconditionerUpdateRequired`` flag is set to True. This occurs
whenever the Jacobian is updated.
"""
if self._preconditionerUpdateRequired:
# Stiffness matrix must include artificial terms before pc factor
# to prevent factorization issues w/ zero-diagonals
self.K.axpy(1.0, self.rbeArtificialStiffness)
self.PC.factor()
# Remove artificial stiffness terms to get true stiffness mat
self.K.axpy(-1.0, self.rbeArtificialStiffness)
self._preconditionerUpdateRequired = False
####### Function eval/sensitivity methods ########
[docs]
def evalFunctions(self, funcs, evalFuncs=None, ignoreMissing=False):
"""
This is the main routine for returning useful information from
pytacs. The functions corresponding to the strings in
evalFuncs are evaluated and updated into the provided
dictionary.
Parameters
----------
funcs : dict
Dictionary into which the functions are saved.
evalFuncs : iterable object containing strings.
If not none, use these functions to evaluate.
ignoreMissing : bool
Flag to supress checking for a valid function. Please use
this option with caution.
Examples
--------
>>> funcs = {}
>>> staticProblem.solve()
>>> staticProblem.evalFunctions(funcs, ['mass'])
>>> funcs
>>> # Result will look like (if StaticProblem has name of 'c1'):
>>> # {'cl_mass':12354.10}
"""
startTime = time.time()
# Set problem vars to assembler
self._updateAssemblerVars()
if evalFuncs is None:
evalFuncs = sorted(list(self.functionList))
else:
evalFuncs = sorted(list(evalFuncs))
if not ignoreMissing:
for f in evalFuncs:
if f not in self.functionList:
raise self._TACSError(
f"Supplied function '{f}' has not been added "
"using addFunction()."
)
setupProblemTime = time.time()
# Fast parallel function evaluation of structural funcs:
handles = [self.functionList[f] for f in evalFuncs if f in self.functionList]
funcVals = self.assembler.evalFunctions(handles)
functionEvalTime = time.time()
# Assign function values to appropriate dictionary
i = 0
for f in evalFuncs:
if f in self.functionList:
key = self.name + "_%s" % f
funcs[key] = funcVals[i]
i += 1
dictAssignTime = time.time()
if self.getOption("printTiming"):
self._pp("+--------------------------------------------------+")
self._pp("|")
self._pp("| TACS Function Times:")
self._pp("|")
self._pp(
"| %-30s: %10.3f sec"
% ("TACS Function Setup Time", setupProblemTime - startTime)
)
self._pp(
"| %-30s: %10.3f sec"
% (
"TACS Function Eval Time",
functionEvalTime - setupProblemTime,
)
)
self._pp(
"| %-30s: %10.3f sec"
% ("TACS Dict Time", dictAssignTime - functionEvalTime)
)
self._pp("|")
self._pp(
"| %-30s: %10.3f sec"
% ("TACS Function Time", dictAssignTime - startTime)
)
self._pp("+--------------------------------------------------+")
[docs]
def evalFunctionsSens(self, funcsSens, evalFuncs=None):
"""
This is the main routine for returning useful (sensitivity)
information from problem. The derivatives of the functions
corresponding to the strings in evalFuncs are evaluated and
updated into the provided dictionary. The derivitives with
respect to all design variables and node locations are computed.
Parameters
----------
funcsSens : dict
Dictionary into which the derivatives are saved.
evalFuncs : iterable object containing strings
The functions the user wants returned
Examples
--------
>>> funcsSens = {}
>>> staticProblem.evalFunctionsSens(funcsSens, ['mass'])
>>> funcsSens
>>> # Result will look like (if StaticProblem has name of 'c1'):
>>> # {'c1_mass':{'struct':[1.234, ..., 7.89], 'Xpts':[3.14, ..., 1.59]}}
"""
startTime = time.time()
# Set problem vars to assembler
self._updateAssemblerVars()
if evalFuncs is None:
evalFuncs = sorted(list(self.functionList))
else:
evalFuncs = sorted(list(evalFuncs))
# Check that the functions are all ok.
# and prepare tacs vecs for adjoint procedure
dvSenses = []
xptSenses = []
dIdus = []
adjoints = []
for f in evalFuncs:
if f not in self.functionList:
raise self._TACSError(
"Supplied function has not been added " "using addFunction()"
)
else:
# Populate the lists with the tacs bvecs
# we'll need for each adjoint/sens calculation
dvSens = self.dvSensList[f]
dvSens.zeroEntries()
dvSenses.append(dvSens)
xptSens = self.xptSensList[f]
xptSens.zeroEntries()
xptSenses.append(xptSens)
dIdu = self.dIduList[f]
dIdu.zeroEntries()
dIdus.append(dIdu)
adjoint = self.adjointList[f]
adjoint.zeroEntries()
adjoints.append(adjoint)
setupProblemTime = time.time()
adjointStartTime = {}
adjointEndTime = {}
# Next we will solve all the adjoints
# Set adjoint rhs
self.addSVSens(evalFuncs, dIdus)
adjointRHSTime = time.time()
for i, f in enumerate(evalFuncs):
adjointStartTime[f] = time.time()
self.solveAdjoint(dIdus[i], adjoints[i])
adjointEndTime[f] = time.time()
adjointFinishedTime = time.time()
# Evaluate all the adoint res prooduct at the same time for
# efficiency:
self.addDVSens(evalFuncs, dvSenses)
self.addAdjointResProducts(adjoints, dvSenses)
self.addXptSens(evalFuncs, xptSenses)
self.addAdjointResXptSensProducts(adjoints, xptSenses)
# Recast sensititivities into dict for user
for i, f in enumerate(evalFuncs):
key = self.name + "_%s" % f
# Return sensitivities as array in sens dict
funcsSens[key] = {
self.varName: dvSenses[i].getArray().copy(),
self.coordName: xptSenses[i].getArray().copy(),
}
totalSensitivityTime = time.time()
if self.getOption("printTiming"):
self._pp("+--------------------------------------------------+")
self._pp("|")
self._pp("| TACS Adjoint Times:")
self._pp("|")
self._pp(
"| %-30s: %10.3f sec"
% ("TACS Sens Setup Problem Time", setupProblemTime - startTime)
)
self._pp(
"| %-30s: %10.3f sec"
% ("TACS Adjoint RHS Time", adjointRHSTime - setupProblemTime)
)
for f in evalFuncs:
self._pp(
"| %-30s: %10.3f sec"
% (
"TACS Adjoint Solve Time - %s" % (f),
adjointEndTime[f] - adjointStartTime[f],
)
)
self._pp(
"| %-30s: %10.3f sec"
% (
"Total Sensitivity Time",
totalSensitivityTime - adjointFinishedTime,
)
)
self._pp("|")
self._pp(
"| %-30s: %10.3f sec"
% (
"Complete Sensitivity Time",
totalSensitivityTime - startTime,
)
)
self._pp("+--------------------------------------------------+")
[docs]
def addSVSens(self, evalFuncs, svSensList):
"""
Add the state variable partial sensitivity to the ADjoint RHS for given evalFuncs
Parameters
----------
evalFuncs : list[str]
The functions the user wants returned
svSensList : list[tacs.TACS.Vec] or list[numpy.ndarray]
List of sensitivity vectors to add partial sensitivity to
"""
# Set problem vars to assembler
self._updateAssemblerVars()
# Get list of TACS function handles from evalFuncs
funcHandles = [
self.functionList[f] for f in evalFuncs if f in self.functionList
]
# Create a tacs BVec copy for the operation if the output is a numpy array
if isinstance(svSensList[0], np.ndarray):
svSensBVecList = [
self._arrayToVec(svSensArray) for svSensArray in svSensList
]
# Otherwise the input is already a BVec and we can do the operation in place
else:
svSensBVecList = svSensList
self.assembler.addSVSens(
funcHandles, svSensBVecList, self.alpha, self.beta, self.gamma
)
# Update from the BVec values, if the input was a numpy array
if isinstance(svSensList[0], np.ndarray):
for svSensArray, svSensBVec in zip(svSensList, svSensBVecList):
svSensArray[:] = svSensBVec.getArray()
[docs]
def addDVSens(self, evalFuncs, dvSensList, scale=1.0):
"""
Add partial sensitivity contribution due to design vars for evalFuncs
Parameters
----------
evalFuncs : list[str]
The functions the user wants returned
dvSensList : list[tacs.TACS.Vec] or list[numpy.ndarray]
List of sensitivity vectors to add partial sensitivity to
scale : float
Scalar to multiply partial sensitivity by. Defaults to 1.0
"""
# Set problem vars to assembler
self._updateAssemblerVars()
# Get list of TACS function handles from evalFuncs
funcHandles = [
self.functionList[f] for f in evalFuncs if f in self.functionList
]
# Create a tacs BVec copy for the operation if the output is a numpy array
if isinstance(dvSensList[0], np.ndarray):
dvSensBVecList = [
self._arrayToDesignVec(dvSensArray) for dvSensArray in dvSensList
]
# Otherwise the input is already a BVec and we can do the operation in place
else:
dvSensBVecList = dvSensList
self.assembler.addDVSens(funcHandles, dvSensBVecList, scale)
# Finalize sensitivity arrays across all procs
for dvSensBVec in dvSensBVecList:
dvSensBVec.beginSetValues()
dvSensBVec.endSetValues()
# Update the BVec values, if the input was a numpy array
if isinstance(dvSensList[0], np.ndarray):
for dvSensArray, dvSensBVec in zip(dvSensList, dvSensBVecList):
# Copy values to numpy array
dvSensArray[:] = dvSensBVec.getArray()
[docs]
def addAdjointResProducts(self, adjointlist, dvSensList, scale=-1.0):
"""
Add the adjoint product contribution to the design variable sensitivity arrays
Parameters
----------
adjointlist : list[tacs.TACS.Vec] or list[numpy.ndarray]
List of adjoint vectors for residual sensitivity product
dvSensList : list[tacs.TACS.Vec] or list[numpy.ndarray]
List of sensitivity vectors to add product to
scale : float
Scalar to multiply product by. Defaults to -1.0
"""
# Set problem vars to assembler
self._updateAssemblerVars()
# Create a tacs BVec copy for the operation if the output is a numpy array
if isinstance(adjointlist[0], np.ndarray):
adjointBVeclist = [
self._arrayToVec(adjointArray) for adjointArray in adjointlist
]
# Otherwise the input is already a BVec and we can do the operation in place
else:
adjointBVeclist = adjointlist
# Make sure BC terms are zeroed out in adjoint
for adjoint in adjointBVeclist:
self.assembler.applyBCs(adjoint)
# Create a tacs BVec copy for the operation if the output is a numpy array
if isinstance(dvSensList[0], np.ndarray):
dvSensBVecList = [
self._arrayToDesignVec(dvSensArray) for dvSensArray in dvSensList
]
# Otherwise the input is already a BVec and we can do the operation in place
else:
dvSensBVecList = dvSensList
self.assembler.addAdjointResProducts(adjointBVeclist, dvSensBVecList, scale)
# Finalize sensitivity arrays across all procs
for dvSensBVec in dvSensBVecList:
dvSensBVec.beginSetValues()
dvSensBVec.endSetValues()
# Update the BVec values, if the input was a numpy array
if isinstance(dvSensList[0], np.ndarray):
for dvSensArray, dvSensBVec in zip(dvSensList, dvSensBVecList):
# Copy values to numpy array
dvSensArray[:] = dvSensBVec.getArray()
[docs]
def addXptSens(self, evalFuncs, xptSensList, scale=1.0):
"""
Add partial sensitivity contribution due to nodal coordinates for evalFuncs
Parameters
----------
evalFuncs : list[str]
The functions the user wants returned
xptSensList : list[tacs.TACS.Vec] or list[numpy.ndarray]
List of sensitivity vectors to add partial sensitivity to
scale : float
Scalar to multiply partial sensitivity by. Defaults to 1.0
"""
# Set problem vars to assembler
self._updateAssemblerVars()
# Get list of TACS function handles from evalFuncs
funcHandles = [
self.functionList[f] for f in evalFuncs if f in self.functionList
]
# Create a tacs BVec copy for the operation if the output is a numpy array
if isinstance(xptSensList[0], np.ndarray):
xptSensBVecList = [
self._arrayToNodeVec(xptSensArray) for xptSensArray in xptSensList
]
# Otherwise the input is already a BVec and we can do the operation in place
else:
xptSensBVecList = xptSensList
self.assembler.addXptSens(funcHandles, xptSensBVecList, scale)
# Finalize sensitivity arrays across all procs
for xptSensBVec in xptSensBVecList:
xptSensBVec.beginSetValues()
xptSensBVec.endSetValues()
# Update from the BVec values, if the input was a numpy array
if isinstance(xptSensList[0], np.ndarray):
for xptSensArray, xptSensBVec in zip(xptSensList, xptSensBVecList):
# Copy values to numpy array
xptSensArray[:] = xptSensBVec.getArray()
[docs]
def addAdjointResXptSensProducts(self, adjointlist, xptSensList, scale=-1.0):
"""
Add the adjoint product contribution to the nodal coordinates sensitivity arrays
Parameters
----------
adjointlist : list[tacs.TACS.Vec] or list[numpy.ndarray]
List of adjoint vectors for residual sensitivity product
xptSensList : list[tacs.TACS.Vec] or list[numpy.ndarray]
List of sensitivity vectors to add product to
scale : float
Scalar to multiply product by. Defaults to -1.0
"""
# Set problem vars to assembler
self._updateAssemblerVars()
# Create a tacs BVec copy for the operation if the output is a numpy array
if isinstance(adjointlist[0], np.ndarray):
adjointBVeclist = [
self._arrayToVec(adjointArray) for adjointArray in adjointlist
]
# Otherwise the input is already a BVec and we can do the operation in place
else:
adjointBVeclist = adjointlist
# Make sure BC terms are zeroed out in adjoint
for adjoint in adjointBVeclist:
self.assembler.applyBCs(adjoint)
# Create a tacs BVec copy for the operation if the output is a numpy array
if isinstance(xptSensList[0], np.ndarray):
xptSensBVecList = [
self._arrayToNodeVec(xptSensArray) for xptSensArray in xptSensList
]
# Otherwise the input is already a BVec and we can do the operation in place
else:
xptSensBVecList = xptSensList
self.assembler.addAdjointResXptSensProducts(
adjointBVeclist, xptSensBVecList, scale
)
# Finalize sensitivity arrays across all procs
for xptSensBVec in xptSensBVecList:
xptSensBVec.beginSetValues()
xptSensBVec.endSetValues()
if isinstance(xptSensList[0], np.ndarray):
for xptSensArray, xptSensBVec in zip(xptSensList, xptSensBVecList):
# Copy values to numpy array
xptSensArray[:] = xptSensBVec.getArray()
[docs]
def getResidual(self, res, Fext=None):
"""
This routine is used to evaluate directly the structural
residual. Only typically used with aerostructural analysis.
Parameters
----------
res : tacs.TACS.Vec or numpy.ndarray
If res is not None, place the residuals into this array.
Fext : tacs.TACS.Vec or numpy.ndarray, optional
Distributed array containing additional loads (ex. aerodynamic forces for aerostructural coupling)
to applied to RHS of the static problem.
"""
# Make sure assembler variables are up-to-date
self._updateAssemblerVars()
# Determine if the user vector is a BVec or numpy array
if isinstance(res, tacs.TACS.Vec):
resArray = None
else: # Input is a numpy array
resArray = res
res = self.res
# Sum the forces from the loads not handled by TACS
self.rhs.copyValues(self.F) # Fixed loads
# Add external loads, if specified
if Fext is not None:
if isinstance(Fext, tacs.TACS.Vec):
self.rhs.axpy(1.0, Fext)
elif isinstance(Fext, np.ndarray):
rhsArray = self.rhs.getArray()
rhsArray[:] = rhsArray[:] + Fext[:]
# Zero out forces on DOF that are subject to BCs
self.assembler.applyBCs(self.rhs)
# Assemble the TACS residual and subtract the externally handled loads
self.assembler.assembleRes(res, self._loadScale)
res.axpy(-self._loadScale, self.rhs)
# If requested, copy the residual to the output array
if resArray is not None:
resArray[:] = res.getArray()
[docs]
def getForces(self, externalForceVec, internalForceVec, Fext=None):
"""Compute the internal and external forces acting on the structure
The computations here are based on the residual equation:
r(u, loadScale) = -(Fint(u) + loadScale * Fext(u))
Thus, the internal forces are given by:
Fint(u) = -r(u, 0)
And the external forces are given by:
Fext(u) = -r(u, 1) - Fi(u)
Parameters
----------
externalForceVec : TACS BVec or numpy array
Vector/array to store external forces in
internalForceVec : TACS BVec or numpy array
Vector/array to store internal forces in
Fext : TACS BVec or numpy array, optional
Distributed array containing additional loads (ex. aerodynamic forces for aerostructural coupling)
to applied to RHS of the static problem.
"""
loadScale = self._loadScale
self.setLoadScale(0.0)
self.getResidual(internalForceVec, Fext)
self.setLoadScale(1.0)
self.getResidual(externalForceVec, Fext)
self.setLoadScale(loadScale)
# Compute internal forces
if isinstance(internalForceVec, tacs.TACS.Vec):
internalForceVec.scale(-1.0)
elif isinstance(internalForceVec, np.ndarray):
internalForceVec[:] = -internalForceVec[:]
# Compute external forces
if isinstance(externalForceVec, tacs.TACS.Vec):
externalForceVec.scale(-1.0)
elif isinstance(externalForceVec, np.ndarray):
externalForceVec[:] = -externalForceVec[:]
if isinstance(internalForceVec, tacs.TACS.Vec):
if isinstance(externalForceVec, tacs.TACS.Vec):
externalForceVec.axpy(-1.0, internalForceVec)
elif isinstance(externalForceVec, np.ndarray):
externalForceVec[:] = externalForceVec[:] - internalForceVec.getArray()
elif isinstance(internalForceVec, np.ndarray):
if isinstance(externalForceVec, np.ndarray):
externalForceVec[:] = externalForceVec[:] - internalForceVec[:]
elif isinstance(externalForceVec, tacs.TACS.Vec):
externalForceVec.axpy(-1.0, self._arrayToVec(internalForceVec))
[docs]
def getJacobian(self):
"""Get the problem's Jacobian in sciPy sparse matrix format
Returns
-------
K : (scipy.sparse.bsr_matrix, scipy.sparse.bsr_matrix) or (scipy.sparse.bsr_matrix, scipy.sparse.bsr_matrix, scipy.sparse.bsr_matrix, scipy.sparse.bsr_matrix)
A tuple of 2 scipy.sparse.bsr_matrices (A, B) if Jacobian is a TACSParallelMat, or 4
scipy.sparse.bsr_matrices (A, B, C, D) if Jacobian is a TACSSchurMat
"""
# Make sure stiffness mat is up-to-date
self._updateAssemblerVars()
self._initializeSolve()
# Return copy of scipy mat
return copy.deepcopy(self.K.getMat())
[docs]
def addTransposeJacVecProduct(self, phi, prod, scale=1.0):
"""
Adds product of transpose Jacobian and input vector into output vector as shown below:
prod += scale * J^T . phi
Parameters
----------
phi : tacs.TACS.Vec or numpy.ndarray
Input vector to product with the transpose Jacobian.
prod : tacs.TACS.Vec or numpy.ndarray
Output vector to add Jacobian product to.
scale : float
Scalar used to scale Jacobian product by.
"""
# Create a tacs bvec copy of the adjoint vector
if isinstance(phi, tacs.TACS.Vec):
self.phi.copyValues(phi)
elif isinstance(phi, np.ndarray):
self.phi.getArray()[:] = phi
# Tacs doesn't actually transpose the matrix here so keep track of
# RHS entries that TACS zeros out for BCs.
bcTerms = self.update
bcTerms.copyValues(self.phi)
self.assembler.applyBCs(self.phi)
bcTerms.axpy(-1.0, self.phi)
# Set problem vars to assembler
self._updateAssemblerVars()
self.K.mult(self.phi, self.res)
# Add bc terms back in
self.res.axpy(1.0, bcTerms)
# Output residual
if isinstance(prod, tacs.TACS.Vec):
prod.axpy(scale, self.res)
else:
prod[:] = prod + scale * self.res.getArray()
[docs]
def zeroVariables(self):
"""
Zero all the tacs solution b-vecs
"""
self.res.zeroEntries()
self.u.zeroEntries()
self.assembler.setVariables(self.u)
self.update.zeroEntries()
[docs]
def zeroLoads(self):
"""
Zero all applied loads
"""
self.F.zeroEntries()
self.auxElems = tacs.TACS.AuxElements()
[docs]
def solveAdjoint(self, rhs, phi):
"""
Solve the structural adjoint.
Parameters
----------
rhs : tacs.TACS.Vec or numpy.ndarray
right hand side vector for adjoint solve
phi : tacs.TACS.Vec or numpy.ndarray
BVec or numpy array into which the adjoint is saved
"""
# Set problem vars to assembler
self._updateAssemblerVars()
# Check if we need to initialize
self._initializeSolve()
# Create a copy of the adjoint/rhs guess
if isinstance(phi, tacs.TACS.Vec):
self.phi.copyValues(phi)
elif isinstance(phi, np.ndarray):
self.phi.getArray()[:] = phi
if isinstance(rhs, tacs.TACS.Vec):
self.adjRHS.copyValues(rhs)
elif isinstance(rhs, np.ndarray):
self.adjRHS.getArray()[:] = rhs
# Tacs doesn't actually transpose the matrix here so keep track of
# RHS entries that TACS zeros out for BCs.
bcTerms = self.update
bcTerms.copyValues(self.adjRHS)
self.assembler.applyBCs(self.adjRHS)
bcTerms.axpy(-1.0, self.adjRHS)
# Solve Linear System
self.linearSolver.solve(self.adjRHS, self.phi)
self.assembler.applyBCs(self.phi)
# Add bc terms back in
self.phi.axpy(1.0, bcTerms)
# Copy output values back to user vectors
if isinstance(phi, tacs.TACS.Vec):
phi.copyValues(self.phi)
elif isinstance(phi, np.ndarray):
phi[:] = self.phi.getArray()
[docs]
def getVariables(self, states=None):
"""
Return the current state values for the
problem
Parameters
----------
states : tacs.TACS.Vec or numpy.ndarray
Vector to place current state variables into (optional)
Returns
----------
states : numpy.ndarray
current state vector
"""
if isinstance(states, tacs.TACS.Vec):
states.copyValues(self.u)
elif isinstance(states, np.ndarray):
states[:] = self.u_array[:]
return self.u_array.copy()
[docs]
def setVariables(self, states):
"""
Set the structural states for current load case.
Parameters
----------
states : numpy.ndarray
Values to set. Must be the size of getNumVariables()
"""
# Copy array values
if isinstance(states, tacs.TACS.Vec):
self.u.copyValues(states)
elif isinstance(states, np.ndarray):
self.u_array[:] = states[:]
# Set states to assembler
self.assembler.setBCs(self.u)
self.assembler.setVariables(self.u)
# If this is a nonlinear problem then changing the state will change the jacobian
if self.isNonlinear:
self._jacobianUpdateRequired = True
[docs]
def getOutputFileName(self, outputDir=None, baseName=None, number=None):
"""Figure out a base path/name for output files
Parameters
----------
outputDir : str, optional
Directory to write file to, by default uses the 'outputDir' option
baseName : str, optional
Case name, by default uses self.name
number : int, optional
A number to append to the filename, by default uses the call-count of the problem
Returns
-------
str
Full path to output file (excluding any extension)
"""
# Check input
if outputDir is None:
outputDir = self.getOption("outputDir")
if baseName is None:
baseName = self.name
# If we are numbering solution, it saving the sequence of
# calls, add the call number
if number is not None:
# We need number based on the provided number:
baseName = baseName + "_%3.3d" % number
else:
# if number is none, i.e. standalone, but we need to
# number solutions, use internal counter
if self.getOption("numberSolutions"):
baseName = baseName + "_%3.3d" % self.callCounter
return os.path.join(outputDir, baseName)
[docs]
def writeSolution(self, outputDir=None, baseName=None, number=None):
"""
This is a generic shell function that writes the output
file(s). The intent is that the user or calling program can
call this function and pyTACS writes all the files that the
user has defined. It is recommended that this function is used
along with the associated logical flags in the options to
determine the desired writing procedure
Parameters
----------
outputDir : str or None
Use the supplied output directory
baseName : str or None
Use this supplied string for the base filename. Typically
only used from an external solver.
number : int or None
Use the user supplied number to index solution. Again, only
typically used from an external solver
"""
# Make sure assembler variables are up to date
self._updateAssemblerVars()
# Figure out the output file base name
baseName = self.getOutputFileName(outputDir, baseName, number)
# Unless the writeSolution option is off write actual file:
if self.getOption("writeSolution"):
self.outputViewer.writeToFile(baseName + ".f5")
[docs]
def writeSolutionHistory(self, outputDir=None, baseName=None, number=None):
"""Write the nonlinear solver history to a file
Parameters
----------
outputDir : str or None
Use the supplied output directory
baseName : str or None
Use this supplied string for the base filename. Typically
only used from an external solver.
number : int or None
Use the user supplied number to index solution. Again, only
typically used from an external solver
"""
# Figure out the output file base name
if self.nonlinearSolver is not None:
baseName = self.getOutputFileName(outputDir, baseName, number)
if self.nonlinearSolver.history is not None:
self.nonlinearSolver.history.save(baseName)
[docs]
def writeLoadToBDF(self, bdfFile, loadCaseID):
"""
Write loads from problem to NASTRAN BDF file.
NOTE: To get correct loads, `solve` method should be called before this method.
Parameters
----------
bdfFile: str or pyNastran.bdf.bdf.BDF or None
Name of file to write BDF file to. Only required on root proc,
can be None otherwise.
loadCaseID: int
NASTARAN loadcase ID to assign loads to in BDF.
"""
# Grab RHS vector from previous solve
F = self.rhs
F_array = np.real(F.getArray())
# Get local force info for each processor
multNodes = self.meshLoader.getLocalMultiplierNodeIDs()
globalToLocalNodeIDDict = self.meshLoader.getGlobalToLocalNodeIDDict()
# Gather local info to root processor
allMultNodes = self.comm.gather(multNodes, root=0)
allGlobalToLocalNodeIDDict = self.comm.gather(globalToLocalNodeIDDict, root=0)
allF = self.comm.gather(F_array, root=0)
vpn = self.getVarsPerNode()
# Assemble new BDF file on root
if self.comm.rank == 0:
if isinstance(bdfFile, str):
newBDFInfo = pn.bdf.BDF(debug=False)
elif isinstance(bdfFile, pn.bdf.BDF):
newBDFInfo = bdfFile
# Save subcase info to bdf
if newBDFInfo.case_control_deck is not None:
newBDFInfo.case_control_deck.create_new_subcase(loadCaseID)
newBDFInfo.case_control_deck.add_parameter_to_local_subcase(
loadCaseID, f"SUBTITLE={self.name}"
)
newBDFInfo.case_control_deck.add_parameter_to_local_subcase(
loadCaseID, "ANALYSIS=STATICS"
)
newBDFInfo.case_control_deck.add_parameter_to_local_subcase(
loadCaseID, f"LOAD={loadCaseID}"
)
# Tolerance for writing out point loads
zero_tol = 1e-6
# Write out force values
nastranNodeIDs = list(self.bdfInfo.node_ids)
# Loop through each proc and pull out nodal forces
for proc_i in range(self.comm.size):
Fxyz = allF[proc_i].reshape(-1, vpn)
for tacsGNodeID in allGlobalToLocalNodeIDDict[proc_i]:
# Get local node ID
tacsLNodeID = allGlobalToLocalNodeIDDict[proc_i][tacsGNodeID]
# Get Global nastran ID
nastranGNodeID = nastranNodeIDs[tacsGNodeID]
# Add force to bdf file (if its not a multiplier node)
if tacsLNodeID not in allMultNodes[proc_i]:
# Check if force is above tolerance before adding to bdf
if (
vpn >= 3
and np.linalg.norm(Fxyz[tacsLNodeID][:3]) > zero_tol
):
f = np.zeros(3)
for i in range(3):
if abs(Fxyz[tacsLNodeID][i]) > zero_tol:
f[i] = Fxyz[tacsLNodeID][i]
newBDFInfo.add_force(loadCaseID, nastranGNodeID, 1.0, f)
if (
vpn >= 6
and np.linalg.norm(Fxyz[tacsLNodeID][3:6]) > zero_tol
):
m = np.zeros(3)
for i in range(3):
if abs(Fxyz[tacsLNodeID][i + 3]) > zero_tol:
m[i] = Fxyz[tacsLNodeID][i + 3]
newBDFInfo.add_moment(loadCaseID, nastranGNodeID, 1.0, m)
# If bdf file was provided as a file name save it directly
if isinstance(bdfFile, str):
newBDFInfo.write_bdf(
bdfFile, size=16, is_double=True, write_header=False
)
# All procs should wait for root
self.comm.barrier()