Source code for pacman.model.routing_tables.multicast_routing_tables

# Copyright (c) 2017-2019 The University of Manchester
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import json
import gzip
from collections import OrderedDict
from pacman.exceptions import PacmanAlreadyExistsException
from .multicast_routing_table import MulticastRoutingTable
from spinn_machine import MulticastRoutingEntry


[docs]class MulticastRoutingTables(object): """ Represents the multicast routing tables for a number of chips. """ __slots__ = [ # set that holds routing tables "_routing_tables", # dict of (x,y) -> routing table "_routing_tables_by_chip" ] def __init__(self, routing_tables=None): """ :param routing_tables: The routing tables to add :type routing_tables: \ iterable(:py:class:`pacman.model.routing_tables.MulticastRoutingTable`) :raise pacman.exceptions.PacmanAlreadyExistsException: \ If any two routing tables are for the same chip """ self._routing_tables = set() self._routing_tables_by_chip = dict() if routing_tables is not None: for routing_table in routing_tables: self.add_routing_table(routing_table)
[docs] def add_routing_table(self, routing_table): """ Add a routing table :param routing_table: a routing table to add :type routing_table:\ :py:class:`pacman.model.routing_tables.MulticastRoutingTable` :rtype: None :raise pacman.exceptions.PacmanAlreadyExistsException: \ If a routing table already exists for the chip """ if routing_table in self._routing_tables: raise PacmanAlreadyExistsException( "The Routing table {} has already been added to the collection" " before and therefore already exists".format(routing_table), str(routing_table)) if (routing_table.x, routing_table.y) in self._routing_tables_by_chip: raise PacmanAlreadyExistsException( "The Routing table for chip {}:{} already exists in this " "collection and therefore is deemed an error to re-add it" .format(routing_table.x, routing_table.y), str(routing_table)) self._routing_tables_by_chip[(routing_table.x, routing_table.y)] = \ routing_table self._routing_tables.add(routing_table)
@property def routing_tables(self): """ The routing tables stored within :return: an iterable of routing tables :rtype: \ iterable(:py:class:`pacman.model.routing_tables.MulticastRoutingTable`) :raise None: does not raise any known exceptions """ return self._routing_tables
[docs] def get_routing_table_for_chip(self, x, y): """ Get a routing table for a particular chip :param x: The x-coordinate of the chip :type x: int :param y: The y-coordinate of the chip :type y: int :return: The routing table, or None if no such table exists :rtype:\ :py:class:`pacman.model.routing_tables.MulticastRoutingTable`\ or None :raise None: No known exceptions are raised """ return self._routing_tables_by_chip.get((x, y), None)
def __iter__(self): """ Iterator for the multicast routing tables stored within :return: iterator of multicast_routing_table """ return iter(self._routing_tables)
[docs]def to_json(router_table): json_list = [] for routing_table in router_table: json_routing_table = OrderedDict() json_routing_table["x"] = routing_table.x json_routing_table["y"] = routing_table.y entries = [] for entry in routing_table.multicast_routing_entries: json_entry = OrderedDict() json_entry["key"] = entry.routing_entry_key json_entry["mask"] = entry.mask json_entry["defaultable"] = entry.defaultable json_entry["spinnaker_route"] = entry.spinnaker_route entries.append(json_entry) json_routing_table["entries"] = entries json_list.append(json_routing_table) return json_list
[docs]def from_json(j_router): if isinstance(j_router, str): if j_router.endswith(".gz"): with gzip.open(j_router) as j_file: j_router = json.load(j_file) else: with open(j_router) as j_file: j_router = json.load(j_file) tables = MulticastRoutingTables() for j_table in j_router: table = MulticastRoutingTable(j_table["x"], j_table["y"]) tables.add_routing_table(table) for j_entry in j_table["entries"]: table.add_multicast_routing_entry(MulticastRoutingEntry( j_entry["key"], j_entry["mask"], defaultable=j_entry["defaultable"], spinnaker_route=j_entry["spinnaker_route"])) return tables