Source code for pacman.operations.router_compressors.routing_compression_checker
# Copyright (c) 2017 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Dict, List, Optional, TextIO
from spinn_utilities.log import FormatAdapter
from spinn_machine import MulticastRoutingEntry
from pacman.exceptions import PacmanRoutingException
from pacman.model.routing_tables import AbstractMulticastRoutingTable
from pacman.utilities.algorithm_utilities.routes_format import format_route
logger = FormatAdapter(logging.getLogger(__name__))
WILDCARD = "*"
LINE_FORMAT = "0x{:08X} 0x{:08X} 0x{:08X} {: <7s} {}\n"
[docs]
def codify(route: MulticastRoutingEntry, length: int = 32) -> str:
"""
This method discovers all the routing keys covered by this route.
Starts of with the assumption that the key is always covered.
Whenever a mask bit is zero the list of covered keys is doubled to
include both the key with a zero and a one at that place.
:param ~spinn_machine.MulticastRoutingEntry route: single routing entry
:param int length: length in bits of the key and mask (defaults to 32)
:return: set of routing_keys covered by this route
:rtype: str
"""
mask = route.mask
key = route.key
# Check for validity: key 1 with mask 0 is an error
bad = key & ~mask
if bad:
logger.error(
"Bit {} on the mask:{} is 0 but 1 in the key:{}",
# Magic to find first set bit: See
# https://stackoverflow.com/a/36059264/301832
(bad & -bad).bit_length(), bin(mask), bin(key))
# Check each bit in the mask; use bit from key if so, else WILDCARD
return "".join(
str(int(key & bit != 0) if (mask & bit) else WILDCARD)
for bit in map(lambda i: 1 << i, reversed(range(length))))
[docs]
def codify_table(
table: AbstractMulticastRoutingTable, length: int = 32) -> Dict[
str, MulticastRoutingEntry]:
"""
Apply :py:func:`codify` to all entries in a table.
:param AbstractMulticastRoutingTable table:
:param int length:
:return: mapping from codified route to routing entry
:rtype: dict(str, ~spinn_machine.MulticastRoutingEntry)
"""
return {
codify(route, length): route
for route in table.multicast_routing_entries}
[docs]
def covers(o_code: str, c_code: str) -> bool:
"""
:param str o_code:
:param str c_code:
:rtype: bool
"""
if o_code == c_code:
return True
for o_char, c_char in zip(o_code, c_code):
if o_char == "1" and c_char == "0":
return False
if o_char == "0" and c_char == "1":
return False
# o_char = c_char or either wildcard is some cover
return True
[docs]
def calc_remainders(o_code: str, c_code: str) -> List[str]:
"""
:param str o_code: Codified original route
:param str c_code: Codified compressed route
:rtype: list(str)
"""
if o_code == c_code:
# "" = "" so also the terminator case
return []
remainders = []
for tail in calc_remainders(o_code[1:], c_code[1:]):
remainders.append(o_code[0] + tail)
if o_code[0] == WILDCARD:
if c_code[0] == "0":
remainders.append("1" + o_code[1:])
if c_code[0] == "1":
remainders.append("0" + o_code[1:])
return remainders
[docs]
def compare_route(
o_route: MulticastRoutingEntry,
compressed_dict: Dict[str, MulticastRoutingEntry],
o_code: Optional[str] = None, start: int = 0,
f: Optional[TextIO] = None) -> None:
"""
:param ~spinn_machine.MulticastRoutingEntry o_route: the original route
:param dict(str, ~spinn_machine.MulticastRoutingEntry) compressed_dict:
Compressed routes
:param str o_code: Codified original route (if known)
:param int start: Starting index in compressed routes
:param ~io.FileIO f: Where to write (part of) the route report
"""
if o_code is None:
o_code = codify(o_route)
keys = list(compressed_dict.keys())
for i in range(start, len(keys)):
c_code = keys[i]
if covers(o_code, c_code):
c_route = compressed_dict[c_code]
if f is not None:
f.write(f"\t\t{format_route(c_route)}\n")
if o_route.processor_ids != c_route.processor_ids:
raise PacmanRoutingException(
f"Compressed route {c_route} covers original route "
f"{o_route} but has a different processor_ids.")
if o_route.link_ids != c_route.link_ids:
raise PacmanRoutingException(
f"Compressed route {c_route} covers original route "
f"{o_route} but has a different link_ids.")
if not o_route.defaultable and c_route.defaultable:
if o_route == c_route:
raise PacmanRoutingException(
f"Compressed route {c_route} while original route "
f"{o_route} but has a different defaultable value.")
compare_route(o_route, compressed_dict, o_code=o_code,
start=i + 1, f=f)
else:
remainders = calc_remainders(o_code, c_code)
for remainder in remainders:
compare_route(o_route, compressed_dict, o_code=remainder,
start=i + 1, f=f)
return
if not o_route.defaultable:
# print(f"No route found {o_route}")
raise PacmanRoutingException(f"No route found {o_route}")
[docs]
def compare_tables(
original: AbstractMulticastRoutingTable,
compressed: AbstractMulticastRoutingTable) -> None:
"""
Compares the two tables without generating any output.
:param UnCompressedMulticastRoutingTable original:
The original routing tables
:param CompressedMulticastRoutingTable compressed:
The compressed routing tables.
Which will be considered in order.
:raises: PacmanRoutingException if there is any error
"""
compressed_dict = codify_table(compressed)
for o_route in original.multicast_routing_entries:
compare_route(o_route, compressed_dict)