# Source code for daetools.code_generators.cxx_mpi

"""
***********************************************************************************
                            cxx_mpi.py
                DAE Tools: pyDAE module, www.daetools.com
                Copyright (C) Dragan Nikolic
***********************************************************************************
DAE Tools is free software; you can redistribute it and/or modify it under the
terms of the GNU General Public License version 3 as published by the Free Software
Foundation. DAE Tools is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with the
DAE Tools software; if not, see <http://www.gnu.org/licenses/>.
************************************************************************************
"""
import os, shutil, sys, numpy, math, traceback, pprint, struct
from daetools.pyDAE import *
from .formatter import daeExpressionFormatter
from .analyzer import daeCodeGeneratorAnalyzer
from .code_generator import daeCodeGenerator

class daeExpressionFormatter_cxx_mpi(daeExpressionFormatter):
    """Expression formatter that renders daetools runtime expression trees as C++
    source text for the MPI code generator.

    Every attribute assigned in __init__ is a format-string template consumed by
    the base class (daeExpressionFormatter) while it walks an expression tree;
    the {placeholders} are filled in by the base class. The emitted text targets
    the generated C++ model code (``_m_`` is the model struct, ``_v_``/``_dv_``
    are the value/derivative accessors).
    """
    def __init__(self):
        daeExpressionFormatter.__init__(self)
        # Block/overall indexes in the generated code start at 0 (C/C++ arrays).
        self.indexBase                              = 0
        self.useFlattenedNamesForAssignedVariables  = True
        self.IDs                                    = {}
        self.indexMap                               = {}

        # Use relative names
        self.useRelativeNames         = True
        self.flattenIdentifiers       = True

        # Domain points are read from the model struct and wrapped in adouble.
        self.domain                   = 'adouble(_m_->{domain}[{index}])'

        # Parameters are plain C arrays on the model struct, e.g. p[i][j].
        self.parameter                = 'adouble(_m_->{parameter}{indexes})'
        self.parameterIndexStart      = '['
        self.parameterIndexEnd        = ']'
        self.parameterIndexDelimiter  = ']['

        # Variables/derivatives are accessed through helpers by block index,
        # so no index brackets/delimiters are emitted around them.
        self.variable                 = '_v_({blockIndex})'
        self.variableIndexStart       = ''
        self.variableIndexEnd         = ''
        self.variableIndexDelimiter   = ''

        self.assignedVariable         = 'adouble(_m_->{variable})'
        
        self.feMatrixItem             = 'adouble({value})'
        self.feVectorItem             = 'adouble({value})'

        self.derivative               = '_dv_({blockIndex})'
        self.derivativeIndexStart     = ''
        self.derivativeIndexEnd       = ''
        self.derivativeIndexDelimiter = ''

        # Constants
        self.constant = 'adouble({value})'
        
        # External functions
        self.scalarExternalFunction = 'modCalculateScalarExtFunction("{name}", _m_, _current_time_, _values_, _time_derivatives_)'
        # NOTE(review): vector external functions are emitted as a zero constant —
        # presumably unsupported by this backend; verify against the C++ runtime.
        self.vectorExternalFunction = 'adouble(0.0, 0.0)'

        # Logical operators
        self.AND   = '({leftValue} && {rightValue})'
        self.OR    = '({leftValue} || {rightValue})'
        self.NOT   = '(! {value})'

        self.EQ    = '({leftValue} == {rightValue})'
        self.NEQ   = '({leftValue} != {rightValue})'
        self.LT    = '({leftValue} < {rightValue})'
        self.LTEQ  = '({leftValue} <= {rightValue})'
        self.GT    = '({leftValue} > {rightValue})'
        self.GTEQ  = '({leftValue} >= {rightValue})'

        # Mathematical operators
        self.SIGN   = '(-{value})'

        self.PLUS   = '({leftValue} + {rightValue})'
        self.MINUS  = '({leftValue} - {rightValue})'
        self.MULTI  = '({leftValue} * {rightValue})'
        self.DIVIDE = '({leftValue} / {rightValue})'
        # NOTE(review): '^' is XOR on plain C++ types; presumably the adouble
        # class overloads it as power in the generated runtime — confirm there.
        self.POWER  = '({leftValue} ^ {rightValue})'

        # Mathematical functions (the trailing underscore names are the
        # adouble overloads provided by the generated C++ runtime).
        self.SIN    = 'sin_({value})'
        self.COS    = 'cos_({value})'
        self.TAN    = 'tan_({value})'
        self.ASIN   = 'asin_({value})'
        self.ACOS   = 'acos_({value})'
        self.ATAN   = 'atan_({value})'
        self.EXP    = 'exp_({value})'
        self.SQRT   = 'sqrt_({value})'
        self.LOG    = 'log_({value})'
        self.LOG10  = 'log10_({value})'
        self.FLOOR  = 'floor_({value})'
        self.CEIL   = 'ceil_({value})'
        self.ABS    = 'abs_({value})'
        self.SINH   = 'sinh_({value})'
        self.COSH   = 'cosh_({value})'
        self.TANH   = 'tanh_({value})'
        self.ASINH  = 'asinh_({value})'
        self.ACOSH  = 'acosh_({value})'
        self.ATANH  = 'atanh_({value})'
        self.ERF    = 'erf_({value})'

        self.MIN     = 'min_({leftValue}, {rightValue})'
        self.MAX     = 'max_({leftValue}, {rightValue})'
        self.ARCTAN2 = 'atan2_({leftValue}, {rightValue})'

        # Current time in simulation
        self.TIME   = '_time_'

    def formatNumpyArray(self, arr):
        """Recursively format a (possibly nested) numpy array/list as a C++
        brace-enclosed initializer list, e.g. ``{1, 2, {3, 4}}``."""
        if isinstance(arr, (numpy.ndarray, list)):
            return '{' + ', '.join([self.formatNumpyArray(val) for val in arr]) + '}'
        else:
            return str(arr)

    def formatQuantity(self, quantity):
        # Formats constants/quantities in equations that have a value and units
        # (units are discarded; only the numeric value is emitted).
        return str(quantity.value)
     
class daeCodeGenerator_cxx_mpi(daeCodeGenerator):
    """
    C++ (MPI) code generator.

    Limitations:
     - DOFs in models are not supported (for the block indexes are not set uniformly)
     - STNs are not supported
    """
    def __init__(self):
        # General generator state.
        self.wrapperInstanceName     = ''
        # NOTE(review): source was whitespace-mangled; the indent string may
        # originally have been four spaces — confirm against upstream daetools.
        self.defaultIndent           = ' '
        self.warnings                = []
        self.topLevelModel           = None
        self.simulation              = None
        self.equationGenerationMode  = ''

        # Collected code fragments / runtime data, filled during generation.
        self.assignedVariablesDefs   = []
        self.assignedVariablesInits  = []
        self.initialConditions       = []
        self.stnDefs                 = []
        self.initiallyActiveStates   = []
        self.runtimeInformation_h    = []
        self.runtimeInformation_init = []
        self.parametersDefs          = []
        self.parametersInits         = []
        self.intValuesReferences     = []
        self.floatValuesReferences   = []
        self.stringValuesReferences  = []
        self.residuals               = []
        self.jacobians               = []
        self.checkForDiscontinuities = []
        self.executeActions          = []
        self.numberOfRoots           = []
        self.rootFunctions           = []
        self.variableNames           = []
        self.fmiInterface            = []

        # Helpers shared with the other daetools code generators.
        self.exprFormatter           = daeExpressionFormatter_cxx_mpi()
        self.analyzer                = daeCodeGeneratorAnalyzer()

        # Per-node binary payloads written to disk by generateSimulation().
        self.mpi_sync_buffers        = {}
        self.rt_data_buffers         = {}
        self.compute_stack_files     = {}

        # MPI partitioning state.
        self.Nnodes                      = 0
        self.Neq_per_node                = 0
        self.mpi_synchronise             = ''
        self.mpi_node_block_indexes_map  = {}
[docs] def generateSimulation(self, simulation, directory, Nnodes): if not simulation: raise RuntimeError('Invalid simulation object') if not os.path.isdir(directory): os.makedirs(directory) self.assignedVariablesDefs = [] self.assignedVariablesInits = [] self.initialConditions = [] self.stnDefs = [] self.initiallyActiveStates = [] self.runtimeInformation_h = [] self.runtimeInformation_init = [] self.parametersDefs = [] self.parametersInits = [] self.intValuesReferences = [] self.floatValuesReferences = [] self.stringValuesReferences = [] self.residuals = [] self.jacobians = [] self.checkForDiscontinuities = [] self.executeActions = [] self.numberOfRoots = [] self.rootFunctions = [] self.variableNames = [] self.warnings = [] self.simulation = simulation self.topLevelModel = simulation.m self.mpi_sync_buffers = {} self.rt_data_buffers = {} self.compute_stack_files = {} # Achtung, Achtung!! # wrapperInstanceName and exprFormatter.modelCanonicalName should not be stripped # of illegal characters, since they are used to get relative names self.wrapperInstanceName = simulation.m.Name self.exprFormatter.modelCanonicalName = simulation.m.Name indent = 1 s_indent = indent * self.defaultIndent self.analyzer.analyzeSimulation(simulation) self.exprFormatter.IDs = self.analyzer.runtimeInformation['IDs'] self.exprFormatter.indexMap = self.analyzer.runtimeInformation['IndexMappings'] #import pprint #pp = pprint.PrettyPrinter(indent=2) #pp.pprint(self.analyzer.runtimeInformation) # MPI self.mpi_synchronise = '' self.mpi_node_block_indexes_map = {} Neq = self.analyzer.runtimeInformation['NumberOfEquations'] self.Nnodes = Nnodes self.Neq_per_node = int(Neq / self.Nnodes) #+ 1 for node in range(self.Nnodes): i_start = self.Neq_per_node*node i_end = self.Neq_per_node*(node+1) if i_end > Neq: i_end = Neq # block indexes, owned block indexes range self.mpi_node_block_indexes_map[node] = (set(), (i_start, i_end)) self._generateRuntimeInformation(self.analyzer.runtimeInformation) # MPI 
self._generateMPICommunication() cxx_dir = os.path.join(os.path.dirname(__file__), 'cxx') path, dirName = os.path.split(directory) for ni, buff in self.mpi_sync_buffers.items(): filename = os.path.join(directory, 'interprocess_comm_data-%05d.bin' % ni) f = open(filename, 'wb') f.write(buff) f.close() for ni, buff in self.rt_data_buffers.items(): filename = os.path.join(directory, 'runtime_data-%05d.bin' % ni) f = open(filename, 'wb') f.write(buff) f.close() for ni, (i_start, i_end, bi_to_bi_local) in self.compute_stack_files.items(): filenameCS = os.path.join(directory, 'model_equations-%05d.bin' % ni) filenameJI = os.path.join(directory, 'preconditioner_data-%05d.bin' % ni) self.simulation.ExportComputeStackStructs(filenameCS, filenameJI, i_start, i_end, bi_to_bi_local) """ shutil.copy2(os.path.join(cxx_dir, 'main.cpp'), os.path.join(directory, 'main.cpp')) shutil.copy2(os.path.join(cxx_dir, 'typedefs.h'), os.path.join(directory, 'typedefs.h')) shutil.copy2(os.path.join(cxx_dir, 'daesolver.h'), os.path.join(directory, 'daesolver.h')) shutil.copy2(os.path.join(cxx_dir, 'daesolver.cpp'), os.path.join(directory, 'daesolver.cpp')) shutil.copy2(os.path.join(cxx_dir, 'simulation.h'), os.path.join(directory, 'simulation.h')) shutil.copy2(os.path.join(cxx_dir, 'simulation.cpp'), os.path.join(directory, 'simulation.cpp')) shutil.copy2(os.path.join(cxx_dir, 'auxiliary.h'), os.path.join(directory, 'auxiliary.h')) shutil.copy2(os.path.join(cxx_dir, 'auxiliary.cpp'), os.path.join(directory, 'auxiliary.cpp')) shutil.copy2(os.path.join(cxx_dir, 'Makefile-gcc'), os.path.join(directory, 'Makefile')) shutil.copy2(os.path.join(cxx_dir, 'daetools-simulation.vcxproj'), os.path.join(directory, 'daetools-simulation.vcxproj')) shutil.copy2(os.path.join(cxx_dir, 'qt_project.pro'), os.path.join(directory, 'daetools-simulation.pro')) shutil.copy2(os.path.join(cxx_dir, 'lasolver.h'), os.path.join(directory, 'lasolver.h')) shutil.copy2(os.path.join(cxx_dir, 'lasolver.cpp'), 
os.path.join(directory, 'lasolver.cpp')) shutil.copy2(os.path.join(cxx_dir, 'compute_stack_openmp.cpp'), os.path.join(directory, 'compute_stack_openmp.cpp')) shutil.copy2(os.path.join(cxx_dir, 'compute_stack_openmp.h'), os.path.join(directory, 'compute_stack_openmp.h')) shutil.copy2(os.path.join(cxx_dir, 'compute_stack.h'), os.path.join(directory, 'compute_stack.h')) shutil.copy2(os.path.join(cxx_dir, 'model.h'), os.path.join(directory, 'model.h')) shutil.copy2(os.path.join(cxx_dir, 'model.cpp'), os.path.join(directory, 'model.cpp')) shutil.copy2(os.path.join(cxx_dir, 'runtime_information.h'), os.path.join(directory, 'runtime_information.h')) """ if len(self.warnings) > 0: print('CODE GENERATOR WARNINGS:') print(warnings)
# MPI def _generateMPICommunication(self): #pp = pprint.PrettyPrinter(indent=2) #pp.pprint(self.mpi_node_block_indexes_map) mpi_sync_map = {} for node_rank, node_data in self.mpi_node_block_indexes_map.items(): block_indexes = node_data[0] i_start = node_data[1][0] i_end = node_data[1][1] all_indexes = block_indexes # this is set! owned_indexes = [] foreign_indexes = [] bi_to_bi_local = {} to_send_indexes = {} to_receive_indexes = {} mpi_sync_map[node_rank] = { 'all_indexes' : all_indexes, 'i_start' : i_start, 'i_end' : i_end, 'owned_indexes' : owned_indexes, 'foreign_indexes' : foreign_indexes, 'bi_to_bi_local' : bi_to_bi_local, 'send_to' : to_send_indexes, 'receive_from' : to_receive_indexes} for bi in sorted(all_indexes): if bi >= i_start and bi < i_end: owned_indexes.append(bi) bi_to_bi_local[bi] = bi-i_start else: foreign_indexes.append(bi) bi_to_bi_local[bi] = (i_end-i_start) + len(foreign_indexes) - 1 #print bi_to_bi_local for node_rank, node_data in mpi_sync_map.items(): for bi in node_data['foreign_indexes']: owner_rank = int(bi / self.Neq_per_node) owner_map = mpi_sync_map[owner_rank]['send_to'] if node_rank in owner_map: owner_map[node_rank].append(bi) else: owner_map[node_rank] = [bi] this_map = mpi_sync_map[node_rank]['receive_from'] if owner_rank in this_map: this_map[owner_rank].append(bi) else: this_map[owner_rank] = [bi] foundMissing = False for ni, n_data in mpi_sync_map.items(): missing = [bi for bi in range(n_data['i_start'], n_data['i_end']) if not (bi in n_data['owned_indexes'])] if missing: print('Node [%d] does not contain indexes: %s' % (ni, missing)) foundMissing = True if foundMissing: raise RuntimeError('Missing block indexes found') # This is the point where mpi_sync_map is complete # and sync. data files can be saved. 
def packArray(arr): buff = struct.pack('i', len(arr)) fmt = '%di' % len(arr) buff += struct.pack(fmt, *arr) return buff for ni, n_data in mpi_sync_map.items(): buff = bytes() buff += struct.pack('i', n_data['i_start']) buff += struct.pack('i', n_data['i_end']) buff += packArray(n_data['foreign_indexes']) dict_items = n_data['bi_to_bi_local'].items() buff += struct.pack('i', len(dict_items)) for bi,bi_local in dict_items: buff += struct.pack('ii', bi, bi_local) dict_items = n_data['send_to'].items() buff += struct.pack('i', len(dict_items)) for sti, st_data in dict_items: # Node index buff += struct.pack('i', sti) # Length and array of indexes buff += packArray(st_data) dict_items = n_data['receive_from'].items() buff += struct.pack('i', len(dict_items)) for rfi, rf_data in dict_items: # Node index buff += struct.pack('i', rfi) # Length and array of indexes buff += packArray(rf_data) self.compute_stack_files[ni] = (n_data['i_start'], n_data['i_end'], n_data['bi_to_bi_local']) self.mpi_sync_buffers[ni] = buff """ Working printout!!! 
for ni, n_data in mpi_sync_map.items(): print('[Node %d]:' % ni) print(' index_range: [%d - %d)' % (n_data['i_start'], n_data['i_end'])) print(' all_indexes:') print(' %s' % sorted(n_data['all_indexes'])) print(' owned_indexes:') print(' %s' % n_data['owned_indexes']) print(' foreign_indexes:') print(' %s' % n_data['foreign_indexes']) print(' send_to:') for sti, st_data in n_data['send_to'].items(): print(' %d: %s' % (sti, st_data)) print(' receive_from:') for rfi, rf_data in n_data['receive_from'].items(): print(' %d: %s' % (rfi, rf_data)) """ def _generateRuntimeInformation(self, runtimeInformation): Ntotal = runtimeInformation['TotalNumberOfVariables'] Neq = runtimeInformation['NumberOfEquations'] IDs = runtimeInformation['IDs'] dofs = runtimeInformation['DOFs'] initValues = runtimeInformation['Values'] initDerivatives = runtimeInformation['TimeDerivatives'] indexMappings = runtimeInformation['IndexMappings'] absoluteTolerances = runtimeInformation['AbsoluteTolerances'] self.variableNames = Neq * [''] blockIDs = Neq * [-1] blockInitValues = Neq * [-1] absTolerances = Neq * [1E-5] blockInitDerivatives = Neq * [0.0] for oi, bi in list(indexMappings.items()): if IDs[oi] == cnAlgebraic: blockIDs[bi] = cnAlgebraic elif IDs[oi] == cnDifferential: blockIDs[bi] = cnDifferential if IDs[oi] != cnAssigned: blockInitValues[bi] = initValues[oi] blockInitDerivatives[bi] = initDerivatives[oi] absTolerances[bi] = absoluteTolerances[oi] for variable in runtimeInformation['Variables']: relativeName = daeGetRelativeName(self.wrapperInstanceName, variable['CanonicalName']) formattedName = self.exprFormatter.formatIdentifier(relativeName) name = self.exprFormatter.flattenIdentifier(formattedName) numberOfPoints = variable['NumberOfPoints'] units = ('-' if (variable['Units'] == unit()) else str(variable['Units'])) if numberOfPoints == 1: ID = int(variable['IDs']) # cnDifferential, cnAssigned or cnAlgebraic value = float(variable['Values']) # numpy float overallIndex = 
variable['OverallIndex'] fullName = relativeName blockIndex = None nodeIndex = None if ID != cnAssigned: blockIndex = indexMappings[overallIndex] + self.exprFormatter.indexBase nodeIndex = blockIndex / self.Nnodes self.variableNames[blockIndex] = fullName else: domainsIndexesMap = variable['DomainsIndexesMap'] for i in range(0, numberOfPoints): domIndexes = tuple(domainsIndexesMap[i]) # list of integers ID = int(variable['IDs'][domIndexes]) # cnDifferential, cnAssigned or cnAlgebraic value = float(variable['Values'][domIndexes]) # numpy float overallIndex = variable['OverallIndex'] + i fullName = relativeName + '(' + ','.join(str(di) for di in domIndexes) + ')' blockIndex = None if ID != cnAssigned: blockIndex = indexMappings[overallIndex] + self.exprFormatter.indexBase self.variableNames[blockIndex] = fullName indent = 1 s_indent = indent * self.defaultIndent # This is the point where mpi_node_block_indexes_map is complete # and sync. data files can be saved. def packIntegerArray(arr): buff = struct.pack('i', len(arr)) fmt = '%di' % len(arr) buff += struct.pack(fmt, *arr) return buff def packDoubleArray(arr): buff = struct.pack('i', len(arr)) fmt = '%dd' % len(arr) buff += struct.pack(fmt, *arr) return buff def packStringArray(arr): buff = struct.pack('i', len(arr)) for item in arr: bytes_item = str.encode(item, encoding = 'utf-8') # or ascii buff += struct.pack('i', len(bytes_item)) fmt = '%ds' % len(bytes_item) buff += struct.pack(fmt, bytes_item) return buff for node_rank, node_data in self.mpi_node_block_indexes_map.items(): i_start = node_data[1][0] i_end = node_data[1][1] init_values = blockInitValues[i_start:i_end] init_derivatives = blockInitDerivatives[i_start:i_end] absolute_tolerances = absTolerances[i_start:i_end] ids = blockIDs[i_start:i_end] variable_names = self.variableNames[i_start:i_end] #print(variable_names) buff = bytes() buff += struct.pack('i', i_start) buff += struct.pack('i', i_end) buff += struct.pack('i', Ntotal) buff += struct.pack('i', 
Neq) buff += struct.pack('i', i_end - i_start) buff += struct.pack('i', len(dofs)) buff += struct.pack('d', 0.0) buff += struct.pack('d', runtimeInformation['TimeHorizon']) buff += struct.pack('d', runtimeInformation['ReportingInterval']) buff += struct.pack('d', runtimeInformation['RelativeTolerance']) buff += struct.pack('?', runtimeInformation['QuasiSteadyState']) buff += packDoubleArray(dofs) buff += packDoubleArray(init_values) buff += packDoubleArray(init_derivatives) buff += packDoubleArray(absolute_tolerances) buff += packIntegerArray(ids) buff += packStringArray(variable_names) self.rt_data_buffers[node_rank] = buff # First generate residuals for equations and port connections self.equationGenerationMode = 'residuals' for port_connection in runtimeInformation['PortConnections']: self._processEquations(port_connection['Equations'], indent) self._processEquations(runtimeInformation['Equations'], indent) # Finally generate together residuals and jacobians for STNs # _processSTNs will take care of self.equationGenerationMode regime self._processSTNs(runtimeInformation['STNs'], indent) def _processEquations(self, Equations, indent): s_indent = indent * self.defaultIndent s_indent2 = (indent+1) * self.defaultIndent map_oi_bi = self.exprFormatter.indexMap current_node = 0 node_eqn_counter = 0 if self.equationGenerationMode == 'residuals': current_node_residual = [] self.residuals.append(current_node_residual) for equation in Equations: for eeinfo in equation['EquationExecutionInfos']: # MPI overall_indexes = eeinfo['VariableIndexes'] n = len(overall_indexes) block_indexes = [] for oi in overall_indexes: bi = self.exprFormatter.indexBase + map_oi_bi[oi] block_indexes.append(bi) self.mpi_node_block_indexes_map[current_node][0].update(set(block_indexes)) res = self.exprFormatter.formatRuntimeNode(eeinfo['ResidualRuntimeNode']) current_node_residual.append(s_indent + '/* Equation type: ' + eeinfo['EquationType'] + ' */') current_node_residual.append(s_indent + 
'_temp_ = {0};'.format(res)) current_node_residual.append(s_indent + '_residuals_[_ec_++] = _temp_.getValue();') # MPI node_eqn_counter += 1 # Reset the counter if node_eqn_counter == self.Neq_per_node: current_node_residual = [] self.residuals.append(current_node_residual) current_node += 1 node_eqn_counter = 0 def _processSTNs(self, STNs, indent): if len(STNs) > 0: raise RuntimeError('c++(MPI) code generator cannot handle models with state transition networks at the moment') def _processActions(self, Actions, indent): pass