Source code for pacman.operations.multi_cast_router_check_functionality.valid_routes_checker

# Copyright (c) 2017-2019 The University of Manchester
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

""" Collection of functions which together validate routes.
"""
from collections import namedtuple
import logging
from spinn_utilities.ordered_set import OrderedSet
from spinn_utilities.progress_bar import ProgressBar
from spinn_utilities.log import FormatAdapter
from pacman.exceptions import PacmanRoutingException
from pacman.model.constraints.key_allocator_constraints import (
    ContiguousKeyRangeContraint)
from pacman.model.graphs.common import EdgeTrafficType
from pacman.utilities import utility_calls

logger = FormatAdapter(logging.getLogger(__name__))
_32_BITS = 0xFFFFFFFF
range_masks = {_32_BITS - ((2 ** i) - 1) for i in range(33)}

# Define an internal class for placements
PlacementTuple = namedtuple('PlacementTuple', 'x y p')


[docs]def validate_routes(machine_graph, placements, routing_infos, routing_tables, machine, graph_mapper=None): """ Go though the placements given and check that the routing entries\ within the routing tables support reach the correction destinations\ as well as not producing any cycles. :param machine_graph: the graph :param placements: the placements container :param routing_infos: the routing info container :param routing_tables: \ the routing tables generated by the routing algorithm :param graph_mapper: \ the mapping between graphs or none if only using a machine graph :param machine: the python machine object :type machine: spinn_machine.Machine object :rtype: None :raises PacmanRoutingException: when either no routing table entry is\ found by the search on a given router, or a cycle is detected """ traffic_multicast = ( lambda edge: edge.traffic_type == EdgeTrafficType.MULTICAST) progress = ProgressBar( placements.placements, "Verifying the routes from each core travel to the correct locations") for placement in progress.over(placements.placements): # locate all placements to which this placement/vertex will # communicate with for a given key_and_mask and search its # determined destinations # gather keys and masks per partition partitions = machine_graph.\ get_outgoing_edge_partitions_starting_at_vertex(placement.vertex) if graph_mapper is not None: n_atoms = graph_mapper.get_slice(placement.vertex).n_atoms else: n_atoms = 0 for partition in partitions: r_info = routing_infos.get_routing_info_from_partition( partition) is_continuous = _check_if_partition_has_continuous_keys(partition) if not is_continuous: logger.warning( "Due to the none continuous nature of the keys in this " "partition {}, we cannot check all atoms will be routed " "correctly, but will check the base key instead", partition) destination_placements = OrderedSet() # filter for just multicast edges, we don't check other types of # edges here. out_going_edges = filter(traffic_multicast, partition.edges) # for every outgoing edge, locate its destination and store it. for outgoing_edge in out_going_edges: dest_placement = placements.get_placement_of_vertex( outgoing_edge.post_vertex) destination_placements.add( PlacementTuple(x=dest_placement.x, y=dest_placement.y, p=dest_placement.p)) # search for these destinations for key_and_mask in r_info.keys_and_masks: _search_route( placement, destination_placements, key_and_mask, routing_tables, machine, n_atoms, is_continuous)
def _check_if_partition_has_continuous_keys(partition): continuous_constraints = utility_calls.locate_constraints_of_type( partition.constraints, ContiguousKeyRangeContraint) # TODO: Can we do better here? return len(continuous_constraints) > 0 def _search_route(source_placement, dest_placements, key_and_mask, routing_tables, machine, n_atoms, is_continuous): """ Locate if the routing tables work for the source to desks as\ defined :param source_placement: the placement from which the search started :param dest_placements: \ the placements to which this trace should visit only once :param key_and_mask: the key and mask associated with this set of edges :param n_atoms: the number of atoms going through this path :param is_continuous: \ whether the keys and atoms mapping is continuous :type source_placement: \ :py:class:`pacman.model.placements.Placement` :type dest_placements: iterable(PlacementTuple) :type key_and_mask: \ :py:class:`pacman.model.routing_info.BaseKeyAndMask` :rtype: None :raise PacmanRoutingException: when the trace completes and there are\ still destinations not visited """ if logger.isEnabledFor(logging.DEBUG): for dest in dest_placements: logger.debug("[{}:{}:{}]", dest.x, dest.y, dest.p) located_destinations = set() failed_to_cover_all_keys_routers = list() _start_trace_via_routing_tables( source_placement, key_and_mask, located_destinations, routing_tables, machine, n_atoms, is_continuous, failed_to_cover_all_keys_routers) # start removing from located_destinations and check if destinations not # reached failed_to_reach_destinations = list() for dest in dest_placements: if dest in located_destinations: located_destinations.remove(dest) else: failed_to_reach_destinations.append(dest) # check for error if trace didn't reach a destination it was meant to error_message = "" if failed_to_reach_destinations: output_string = "" for dest in failed_to_reach_destinations: output_string += "[{}:{}:{}]".format(dest.x, dest.y, dest.p) source_processor = "[{}:{}:{}]".format( source_placement.x, source_placement.y, source_placement.p) error_message += ("failed to locate all destinations with vertex" " {} on processor {} with keys {} as it did not " "reach destinations {}".format( source_placement.vertex.label, source_processor, key_and_mask, output_string)) # check for error if the trace went to a destination it shouldn't have if located_destinations: output_string = "" for dest in located_destinations: output_string += "[{}:{}:{}]".format(dest.x, dest.y, dest.p) source_processor = "[{}:{}:{}]".format( source_placement.x, source_placement.y, source_placement.p) error_message += ("trace went to more failed to locate all " "destinations with vertex {} on processor {} " "with keys {} as it didn't reach destinations {}" .format( source_placement.vertex.label, source_processor, key_and_mask, output_string)) if failed_to_cover_all_keys_routers: output_string = "" for data_entry in failed_to_cover_all_keys_routers: output_string += "[{}, {}, {}, {}]".format( data_entry['router_x'], data_entry['router_y'], data_entry['keys'], data_entry['source_mask']) source_processor = "[{}:{}:{}]".format( source_placement.x, source_placement.y, source_placement.p) error_message += ( "trace detected that there were atoms which the routing entry's" " wont cover and therefore packets will fly off to unknown places." " These keys came from the vertex {} on processor {} and the" " failed routers are {}".format( source_placement.vertex.label, source_processor, output_string)) # raise error if required if error_message != "": raise PacmanRoutingException(error_message) logger.debug("successful test between {} and {}", source_placement.vertex.label, dest_placements) def _start_trace_via_routing_tables( source_placement, key_and_mask, reached_placements, routing_tables, machine, n_atoms, is_continuous, failed_to_cover_all_keys_routers): """ Start the trace, by using the source placement's router and tracing\ from the route. :param source_placement: the source placement used by the trace :param key_and_mask: the key being used by the vertex which\ resides on the source placement :param reached_placements: the placements reached during the trace :param n_atoms: the number of atoms going through this path :param is_continuous: \ bool stating if the keys and atoms mapping is continuous :param failed_to_cover_all_keys_routers: \ list of failed routers for all keys :rtype: None :raises None: this method does not raise any known exception """ current_router_table = routing_tables.get_routing_table_for_chip( source_placement.x, source_placement.y) visited_routers = set() visited_routers.add((current_router_table.x, current_router_table.y)) # get src router entry = _locate_routing_entry( current_router_table, key_and_mask.key, n_atoms) _recursive_trace_to_destinations( entry, current_router_table, source_placement.x, source_placement.y, key_and_mask, visited_routers, reached_placements, machine, routing_tables, is_continuous, n_atoms, failed_to_cover_all_keys_routers) def _check_all_keys_hit_entry(entry, n_atoms, base_key): """ :param entry: routing entry discovered :param n_atoms: the number of atoms this partition covers :param base_key: the base key of the partition :return: the list of keys which this entry doesn't cover which it should """ bad_entries = list() for atom_id in range(0, n_atoms): key = base_key + atom_id if entry.mask & key != entry.routing_entry_key: bad_entries.append(key) return bad_entries # locates the next dest position to check def _recursive_trace_to_destinations( entry, current_router, chip_x, chip_y, key_and_mask, visited_routers, reached_placements, machine, routing_tables, is_continuous, n_atoms, failed_to_cover_all_keys_routers): """ Recursively search though routing tables until no more entries are\ registered with this key. :param entry: the original entry used by the first router which\ resides on the source placement chip. :param current_router: \ the router currently being visited during the trace :param key_and_mask: the key and mask being used by the vertex\ which resides on the source placement :param visited_routers: the list of routers which have been visited\ during this trace so far :param reached_placements: the placements reached during the trace :param chip_x: the x coordinate of the chip being considered :param chip_y: the y coordinate of the chip being considered :param n_atoms: the number of atoms going through this path :param is_continuous: \ bool stating if the keys and atoms mapping is continuous :param failed_to_cover_all_keys_routers: \ list of failed routers for all keys :type entry: \ :py:class:`spinn_machine.MulticastRoutingEntry` :type current_router:\ :py:class:`pacman.model.routing_tables.MulticastRoutingTable` :type chip_x: int :type chip_y: int :type key_and_mask: :py:class:`pacman.model.routing_info.BaseKeyAndMask` :type visited_routers: \ iterable(:py:class:`pacman.model.routing_tables.MulticastRoutingTable`) :type reached_placements: iterable(PlacementTuple) :rtype: None :raise None: this method does not raise any known exceptions """ # determine where the route takes us chip_links = entry.link_ids processor_values = entry.processor_ids # if goes down a chip link if chip_links: # also goes to a processor if processor_values: _is_dest(processor_values, current_router, reached_placements) # only goes to new chip for link_id in chip_links: # locate next chips router machine_router = machine.get_chip_at(chip_x, chip_y).router link = machine_router.get_link(link_id) next_router = routing_tables.get_routing_table_for_chip( link.destination_x, link.destination_y) # check that we've not visited this router before _check_visited_routers( next_router.x, next_router.y, visited_routers) # locate next entry entry = _locate_routing_entry( next_router, key_and_mask.key, n_atoms) if is_continuous: bad_entries = _check_all_keys_hit_entry( entry, n_atoms, key_and_mask.key) if bad_entries: failed_to_cover_all_keys_routers.append( {'router_x': next_router.x, 'router_y': next_router.y, 'keys': bad_entries, 'source_mask': key_and_mask.mask}) # get next route value from the new router _recursive_trace_to_destinations( entry, next_router, link.destination_x, link.destination_y, key_and_mask, visited_routers, reached_placements, machine, routing_tables, is_continuous, n_atoms, failed_to_cover_all_keys_routers) # only goes to a processor elif processor_values: _is_dest(processor_values, current_router, reached_placements) def _check_visited_routers(chip_x, chip_y, visited_routers): """ Check if the trace has visited this router already :param chip_x: the x coordinate of the chip being checked :param chip_y: the y coordinate of the chip being checked :param visited_routers: routers already visited :type chip_x: int :type chip_y: int :type visited_routers: iterable of\ :py:class:`pacman.model.routing_tables.MulticastRoutingTable` :rtype: None :raise PacmanRoutingException: when a router has been visited twice. """ visited_routers_router = (chip_x, chip_y) if visited_routers_router in visited_routers: raise PacmanRoutingException( "visited this router before, there is a cycle here. " "The routers I've currently visited are {} and the router i'm " "visiting is {}" .format(visited_routers, visited_routers_router)) visited_routers.add(visited_routers_router) def _is_dest(processor_ids, current_router, reached_placements): """ Check for processors to be removed :param reached_placements: the placements to which the trace visited :param processor_ids: the processor IDs which the last router entry\ said the trace should visit :param current_router: the current router being used in the trace :rtype: None :raise None: this method does not raise any known exceptions """ dest_x, dest_y = current_router.x, current_router.y for processor_id in processor_ids: reached_placements.add(PlacementTuple(dest_x, dest_y, processor_id)) def _locate_routing_entry(current_router, key, n_atoms): """ Locate the entry from the router based off the edge :param current_router: the current router being used in the trace :param key: the key being used by the source placement :rtype: None :raise PacmanRoutingException: \ when there is no entry located on this router """ found_entry = None for entry in current_router.multicast_routing_entries: key_combo = entry.mask & key e_key = entry.routing_entry_key if key_combo == e_key: if found_entry is None: found_entry = entry else: logger.warning( "Found more than one entry for key {}. This could be " "an error, as currently no router supports overloading" " of entries.", hex(key)) if entry.mask in range_masks: last_atom = key + n_atoms - 1 last_key = e_key + (~entry.mask & _32_BITS) if last_key < last_atom: raise PacmanRoutingException( "Full key range not covered: key:{} key_combo:{} " "mask:{}, last_key:{}, e_key:{}".format( hex(key), hex(key_combo), hex(entry.mask), hex(last_key), hex(e_key))) elif entry.mask in range_masks: last_atom = key + n_atoms last_key = e_key + (~entry.mask & _32_BITS) if min(last_key, last_atom) - max(e_key, key) + 1 > 0: raise Exception( "Key range partially covered: key:{} key_combo:{} " "mask:{}, last_key:{}, e_key:{}".format( hex(key), hex(key_combo), hex(entry.mask), hex(last_key), hex(e_key))) if found_entry is None: raise PacmanRoutingException("no entry located") return found_entry