"""
pyBase_problem
"""
# =============================================================================
# Imports
# =============================================================================
from functools import wraps
import copy
from baseclasses import StructProblem as BaseStructProblem
import numpy as np
from mpi4py import MPI
import pyNastran.bdf as pn
import tacs.TACS
# Define decorator functions for methods that must be called before initialize
def updateDVGeo(method):
@wraps(method)
def wrappedMethod(self, *args, **kwargs):
if self.DVGeo is not None:
if not self.DVGeo.pointSetUpToDate(self.ptSetName):
coords = self.DVGeo.update(self.ptSetName, config=self.name)
self.staticProblem.setNodes(coords.flatten())
for constraint in self.constraints:
constraint.setNodes(coords.flatten())
return method(self, *args, **kwargs)
return wrappedMethod
[docs]
class StructProblem(BaseStructProblem):
"""
MACH StructProblem wrapper for pyTACS StaticProblem class
"""
def __init__(
self,
staticProblem,
FEAAssembler,
DVGeo=None,
loadFile=None,
):
"""
Initialize the StructProblem.
Parameters
----------
staticProblem : tacs.problems.StaticProblem
StaticProblem object used for modeling and solving static cases.
FEAAssembler : tacs.pytacs.pyTACS
pyTACS assembler object.
DVGeo : pygeo.DVGeometry, optional
Object responsible for manipulating the geometry design variables.
If None, no geometric design variables will be used.
loadFile : str, optional
Filename of the (static) external load file. Should be
generated from pyAerostructure.
"""
self.staticProblem = staticProblem
self.FEAAssembler = FEAAssembler
self.DVGeo = None
self.numGeoDV = 0
self.ptSetName = None
self.loadFile = loadFile
self.constraints = []
if self.staticProblem.assembler != self.FEAAssembler.assembler:
raise RuntimeError(
"Provided StaticProblem does not correspond to pyTACS assembler object."
)
self.comm = self.staticProblem.comm
if DVGeo:
self.setDVGeo(DVGeo)
self.update = self.FEAAssembler.createVec(asBVec=True)
self.oldUpdate = self.FEAAssembler.createVec(asBVec=True)
self.temp0 = self.FEAAssembler.createVec(asBVec=True)
self.temp1 = self.FEAAssembler.createVec(asBVec=True)
self._Fext = self.FEAAssembler.createVec(asBVec=True)
self._phi = self.FEAAssembler.createVec(asBVec=True)
self._pLoad = self.FEAAssembler.createVec(asBVec=True)
self._dSdu = self.FEAAssembler.createVec(asBVec=True)
self._adjRHS = self.FEAAssembler.createVec(asBVec=True)
self._dIdu = self.FEAAssembler.createVec(asBVec=True)
self._matVecRHS = self.FEAAssembler.createVec(asBVec=True)
self._matVecSolve = self.FEAAssembler.createVec(asBVec=True)
self.adjoints = {
funcName: self.FEAAssembler.createVec(asBVec=True)
for funcName in self.evalFuncs
}
self.doDamp = False
if self.loadFile:
self.readExternalForceFile(self.loadFile)
@property
def name(self):
"""
Get the name of the structural problem.
Returns
-------
str
Name of the structural problem.
"""
return self.staticProblem.name
@property
def callCounter(self) -> int:
"""
Get the call counter for the underlying static problem.
Returns
-------
int
Number of times the static problem's solve method has been called.
"""
return self.staticProblem.callCounter
@property
def Fext(self):
"""
Get the external aerodynamic force vector.
Returns
-------
numpy.ndarray
External aerodynamic force vector as a numpy array.
"""
return self._Fext.getArray()
@Fext.setter
def Fext(self, value):
"""
Set the external aerodynamic force vector.
Parameters
----------
value : numpy.ndarray
External aerodynamic force vector to set.
"""
self._Fext[:] = value
@property
def phi(self):
"""
Get the adjoint vector.
Returns
-------
numpy.ndarray
Adjoint vector as a numpy array.
"""
return self._phi.getArray()
@phi.setter
def phi(self, value):
"""
Set the adjoint vector.
Parameters
----------
value : numpy.ndarray
Adjoint vector to set.
"""
self._phi[:] = value
@property
def pLoad(self):
"""
The contribution of the aerodynamic adjoint to the structural
adjoint RHS. This term is given by dAdu^T*psi and is subtracted
from the structural adjoint RHS.
Returns
-------
numpy.ndarray
Aerodynamic adjoint contribution to the structural adjoint RHS.
"""
return self._pLoad.getArray()
@pLoad.setter
def pLoad(self, value):
"""
Set the aerodynamic adjoint contribution to the structural
adjoint RHS. This term is given by dAdu^T*psi and is subtracted
from the structural adjoint RHS.
Parameters
----------
value : numpy.ndarray
Aerodynamic adjoint contribution to the structural adjoint RHS.
"""
self._pLoad[:] = value
@property
def dSdu(self):
"""
Get the product of the derivative of structural residual with
respect to state variables and the structural adjoint vector.
Returns
-------
numpy.ndarray
Product of the derivative of structural residual with
respect to state variables and the structural adjoint
vector.
"""
return self._dSdu.getArray()
@dSdu.setter
def dSdu(self, value):
"""
Set the product of the derivative of structural residual with
respect to state variables and the structural adjoint vector.
Parameters
----------
value : numpy.ndarray
Product of the derivative of structural residual with
respect to state variables and the structural adjoint
vector.
"""
self._dSdu[:] = value
@property
def adjRHS(self):
"""
Get the adjoint right-hand side vector.
Returns
-------
numpy.ndarray
Adjoint RHS vector as a numpy array.
"""
return self._adjRHS.getArray()
@adjRHS.setter
def adjRHS(self, value):
"""
Set the adjoint right-hand side vector.
Parameters
----------
value : numpy.ndarray or tacs.TACS.Vec
Adjoint RHS vector to set.
"""
self._adjRHS[:] = value
@property
def dIdu(self):
"""
Get the derivative of the function with respect to state variables.
Returns
-------
numpy.ndarray
Derivative vector as a numpy array with boundary conditions applied.
"""
self.FEAAssembler.applyBCsToVec(self._dIdu)
return self._dIdu.getArray()
@dIdu.setter
def dIdu(self, value):
"""
Set the derivative of the function with respect to state variables.
Parameters
----------
value : numpy.ndarray or tacs.TACS.Vec
Derivative vector to set.
"""
self._dIdu[:] = value
self.FEAAssembler.applyBCsToVec(self._dIdu)
@property
def matVecRHS(self):
"""
Get the matrix-vector product right-hand side vector.
Returns
-------
numpy.ndarray
Matrix-vector RHS vector as a numpy array.
"""
return self._matVecRHS.getArray()
@matVecRHS.setter
def matVecRHS(self, value):
"""
Set the matrix-vector product right-hand side vector.
Parameters
----------
value : numpy.ndarray or tacs.TACS.Vec
Matrix-vector RHS vector to set.
"""
self._matVecRHS[:] = value
@property
def matVecSolve(self):
"""
Get the matrix-vector solve vector.
Returns
-------
numpy.ndarray
Matrix-vector solve vector as a numpy array.
"""
return self._matVecSolve.getArray()
@matVecSolve.setter
def matVecSolve(self, value):
"""
Set the matrix-vector solve vector.
Parameters
----------
value : numpy.ndarray or tacs.TACS.Vec
Matrix-vector solve vector to set.
"""
self._matVecSolve[:] = value
[docs]
def getVarName(self):
"""
Get name for the design variables in pyOpt. Only needed
if more than 1 pytacs object is used in an optimization
Returns
----------
varName : str
Name of the design variables used in setDesignVars() dict.
"""
return self.staticProblem.getVarName()
[docs]
def setDVGeo(self, DVGeo, pointSetKwargs=None):
"""
Set the DVGeometry object that will manipulate 'geometry' in
this object. Note that TACS doe not **strictly** need a
DVGeometry object, but if optimization with geometric
changes is desired, then it is required.
Parameters
----------
dvGeo : A DVGeometry object.
Object responsible for manipulating the constraints that
this object is responsible for.
pointSetKwargs : dict
Keyword arguments to be passed to the DVGeo addPointSet call.
Useful for DVGeometryMulti, specifying FFD projection tolerances, etc
Examples
--------
>>> StructProblem.setDVGeo(DVGeo)
"""
if self.DVGeo:
raise RuntimeError("DVGeo has already been set.")
if pointSetKwargs is None:
pointSetKwargs = {}
self.DVGeo = DVGeo
# Get the number of geometry variables
self.numGeoDV = self.DVGeo.getNDV()
self.ptSetName = "tacs_%s_coords" % self.name
coords0 = self.staticProblem.getNodes()
self.DVGeo.addPointSet(coords0.reshape(-1, 3), self.ptSetName, **pointSetKwargs)
[docs]
def getDVGeo(self):
"""
Get the DVGeometry object.
Returns
-------
DVGeometry: pygeo.parameterization.BaseDVGeometry or None
DVGeometry object.
"""
return self.DVGeo
[docs]
def setDesignVars(self, x):
"""
Set the variables in the x-dict for this object.
Parameters
----------
x : dict
Dictionary of variables which may or may not contain the
design variable names this object needs
"""
if self.comm.rank != 0:
x = np.empty(0)
self.staticProblem.setDesignVars(x)
for constr in self.constraints:
constr.setDesignVars(x)
[docs]
def addConstraint(self, constr):
"""
Add pyTACS constraint object to StructProblem.
Parameters
----------
constr : tacs.constraints.TACSConstraint
pyTACS Constraint object
"""
if self.staticProblem.assembler != constr.assembler:
raise ValueError(
f"TACSConstraint object '{constr.name}' and StaticProblem '{self.staticProblem.name}' "
"were not created by same pyTACS assembler"
)
self.constraints.append(constr)
[docs]
def addVariablesPyOpt(self, optProb):
"""
Add the current set of variables to the optProb object.
Parameters
----------
optProb : pyOpt_optimization class
Optimization problem definition to which variables are added
"""
ndv = self.FEAAssembler.getTotalNumDesignVars()
dvName = self.staticProblem.getVarName()
if ndv > 0 and dvName not in optProb.variables:
value = self.getOrigDesignVars()
lb, ub = self.getDesignVarRange()
scale = self.getDesignVarScales()
optProb.addVarGroup(
dvName, ndv, "c", value=value, lower=lb, upper=ub, scale=scale
)
[docs]
def addConstraintsPyOpt(self, optProb, nonLinear=True, linear=True):
"""
Add any linear constraints that were generated during setup to
the specified pyOpt problem.
Parameters
----------
optProb : :class:`Optimization <pyoptsparse.pyOpt_optimization.Optimization>` instance
Optimization problem object to add constraints to
nonLinear : bool
Flag to include non-linear constraints.
linear : bool
Flag to include linear constraints.
"""
fcon = {}
fconSens = {}
conSizes = {}
conBounds = {}
conIsLinear = {}
self.evalConstraints(fcon, nonLinear=nonLinear, linear=linear)
self.evalConstraintsSens(fconSens, nonLinear=nonLinear, linear=linear)
for constr in self.constraints:
constr.getConstraintSizes(conSizes)
constr.getConstraintBounds(conBounds)
keys = constr.getConstraintKeys()
for key in keys:
conIsLinear[f"{constr.name}_{key}"] = constr.isLinear
for conName in fcon:
nCon = conSizes[conName]
if nCon > 0:
# Save the nonlinear constraint name
lb, ub = conBounds[conName]
# Just evaluate the constraint to get the jacobian structure
wrt = list(fconSens[conName].keys())
optProb.addConGroup(
conName,
nCon,
wrt=wrt,
jac=fconSens[conName],
lower=lb,
upper=ub,
linear=conIsLinear[conName],
)
[docs]
@updateDVGeo
def getNodes(self):
"""
Return the mesh coordinates of this problem.
Returns
-------
coords : numpy.ndarray
Structural coordinate in array of size (N * 3) where N is
the number of structural nodes on this processor.
"""
return self.staticProblem.getNodes()
[docs]
@updateDVGeo
def solve(self, damp=1.0, useAitkenAcceleration=False, dampLB=0.2, loadScale=1.0):
"""
Solution of the static problem for current load set. The
forces must already be set.
Parameters
----------
damp : float
Value to use to damp the solution update.
useAitkenAcceleration : bool
Flag to use aitkenAcceleration. Only applicable for aerostructural problems
dampLB : float
Lower bound for the damping parameter used in Aitken acceleration.
loadScale : float
Value to scale external loads by. Only useful for load step approach on nonlinear problems.
"""
# Save old load scale and update with user-defined one
loadScale0 = self.staticProblem.getLoadScale()
self.staticProblem.setLoadScale(loadScale)
# Save initial state vector
u0 = self.temp1
self.staticProblem.getVariables(u0)
# Solve static problem w/o damping
successFlag = self.staticProblem.solve(Fext=self._Fext)
# Compute undamped update
self.staticProblem.getVariables(self.update)
self.update.axpy(-1.0, u0)
# Reset problem back to unsolved state
self.staticProblem.setVariables(u0)
# Apply Aitken Acceleration if necessary:
if useAitkenAcceleration:
if self.doDamp:
# Compute: temp0 = update - old_update
self.temp0.zeroEntries()
self.temp0.axpy(1.0, self.update)
self.temp0.axpy(-1.0, self.oldUpdate)
dnom = self.temp0.dot(self.temp0)
damp = damp * (1.0 - self.temp0.dot(self.update) / dnom)
# Clip to a reasonable range
damp = np.clip(damp, dampLB, 1.0)
self.doDamp = True
# Update State Variables
u1 = self.temp0
u1.copyValues(u0)
u1.axpy(damp, self.update)
self.staticProblem.setVariables(u1)
# Set the old update
self.oldUpdate.copyValues(self.update)
# Set load scale back
self.staticProblem.setLoadScale(loadScale0)
return damp
[docs]
@updateDVGeo
def evalFunctions(self, funcs, evalFuncs=None, ignoreMissing=False):
"""
Evaluate the current functions
Parameters
----------
funcs : dict
Dictionary into which the functions are save
evalFuncs : iterable object containing strings
The functions that the user wants evaluated
"""
if evalFuncs is None:
evalFuncs = self.evalFuncs
return self.staticProblem.evalFunctions(funcs, evalFuncs, ignoreMissing)
[docs]
@updateDVGeo
def evalConstraints(
self, fcon, evalCons=None, ignoreMissing=False, nonLinear=True, linear=False
):
"""
Evaluate values for constraints. The constraints corresponding to the strings in
evalCons are evaluated and updated into the provided
dictionary.
Parameters
----------
fcon : dict
Dictionary into which the constraints are saved.
evalCons : iterable object containing strings.
If not none, use these constraints to evaluate.
ignoreMissing : bool
Flag to supress checking for a valid constraint. Please use
this option with caution.
nonLinear : bool
Flag to include non-linear constraints.
linear : bool
Flag to include linear constraints.
"""
for constr in self.constraints:
if evalCons is None or constr.name in evalCons:
if (linear and constr.isLinear) or (nonLinear and not constr.isLinear):
constr.evalConstraints(fcon, evalCons, ignoreMissing)
[docs]
@updateDVGeo
def evalFunctionsSens(self, funcsSens, evalFuncs=None):
"""
Evaluate the sensitivity of the desired functions
Parameters
----------
funcsSens : dict
Dictionary into which the function sensitivities are saved
evalFuncs : iterable object containing strings
The functions that the user wants evaluated
"""
if evalFuncs is None:
evalFuncs = self.evalFuncs
self.staticProblem.evalFunctionsSens(
funcsSens, evalFuncs, includeXptSens=self._includeXptSens
)
for funcName in evalFuncs:
funcKey = f"{self.name}_{funcName}"
dvKey = self.staticProblem.getVarName()
if dvKey in funcsSens[funcKey]:
funcsSens[funcKey][dvKey] = self.comm.bcast(
funcsSens[funcKey][dvKey], root=0
)
# Compute the DVGeo sensitivities if requested
coordName = self.staticProblem.getCoordName()
if self.DVGeo is not None:
for funcName in evalFuncs:
funcKey = f"{self.name}_{funcName}"
if coordName in funcsSens[funcKey]:
dIdpt = funcsSens[funcKey].pop(coordName).reshape(-1, 3)
dIdx = self.DVGeo.totalSensitivity(
dIdpt, self.ptSetName, comm=self.comm, config=self.name
)
funcsSens[funcKey].update(dIdx)
[docs]
@updateDVGeo
def evalConstraintsSens(
self,
fconSens,
evalCons=None,
nonLinear=True,
linear=False,
):
"""
This is the main routine for returning useful (sensitivity)
information from constraint. The derivatives of the constraints
corresponding to the strings in evalCons are evaluated and
updated into the provided dictionary. The derivatives with
respect to all design variables and node locations are computed.
Parameters
----------
fconSens : dict
Dictionary into which the derivatives are saved.
evalCons : iterable object containing strings
The constraints the user wants returned
nonLinear : bool
Flag to include non-linear constraints.
linear : bool
Flag to include linear constraints.
"""
sens = {}
for constr in self.constraints:
if evalCons is None or constr.name in evalCons:
if (linear and constr.isLinear) or (nonLinear and not constr.isLinear):
constr.evalConstraintsSens(
sens, evalCons, includeXptSens=self._includeXptSens
)
for conKey in sens:
dvKey = self.staticProblem.getVarName()
if dvKey in sens[conKey]:
sens[conKey][dvKey] = self.comm.bcast(sens[conKey][dvKey], root=0)
# Compute the DVGeo sensitivities if requested
if self.DVGeo is not None:
coordName = self.staticProblem.getCoordName()
self.DVGeo.computeTotalJacobian(self.ptSetName, config=self.name)
for conKey in sens:
if coordName in sens[conKey]:
# Pop out the constraint sensitivities wrt TACS coords
dIdpt = sens[conKey].pop(coordName)
# Check if the constraint sensitivities wrt TACS coords are zero on all procs
total_nnz = self.comm.allreduce(dIdpt.nnz, op=MPI.SUM)
if total_nnz == 0:
# if so, skip DVGeo sensitivities
continue
# Get the Jacobian
Jacobian = self.DVGeo.JT[self.ptSetName]
# Compute the local Jacobian product
dIdx_local = dIdpt.dot(Jacobian.T)
# Add dvgeo contribution across all procs
dIdx = self.comm.allreduce(dIdx_local.toarray(), op=MPI.SUM)
# Convert to dict
dIdx_dict = self.DVGeo.convertSensitivityToDict(np.atleast_2d(dIdx))
# Update sensitivity dict with new DVGeo sensitivities
sens[conKey].update(dIdx_dict)
fconSens.update(sens)
[docs]
def getVarsPerNode(self):
"""
The number of degrees of freedom used at each output location.
Returns
-------
int
Number of degrees of freedom of each node in the domain.
"""
return self.staticProblem.getVarsPerNode()
@property
def _includeXptSens(self):
"""
Check if nodal sensitivities should be included (i.e. if DVGeo is set).
Returns
-------
bool
True if nodal sensitivities should be included, False otherwise.
"""
return self.DVGeo is not None
[docs]
def getNumNodes(self):
"""
Get the number of nodes on this processor.
Returns
-------
int
Number of nodes on this processor.
"""
return self.staticProblem.getNumOwnedNodes()
[docs]
def getOrigDesignVars(self):
"""
Get an array holding the original values for all design
variables added to TACS at time of initialization.
Returns
-------
numpy.ndarray
Array containing all design variable values across all processors.
"""
local_dvs = self.FEAAssembler.getOrigDesignVars()
all_local_dvs = self.comm.allgather(local_dvs)
global_dvs = np.concatenate(all_local_dvs)
return global_dvs.astype(float)
[docs]
def getDesignVarRange(self):
"""
Get arrays containing the lower and upper bounds for the design variables,
in the form needed by pyoptsparse.
Returns
-------
tuple of numpy.ndarray
Lower and upper bounds for the design variables.
"""
local_lb, local_ub = self.FEAAssembler.getDesignVarRange()
all_lb = self.comm.allgather(local_lb)
global_lbs = np.concatenate(all_lb)
all_ub = self.comm.allgather(local_ub)
global_ubs = np.concatenate(all_ub)
return global_lbs.astype(float), global_ubs.astype(float)
[docs]
def getDesignVarScales(self):
"""
Get an array containing the scaling factors for the design
variables, in the form needed by pyoptsparse.
method.
Returns
-------
numpy.ndarray
Scaling values for design variables.
"""
return np.array(self.FEAAssembler.scaleList)
[docs]
def getNumDesignVars(self):
"""
Get total number of structural design variables across all processors.
Returns
-------
int
Total number of structural design variables.
"""
return self.FEAAssembler.getTotalNumDesignVars()
[docs]
def getStates(self, states=None):
"""
Return the current state values for the problem.
Parameters
----------
states : tacs.TACS.Vec or numpy.ndarray, optional
Vector to place current state variables into.
Returns
-------
numpy.ndarray
Current state vector.
"""
return self.staticProblem.getVariables(states)
[docs]
def setStates(self, states=None):
"""
Set the current state values for the problem.
Parameters
----------
states : tacs.TACS.Vec or numpy.ndarray
Vector to replace current state variables with.
"""
self.staticProblem.setVariables(states)
[docs]
def getExternalForce(self, Fext=None):
"""
Return the current external coupling force vector for the problem.
Parameters
----------
Fext : tacs.TACS.Vec or numpy.ndarray, optional
Vector to place current external coupling force vector into.
Returns
-------
numpy.ndarray
Current external coupling force vector.
"""
if isinstance(Fext, tacs.TACS.Vec):
Fext.copyValues(self._Fext)
elif isinstance(Fext, np.ndarray):
Fext[:] = self._Fext[:]
return self._Fext.getArray().copy()
[docs]
def setExternalForce(self, Fext):
"""
Set the external coupling force vector for the problem.
Parameters
----------
Fext : tacs.TACS.Vec or numpy.ndarray
External coupling force vector to set.
"""
if isinstance(Fext, tacs.TACS.Vec):
self._Fext.copyValues(Fext)
elif isinstance(Fext, np.ndarray):
self._Fext.getArray()[:] = Fext[:]
[docs]
def getStateNorm(self):
"""
Get the norm of the current state vector.
Returns
-------
float
Norm of the current state vector.
"""
u = self.temp0
self.staticProblem.getVariables(u)
return u.norm()
[docs]
def getResidual(self):
"""
This routine is used to evaluate directly the structural
residual. Only typically used with aerostructural analysis.
Returns
-------
numpy.ndarray
Computed residuals for problem.
"""
res = self.temp0
self.staticProblem.getResidual(res, Fext=self._Fext)
resArray = res.getArray().copy()
self.temp0.zeroEntries()
return resArray
[docs]
def getResNorms(self):
"""
Return the initial, starting and final residual norms. Note that
the same norms are used for both solution and adjoint
computations.
Returns
-------
tuple of float
Initial, starting, and final residual norms.
"""
initNorm = np.real(self.staticProblem.initNorm)
startNorm = np.real(self.staticProblem.startNorm)
finalNorm = np.real(self.staticProblem.finalNorm)
return (initNorm, startNorm, finalNorm)
[docs]
def setResNorms(self, initNorm=None, startNorm=None, finalNorm=None):
"""
Set one of these norms if not None.
This is typically only used with aerostructural analysis.
Parameters
----------
initNorm : float, optional
Initial residual norm to set.
startNorm : float, optional
Starting residual norm to set.
finalNorm : float, optional
Final residual norm to set.
"""
if initNorm is not None:
self.staticProblem.initNorm = initNorm
if startNorm is not None:
self.staticProblem.startNorm = startNorm
if finalNorm is not None:
self.staticProblem.finalNorm = finalNorm
[docs]
def zeroVectors(self):
"""
Zero all the TACS b-vectors.
"""
self.staticProblem.zeroVariables()
self.update.zeroEntries()
@property
def evalFuncs(self):
"""
Get strings for added evaluation functions.
Returns
-------
list of str
List of function names that can be evaluated.
"""
return list(self.staticProblem.getFunctionKeys())
[docs]
def getAdjoint(self, func):
"""
Return the adjoint values for func if they exist. Otherwise just return zeros.
Parameters
----------
func : str
Name of the function.
Returns
-------
numpy.ndarray
Adjoint values for the specified function.
"""
if func in self.adjoints:
vals = self.adjoints[func].getArray().copy()
else:
vals = self.FEAAssembler.createVec()
return vals
[docs]
def setAdjoint(self, adjoint, func=None):
"""
Set externally supplied adjoint values.
Parameters
----------
adjoint : float, numpy array
An array of size getStateSize() to
be copied to the structural adjoint variables
func : str
Name of function to set adjoint for
"""
self.phi[:] = adjoint
if func is not None:
if func not in self.adjoints:
self.adjoints[func] = self.FEAAssembler.createVec(asBVec=True)
self.adjoints[func].getArray()[:] = adjoint[:]
[docs]
def setAdjointRHS(self, func):
"""
Set the adjoint right-hand side for given function.
Parameters
----------
func : str
Name of the function to set adjoint RHS for.
"""
self._dIdu.zeroEntries()
if func in self.staticProblem.functionList:
self.staticProblem.addSVSens([func], [self._dIdu])
[docs]
def getdSduVec(self, inVec):
"""
Evaluate the direct residual.
Parameters
----------
inVec : numpy.ndarray
Input vector.
Returns
-------
numpy.ndarray
Direct residual vector.
"""
self.phi[:] = inVec
self.staticProblem.K.mult(self._phi, self.temp0)
outVec = self.temp0.getArray().copy()
self.temp0.zeroEntries()
return outVec
[docs]
def getdSduTVec(self, inVec):
"""
Evaluate the adjoint residual.
Parameters
----------
inVec : numpy.ndarray
Input vector.
Returns
-------
numpy.ndarray
Adjoint residual vector.
"""
self.phi[:] = inVec
self.temp0.zeroEntries()
self.staticProblem.addJacVecProduct(
self._phi, self.temp0, scale=1.0, transpose=True
)
self.FEAAssembler.applyBCsToVec(self.temp0)
outVec = self.temp0.getArray().copy()
self.temp0.zeroEntries()
return outVec
[docs]
def getdIdXpt(self, func):
"""
Get the (partial) derivative of the structural function
with respect to all structural nodes for the current
structProblem.
Parameters
----------
func : str
Name of the function to compute derivatives for.
Returns
-------
tacs.TACS.Vec
Derivative vector with respect to structural nodes.
"""
dIdXpt = self.FEAAssembler.createNodeVec()
# Check to see if func is actually a valid TACS function
if func in self.evalFuncs:
self.staticProblem.addXptSens([func], [dIdXpt], scale=1.0)
return dIdXpt
[docs]
def getdIdXdv(self, func):
"""
Get the (partial) derivative of the structural function
with respect to all structural design variables.
Parameters
----------
func : str
Name of the function to compute derivatives for.
Returns
-------
tacs.TACS.Vec
Derivative vector with respect to structural design variables.
"""
dIdXdv = self.FEAAssembler.createDesignVec()
# Check to see if func is actually a valid TACS function
if func in self.evalFuncs:
self.staticProblem.addDVSens([func], [dIdXdv], scale=1.0)
dvVal = self.comm.bcast(dIdXdv, root=0)
return dvVal
[docs]
def getdIduTransProd(self, vecT, evalFuncs=None):
"""
Perform the transpose matvec of the getdIduProd function and
return a TACS vector. The result is stored locally.
We assume that evalFuncs is valid.
Parameters
----------
vecT : dict
Dictionary containing transpose vector values.
evalFuncs : list of str, optional
List of function names to evaluate.
Returns
-------
numpy.ndarray
Transpose matrix-vector product result.
"""
# The old way (for testing purposes)
self._matVecRHS.zeroEntries()
svList = [self.FEAAssembler.createVec() for f in evalFuncs]
self.staticProblem.addSVSens(evalFuncs, svList)
for i, f in enumerate(evalFuncs):
f_mangled = self.name + "_%s" % f
self._matVecRHS.axpy(vecT[f_mangled][0], svList[i])
return self.matVecRHS.copy()
[docs]
def getdIdXdvTransProd(self, vecT, evalFuncs=None):
"""
Perform the transpose matvec of the getdIdXdvProd function and
return a vector.
We assume that structProblem is set and that handles is valid.
Note that handles contains the list of function handles, whereas the
names are contained in evalFuncs.
Parameters
----------
vecT : dict
Dictionary containing transpose vector values.
evalFuncs : list of str, optional
List of function names to evaluate.
Returns
-------
dict
Dictionary containing transpose matrix-vector product results.
"""
# # The old way (for testing purposes)
prodDV = self.FEAAssembler.createDesignVec(asBVec=True)
for f in evalFuncs:
f_mangled = self.name + "_%s" % f
self.staticProblem.addDVSens([f], [prodDV], scale=vecT[f_mangled][0])
# Convert result back into a dictionary
prodDict = self.convertDesignVecToDict(prodDV.getArray())
if self.DVGeo is not None:
prodXpt = self.FEAAssembler.createNodeVec(asBVec=True)
for f in evalFuncs:
f_mangled = self.name + "_%s" % f
self.staticProblem.addXptSens([f], [prodXpt], scale=vecT[f_mangled][0])
xArray = prodXpt.getArray()
xdot = self.DVGeo.totalSensitivity(
xArray.reshape(-1, 3), self.ptSetName, comm=self.comm, config=self.name
)
prodDict.update(xdot)
return prodDict
[docs]
def globalNKPreCon(self, inVec):
"""
This function is ONLY used as a preconditioner to the
global Aero-Structural system.
Parameters
----------
inVec : numpy.ndarray
Input vector for preconditioning.
Returns
-------
numpy.ndarray
Preconditioned output vector.
"""
# Place the in_vec into the residual vector
res = self.temp0
update = self.temp1
res.getArray()[:] = inVec
# Solve
self.staticProblem.linearSolver.solve(res, update)
outVec = update.getArray()
# Restore Pointers
res.zeroEntries()
update.zeroEntries()
return outVec
[docs]
def globalAdjointPreCon(self, inVec):
"""
This function is ONLY used as a preconditioner to the
global Aero-Structural adjoint system. For the linear
symmetric case this is actually identical to the NKPreCon
function above.
Parameters
----------
inVec : numpy.ndarray
Input vector for preconditioning.
Returns
-------
numpy.ndarray
Preconditioned output vector.
"""
# Place the in_vec into the residual vector
self.temp0.getArray()[:] = inVec[:]
self.FEAAssembler.applyBCsToVec(self.temp0)
# Apply the preconditioner ONLY.
self.staticProblem.PC.applyFactor(self.temp0, self.temp1)
self.FEAAssembler.applyBCsToVec(self.temp1)
outVec = self.temp1.getArray().copy()
# Zero values
self.temp0.zeroEntries()
self.temp1.zeroEntries()
return outVec
[docs]
def globalDirectPreCon(self, inVec):
"""
This function is ONLY used as a preconditioner to the
global Aero-Structural direct system. For the linear
symmetric case this is actually identical to the NKPreCon
function above.
Parameters
----------
inVec : numpy.ndarray
Input vector for preconditioning.
Returns
-------
numpy.ndarray
Preconditioned output vector.
"""
return self.globalAdjointPreCon(inVec)
[docs]
def assembleAdjointRHS(self):
"""
Compute the final adjoint RHS:
Adjoint RHS should be: dIdu - dAdu^T*psi - dSdu^T*phi
"""
# Set RHS to dIdu
self._adjRHS.copyValues(self._dIdu)
# Subtract dSdu if non-zero
self._adjRHS.axpy(-1.0, self._dSdu)
# Subtract pLoad
self._adjRHS.axpy(-1.0, self._pLoad)
self.staticProblem.initNorm = np.real(self._adjRHS.norm())
self.oldUpdate.zeroEntries()
[docs]
def solveAdjoint(self, damp=1.0):
"""
Solve the structural adjoint.
Parameters
----------
damp : float
A damping variable for adjoint update. Typically only used
in multidisciplinary analysis
"""
res = self.temp0
res.zeroEntries()
self.FEAAssembler.applyBCsToVec(self._adjRHS)
# First compute the residual
self.staticProblem.addJacVecProduct(self._phi, res, transpose=True)
res.axpy(-1.0, self._adjRHS) # Add the -RHS
# Starting Norm for this computation
self.staticProblem.startNorm = np.real(res.norm())
# Solve Linear System
self.update.zeroEntries()
self.staticProblem.solveAdjoint(res, self.update)
self.FEAAssembler.applyBCsToVec(self.update)
# Update the adjoint vector with the (damped) update
self._phi.axpy(-damp, self.update)
# Compute actual final FEA Norm
res.zeroEntries()
self.staticProblem.addJacVecProduct(self._phi, res, transpose=True)
res.axpy(-1.0, self._adjRHS) # Add the RHS
self.staticProblem.finalNorm = np.real(res.norm())
[docs]
def getdRdXptPhi(self, funcs):
r"""
Get the result of :math:`[dR/dX_{nodes}]^T \phi` for each of the functions in the function list.
This is the total sensitivity calculation.
Parameters
----------
funcs : list of str
List of function names.
Returns
-------
list of tacs.TACS.Vec
List of sensitivity products for each function.
"""
products = [self.FEAAssembler.createNodeVec() for obj in funcs]
phiList = [self.getAdjoint(obj) for obj in funcs]
self.staticProblem.addAdjointResXptSensProducts(phiList, products, scale=1.0)
return products
[docs]
def getdRdXdvPhi(self, funcs):
r"""
Get the result of :math:`[dR/dX_{dv}]^T \phi` for the total sensitivity calculation.
Parameters
----------
funcs : list of str
List of function names.
Returns
-------
list of tacs.TACS.Vec
List of sensitivity products for each function.
"""
products = [self.FEAAssembler.createDesignVec() for obj in funcs]
phiList = [self.getAdjoint(obj) for obj in funcs]
self.staticProblem.addAdjointResProducts(phiList, products, scale=1.0)
dvVals = [self.comm.bcast(dvVec, root=0) for dvVec in products]
return dvVals
[docs]
def getdRdXdvTransProd(self):
"""
Perform the transpose matvec of getdRdXdvProd. The result is
returned as a dict of numpy arrays.
Returns
-------
dict
Dictionary containing transpose matrix-vector product results.
"""
dvProd = self.FEAAssembler.createDesignVec(asBVec=True)
self.staticProblem.addAdjointResProducts(
[self._matVecSolve], [dvProd], scale=1.0
)
# Convert result back into a dictionary
prodDict = self.convertDesignVecToDict(dvProd.getArray())
if self.DVGeo is not None:
xptProd = self.FEAAssembler.createNodeVec(asBVec=True)
self.staticProblem.addAdjointResXptSensProducts(
[self._matVecSolve], [xptProd], scale=1.0
)
xArray = xptProd.getArray()
xdot = self.DVGeo.totalSensitivity(
xArray.reshape(-1, 3), self.ptSetName, comm=self.comm, config=self.name
)
prodDict.update(xdot)
return prodDict
[docs]
def convertDesignVecToDict(self, dvVec):
"""
Convert a design vector to a dictionary format.
Parameters
----------
dvVec : tacs.TACS.Vec or numpy.ndarray
Design vector to convert.
Returns
-------
dict
Dictionary containing the design vector with variable name as key.
"""
if isinstance(dvVec, tacs.TACS.Vec):
dvVec = dvVec.getArray()
dvVals = self.comm.bcast(dvVec, root=0)
dvDict = {self.staticProblem.getVarName(): dvVals}
return dvDict
[docs]
def writeSolution(self, outputDir=None, baseName=None, number=None):
"""
This is a generic shell function that writes the output
file(s). The intent is that the user or calling program can
call this function and pyTACS writes all the files that the
user has defined. It is recommended that this function is used
along with the associated logical flags in the options to
determine the desired writing procedure
Parameters
----------
outputDir : str or None
Use the supplied output directory
baseName : str or None
Use this supplied string for the base filename. Typically
only used from an external solver.
number : int or None
Use the user supplied number to index solution. Again, only
typically used from an external solver
"""
self.staticProblem.writeSolution(outputDir, baseName, number)
[docs]
def writeExternalForceFile(self, outputDir=None, baseName=None, number=None):
"""
This function writes the external coupling loads stored in Fext to a file.
This is typically used to save aero loads from an aerostructural run.
Parameters
----------
outputDir : str, optional
Output directory for the force file.
baseName : str, optional
Base name for the force file. If None, uses problem name + "_external_forces".
number : int, optional
Number to append to the filename.
"""
if baseName is None:
baseName = self.name + "_external_forces"
# Figure out the output file base name
fileName = (
self.staticProblem.getOutputFileName(outputDir, baseName, number) + ".dat"
)
# We want to isolate only the external aero loads before writing the loads out
F = self.staticProblem.F
# Save aux element loads (gravity, pressure, etc.)
aux = self.staticProblem.auxElems
# Save a copy of the F vector holding the full loads
self.temp0.copyValues(F)
# Zero out the loads
self.staticProblem.zeroLoads()
# Get forces
self.staticProblem.getForces(
self.staticProblem.externalForce,
self.staticProblem.internalForce,
Fext=self._Fext,
)
# Write external aero loads to bdf
self.staticProblem.writeLoadToBDF(fileName, loadCaseID=0)
# Reset F back to full loads
F.copyValues(self.temp0)
# Restore aux element loads
self.staticProblem.auxElems = aux
# Reset external/internal forces
self.staticProblem.getForces(
self.staticProblem.externalForce,
self.staticProblem.internalForce,
Fext=self._Fext,
)
[docs]
def readExternalForceFile(self, fileName):
"""
Reads in a force file and sets the external forces in the structural problem.
This is typically used to read in loads saved from an aerostructural run.
Parameters
----------
fileName : str
Filename for force file. Should have .dat or .bdf extension.
"""
forceInfo = pn.bdf.read_bdf(
fileName, validate=False, xref=False, debug=False, punch=True
)
bdfInfo = self.staticProblem.bdfInfo
# Step 1: Store original loads from bdfInfo
originalLoads = copy.deepcopy(bdfInfo.loads)
originalLoadCombinations = copy.deepcopy(bdfInfo.load_combinations)
# Step 2: Overwrite bdfInfo loads with forceInfo loads
bdfInfo.loads = copy.deepcopy(forceInfo.loads)
bdfInfo.load_combinations = copy.deepcopy(forceInfo.load_combinations)
# Create a copy of the internal loads already added to model
F = self.staticProblem.F
self.temp0.copyValues(F)
# Zero out the loads
F.zeroEntries()
# Read the external loads
self.staticProblem.addLoadFromBDF(0)
# Copy new loads to ext load vector
self._Fext.copyValues(F)
# Set internal loads back to previous values
F.copyValues(self.temp0)
# Step 3: Restore original loads back into bdfInfo
bdfInfo.loads = originalLoads
bdfInfo.load_combinations = originalLoadCombinations