Source code for pacman.model.routing_tables.uncompressed_multicast_routing_table
# Copyright (c) 2014 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import csv
import gzip
import logging
from typing import Any, Collection, Dict, Iterable
from spinn_utilities.log import FormatAdapter
from spinn_utilities.overrides import overrides
from spinn_machine import MulticastRoutingEntry, RoutingEntry
from pacman.exceptions import PacmanAlreadyExistsException
from pacman.model.routing_tables import AbstractMulticastRoutingTable
logger = FormatAdapter(logging.getLogger(__name__))
class UnCompressedMulticastRoutingTable(AbstractMulticastRoutingTable):
"""
Represents a uncompressed routing table for a chip.
Uncompressed tables have no maximum size, but may not be deployable to
hardware without being compressed.
"""
__slots__ = (
# The coordinates of the chip for which this is the routing table
"_x", "_y",
# dict of multicast routing entries.
# (key, mask) -> multicast_routing_entry
"_entries_by_key_mask",
# counter of how many entries in their multicast routing table are
# defaultable
"_number_of_defaulted_routing_entries")
def __init__(
self, x: int, y: int,
multicast_routing_entries: Iterable[MulticastRoutingEntry] = ()):
"""
:param int x:
The x-coordinate of the chip for which this is the routing table
:param int y:
The y-coordinate of the chip for which this is the routing tables
:param multicast_routing_entries:
The routing entries to add to the table
:type multicast_routing_entries:
iterable(~spinn_machine.MulticastRoutingEntry)
:raise PacmanAlreadyExistsException:
If any two routing entries contain the same key-mask combination
"""
self._x = x
self._y = y
self._number_of_defaulted_routing_entries = 0
self._entries_by_key_mask: Dict = dict()
for multicast_routing_entry in multicast_routing_entries:
self.add_multicast_routing_entry(multicast_routing_entry)
[docs]
@overrides(AbstractMulticastRoutingTable.add_multicast_routing_entry)
def add_multicast_routing_entry(
self, multicast_routing_entry: MulticastRoutingEntry) -> None:
routing_entry_key = multicast_routing_entry.key
mask = multicast_routing_entry.mask
tuple_key = (routing_entry_key, mask)
if tuple_key in self._entries_by_key_mask:
# Only fail if they don't go to the same place
if self._entries_by_key_mask[tuple_key] == multicast_routing_entry:
return
raise PacmanAlreadyExistsException(
f"Multicast_routing_entry {tuple_key}: "
f"{self._entries_by_key_mask[tuple_key]} on "
f"{self._x}, {self._y}",
str(multicast_routing_entry))
self._entries_by_key_mask[tuple_key] = multicast_routing_entry
# update default routed counter if required
if multicast_routing_entry.defaultable:
self._number_of_defaulted_routing_entries += 1
@property
@overrides(AbstractMulticastRoutingTable.x)
def x(self) -> int:
return self._x
@property
@overrides(AbstractMulticastRoutingTable.y)
def y(self) -> int:
return self._y
@property
@overrides(AbstractMulticastRoutingTable.multicast_routing_entries)
def multicast_routing_entries(self) -> Collection[MulticastRoutingEntry]:
return self._entries_by_key_mask.values()
@property
@overrides(AbstractMulticastRoutingTable.number_of_entries)
def number_of_entries(self) -> int:
return len(self._entries_by_key_mask)
@property
@overrides(AbstractMulticastRoutingTable.number_of_defaultable_entries)
def number_of_defaultable_entries(self) -> int:
return self._number_of_defaulted_routing_entries
@overrides(AbstractMulticastRoutingTable.__eq__)
def __eq__(self, other: Any) -> bool:
if not isinstance(other, UnCompressedMulticastRoutingTable):
return False
if self._x != other.x and self._y != other.y:
return False
return self._entries_by_key_mask == other._entries_by_key_mask
@overrides(AbstractMulticastRoutingTable.__hash__)
def __hash__(self) -> int:
return id(self)
def _from_csv_file(
csvfile: Iterable[str]) -> UnCompressedMulticastRoutingTable:
table_reader = csv.reader(csvfile)
table = UnCompressedMulticastRoutingTable(0, 0)
for row in table_reader:
try:
if len(row) == 3:
key = int(row[0], base=16)
mask = int(row[1], base=16)
route = int(row[2], base=16)
entry = RoutingEntry(spinnaker_route=route)
table.add_multicast_routing_entry(
MulticastRoutingEntry(key, mask, entry))
elif len(row) == 6:
key = int(row[1], base=16)
mask = int(row[2], base=16)
route = int(row[3], base=16)
entry = RoutingEntry(spinnaker_route=route)
table.add_multicast_routing_entry(
MulticastRoutingEntry(key, mask, entry))
except ValueError as ex:
logger.warning(f"csv read error {ex}")
return table
def from_csv(file_name: str) -> UnCompressedMulticastRoutingTable:
"""
Reads a comma separated file into Routing Tables
:param str file_name:
:rtype: UnCompressedMulticastRoutingTable:
"""
if file_name.endswith(".gz"):
with gzip.open(file_name, mode="rt", newline='') as csvfile:
return _from_csv_file(csvfile)
else:
with open(file_name, newline='', encoding="utf-8") as csvfile:
return _from_csv_file(csvfile)