# Copyright (c) 2017-2019 The University of Manchester
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
from collections import defaultdict
from six import iterkeys
from spinn_utilities.log import FormatAdapter
from spinn_utilities.timer import Timer
from pacman.exceptions import PacmanConfigurationException
from pacman import operations
from .injection_decorator import injection_context, do_injection
from .algorithm_decorators import (
scan_packages, get_algorithms, Token)
from .algorithm_metadata_xml_reader import AlgorithmMetadataXmlReader
from pacman.operations import algorithm_reports
from pacman.utilities import file_format_converters
from pacman.executor.token_states import TokenStates
logger = FormatAdapter(logging.getLogger(__name__))
class PACMANAlgorithmExecutor(object):
    """ An executor of PACMAN algorithms where the order is deduced from the\
        input and outputs of the algorithm using an XML description of the\
        algorithm.
    """
    # NOTE: the "[docs]" prefix that was fused onto this line was Sphinx
    # viewcode HTML residue, not Python; it has been removed.

    __slots__ = [
        # The timing of algorithms
        "_algorithm_timings",

        # The algorithms to run
        "_algorithms",

        # The inputs passed in from the user
        "_inputs",

        # The completed output tokens
        "_completed_tokens",

        # The type mapping as things flow from input to output
        "_internal_type_mapping",

        # True if timing is to be done
        "_do_timing",

        # True if timing is to be printed
        "_print_timings",

        # True if injection is to be done during the run
        "_do_immediate_injection",

        # True if injection is to be done after the run
        "_do_post_run_injection",

        # True if the inputs are to be injected
        "_inject_inputs",

        # True if direct injection is to be done
        "_do_direct_injection",

        # the flag in the provenance area.
        "_provenance_name",

        # If required a file path to append provenance data to
        "_provenance_path"
    ]
def __init__(
        self, algorithms, optional_algorithms, inputs, required_outputs,
        tokens, required_output_tokens, xml_paths=None, packages=None,
        do_timings=True, print_timings=False, do_immediate_injection=True,
        do_post_run_injection=False, inject_inputs=True,
        do_direct_injection=True, use_unscanned_annotated_algorithms=True,
        provenance_path=None, provenance_name=None):
    """ Set up the executor and work out the algorithm execution order.

    :param algorithms: A list of algorithms that must all be run
    :param optional_algorithms:\
        A list of algorithms that must be run if their inputs are available
    :param inputs: A dict of input type to value
    :param required_outputs: A list of output types that must be generated
    :param tokens:\
        A list of tokens that should be considered to have been generated
    :param required_output_tokens:\
        A list of tokens that should be generated by the end of the run
    :param xml_paths:\
        An optional list of paths to XML files containing algorithm\
        descriptions; if not specified, only detected algorithms are used
    :param packages:\
        An optional list of packages to scan for decorated algorithms
    :param do_timings:\
        True if each algorithm run should be timed
    :param print_timings: True if timings should also be logged
    :param do_immediate_injection:\
        Perform injection with objects as they are created; can result in\
        multiple calls to the same inject-annotated methods
    :param do_post_run_injection:\
        Perform injection at the end of the run; only the last object of\
        any type created is then set
    :param inject_inputs:\
        True if inputs should be injected; only active if one of\
        do_immediate_injection or do_post_run_injection is True.  With\
        immediate injection the inputs are injected at the start of the\
        run, otherwise at the end.
    :param do_direct_injection:\
        True if direct injection into methods should be supported,\
        allowing any input or generated output to be injected
    :param use_unscanned_annotated_algorithms:\
        True if decorated algorithms detected outside the packages\
        argument should also be used
    :param provenance_path:\
        Path to a file to append full provenance data to;\
        if None no provenance is written
    :param provenance_name:\
        Label stored with each timing record; defaults to "mapping"
    """
    # timing bookkeeping
    self._algorithm_timings = list()

    # tokens completed by the run; filled in once the order is known
    self._completed_tokens = None

    # the ordered algorithms and the values that flow between them
    self._algorithms = list()
    self._inputs = inputs
    self._internal_type_mapping = dict()

    # timing configuration
    self._do_timing = do_timings
    self._print_timings = print_timings

    # injection configuration
    self._do_immediate_injection = do_immediate_injection
    self._do_post_run_injection = do_post_run_injection
    self._inject_inputs = inject_inputs
    self._do_direct_injection = do_direct_injection

    # provenance configuration
    self._provenance_name = (
        "mapping" if provenance_name is None else provenance_name)
    self._provenance_path = provenance_path

    # deduce which algorithms to run, and in what order
    self._set_up_pacman_algorithm_listings(
        algorithms, optional_algorithms, xml_paths,
        packages, inputs, required_outputs,
        use_unscanned_annotated_algorithms, tokens, required_output_tokens)
def _set_up_pacman_algorithm_listings(
        self, algorithms, optional_algorithms, xml_paths, packages, inputs,
        required_outputs, use_unscanned_algorithms, tokens,
        required_output_tokens):
    """ Translate algorithm names into algorithm descriptors, using both\
        the XML metadata files and decorated algorithms found by scanning\
        packages, then determine the execution order.

    :param algorithms: names of the algorithms that must be run
    :type algorithms: iterable(str)
    :param optional_algorithms: \
        names of algorithms that are optional and don't necessarily need\
        to be run to complete the logic flow
    :type optional_algorithms: iterable(str)
    :param xml_paths: paths of extra XML configuration files, or None
    :type xml_paths: iterable(str)
    :param packages: extra packages to scan for algorithms, or None
    :param inputs: dict of input type to value
    :param required_outputs: \
        the set of outputs that this workflow is meant to generate
    :type required_outputs: iterable(str)
    :param use_unscanned_algorithms: \
        True to also use decorated algorithms found outside *packages*
    :param tokens:\
        names of tokens that should be considered already generated
    :param required_output_tokens:\
        names of tokens that should be generated by the end of the run
    """
    required_names = list(algorithms)

    # take private copies so the caller's sequences are never mutated
    xml_path_list = [] if xml_paths is None else list(xml_paths)
    package_list = [] if packages is None else list(packages)
    optional_names = (
        [] if optional_algorithms is None else list(optional_algorithms))

    # always read the standard PACMAN metadata files
    # (note: "metdata" is the actual attribute name in pacman.operations)
    xml_path_list.append(operations.algorithms_metdata_file)
    xml_path_list.append(algorithm_reports.reports_metadata_file)

    # decode the algorithm specifications from the XML
    algorithm_data_objects = AlgorithmMetadataXmlReader(
        xml_path_list).decode_algorithm_data_objects()
    converter_xml_path = \
        file_format_converters.converter_algorithms_metadata_file
    converters = AlgorithmMetadataXmlReader(
        [converter_xml_path]).decode_algorithm_data_objects()

    # also scan for decorated algorithms
    package_list.append(operations)
    package_list.append(algorithm_reports)
    converters.update(scan_packages([file_format_converters]))
    algorithm_data_objects.update(scan_packages(package_list))
    if use_unscanned_algorithms:
        algorithm_data_objects.update(get_algorithms())

    # every XML file read, including the converter metadata
    all_xml_paths = list(xml_path_list)
    all_xml_paths.append(converter_xml_path)

    # keep only the descriptors for the algorithms actually requested
    algorithm_data = self._get_algorithm_data(
        required_names, algorithm_data_objects)
    optional_algorithms_datas = self._get_algorithm_data(
        optional_names, algorithm_data_objects)
    converter_algorithms_datas = self._get_algorithm_data(
        converters.keys(), converters)

    # work out the order of execution
    self._determine_algorithm_order(
        inputs, required_outputs, algorithm_data,
        optional_algorithms_datas, converter_algorithms_datas,
        tokens, required_output_tokens)
@staticmethod
def _get_algorithm_data(
algorithm_names, algorithm_data_objects):
algorithms = list()
for algorithm_name in algorithm_names:
if algorithm_name not in algorithm_data_objects:
raise PacmanConfigurationException(
"Cannot find algorithm {}".format(algorithm_name))
algorithms.append(algorithm_data_objects[algorithm_name])
return algorithms
def _determine_algorithm_order(
        self, inputs, required_outputs, algorithm_data,
        optional_algorithm_data, converter_algorithms_datas,
        tokens, required_output_tokens):
    """ Takes the algorithms and determines which order they need to be\
        executed to generate the correct data objects

    :param inputs: dict of input type to value
    :type inputs: dict(str, ...)
    :param required_outputs: \
        the set of outputs that this workflow is meant to generate
    :param algorithm_data: descriptors of the required algorithms
    :param optional_algorithm_data: the set of optional algorithms
    :param converter_algorithms_datas: the set of converter algorithms
    :param tokens: names of tokens considered already generated
    :param required_output_tokens: \
        names of tokens that must be complete by the end of the run
    :raises PacmanConfigurationException: \
        if no ordering can satisfy the required outputs and tokens
    :rtype: None
    """
    # Go through the algorithms and get all possible outputs
    all_outputs = set(iterkeys(inputs))
    for algorithms in (algorithm_data, optional_algorithm_data):
        for algorithm in algorithms:
            # Get the algorithm output types
            alg_outputs = {
                output.output_type for output in algorithm.outputs}

            # Remove from the outputs any optional input that is also an
            # output
            for alg_input in algorithm.optional_inputs:
                for matching in alg_input.get_matching_inputs(alg_outputs):
                    alg_outputs.discard(matching)
            all_outputs.update(alg_outputs)

    # Set up the token tracking and make all specified tokens complete
    token_states = TokenStates()
    for token_name in tokens:
        token = Token(token_name)
        token_states.track_token(token)
        token_states.process_output_token(token)

    # Go through the algorithms and add in the tokens that can be completed
    # by any of the algorithms
    for algorithms in (algorithm_data, optional_algorithm_data):
        for algorithm in algorithms:
            for token in algorithm.generated_output_tokens:
                if not token_states.is_token_complete(token):
                    token_states.track_token(token)

    # Go through the algorithms and add a fake token for any algorithm that
    # requires an optional token that can't be provided and a fake input
    # for any algorithm that requires an optional input that can't be
    # provided. This allows us to require the other optional inputs and
    # tokens so that algorithms that provide those items are run before
    # those that can make use of them.
    fake_inputs = set()
    fake_tokens = TokenStates()
    for algorithms in (algorithm_data, optional_algorithm_data):
        for algorithm in algorithms:
            for input_parameter in algorithm.optional_inputs:
                if not input_parameter.input_matches(all_outputs):
                    fake_inputs.update(
                        input_parameter.get_fake_inputs(all_outputs))
            for token in algorithm.optional_input_tokens:
                if (not token_states.is_tracking_token(token) and
                        not fake_tokens.is_token_complete(token)):
                    fake_tokens.track_token(token)
                    fake_tokens.process_output_token(token)

    input_types = set(iterkeys(inputs))

    allocated_algorithms = list()
    generated_outputs = set()
    # BUG FIX: this was "generated_outputs.union(input_types)", which
    # returns a NEW set and discards it (a no-op).  update() mutates the
    # set in place, seeding it with the input types as clearly intended.
    generated_outputs.update(input_types)
    algorithms_to_find = list(algorithm_data)
    optionals_to_use = list(optional_algorithm_data)
    outputs_to_find = self._remove_outputs_which_are_inputs(
        required_outputs, inputs)
    tokens_to_find = self._remove_complete_tokens(
        token_states, required_output_tokens)

    while algorithms_to_find or outputs_to_find or tokens_to_find:
        suitable_algorithm = None
        algorithm_list = None

        # Order of searching - each combination will be attempted in order;
        # the first matching algorithm will be used (and search will stop)
        # Elements are:
        # 1. Algorithm list to search,
        # 2. check generated outputs,
        # 3. require optional inputs)
        order = [
            # Check required algorithms forcing optional inputs
            (algorithms_to_find, False, True),
            # Check optional algorithms forcing optional inputs
            (optionals_to_use, True, True),
            # Check required algorithms without optional inputs
            # - shouldn't need to do this, but might if an optional input
            #   is also a generated output of the same algorithm
            (algorithms_to_find, False, False),
            # Check optional algorithms without optional inputs
            # - as above, it shouldn't be necessary but might be if an
            #   optional input is also an output of the same algorithm
            (optionals_to_use, True, False),
            # Check converter algorithms
            # (only if they generate something new)
            (converter_algorithms_datas, True, False)
        ]

        for (algorithms, check_outputs, force_required) in order:
            suitable_algorithm, algorithm_list = \
                self._locate_suitable_algorithm(
                    algorithms, input_types, generated_outputs,
                    token_states, fake_inputs, fake_tokens,
                    check_outputs, force_required)
            if suitable_algorithm is not None:
                break

        if suitable_algorithm is not None:
            # Remove the value
            self._remove_algorithm_and_update_outputs(
                algorithm_list, suitable_algorithm, input_types,
                generated_outputs, outputs_to_find)

            # add the suitable algorithms to the list and take the outputs
            # as new inputs
            allocated_algorithms.append(suitable_algorithm)

            # Mark any tokens generated as complete
            for output_token in suitable_algorithm.generated_output_tokens:
                token_states.process_output_token(output_token)
                if token_states.is_token_complete(
                        Token(output_token.name)):
                    tokens_to_find.discard(output_token.name)
        else:
            # Failed to find an algorithm to run!
            algorithms_to_find_names = list()
            for algorithm in algorithms_to_find:
                algorithms_to_find_names.append(algorithm.algorithm_id)
            optional_algorithms_names = list()
            for algorithm in optional_algorithm_data:
                optional_algorithms_names.append(algorithm.algorithm_id)
            algorithms_used = list()
            for algorithm in allocated_algorithms:
                algorithms_used.append(algorithm.algorithm_id)
            algorithm_input_requirement_breakdown = ""
            for algorithm in algorithms_to_find:
                algorithm_input_requirement_breakdown += \
                    self._deduce_inputs_required_to_run(
                        algorithm, input_types, token_states,
                        fake_inputs, fake_tokens)
            for algorithm in optionals_to_use:
                algorithm_input_requirement_breakdown += \
                    self._deduce_inputs_required_to_run(
                        algorithm, input_types, token_states,
                        fake_inputs, fake_tokens)
            algorithms_by_output = defaultdict(list)
            algorithms_by_token = defaultdict(list)
            for algorithms in (algorithm_data, optional_algorithm_data):
                for algorithm in algorithms:
                    for output in algorithm.outputs:
                        algorithms_by_output[output.output_type].append(
                            algorithm.algorithm_id)
                    for token in algorithm.generated_output_tokens:
                        algorithms_by_token[token.name].append(
                            "{}: part={}".format(
                                algorithm.algorithm_id, token.part))

            raise PacmanConfigurationException(
                "Unable to deduce a future algorithm to use.\n"
                " Inputs: {}\n"
                " Fake Inputs: {}\n"
                " Outputs to find: {}\n"
                " Tokens complete: {}\n"
                " Fake tokens complete: {}\n"
                " Tokens to find: {}\n"
                " Required algorithms remaining to be used: {}\n"
                " Optional Algorithms unused: {}\n"
                " Functions used: {}\n"
                " Algorithm by outputs: {}\n"
                " Algorithm by tokens: {}\n"
                " Inputs required per function: \n{}\n".format(
                    sorted(input_types),
                    sorted(fake_inputs),
                    outputs_to_find,
                    token_states.get_completed_tokens(),
                    fake_tokens.get_completed_tokens(),
                    tokens_to_find,
                    algorithms_to_find_names,
                    optional_algorithms_names,
                    algorithms_used,
                    algorithms_by_output,
                    algorithms_by_token,
                    algorithm_input_requirement_breakdown))

    # Test that the outputs are generated
    all_required_outputs_generated = True
    failed_to_generate_output_string = ""
    for output in outputs_to_find:
        if output not in generated_outputs:
            all_required_outputs_generated = False
            failed_to_generate_output_string += ":{}".format(output)

    if not all_required_outputs_generated:
        raise PacmanConfigurationException(
            "Unable to generate outputs {}".format(
                failed_to_generate_output_string))

    self._algorithms = allocated_algorithms
    self._completed_tokens = token_states.get_completed_tokens()
def _remove_outputs_which_are_inputs(self, required_outputs, inputs):
""" Generates the output list which has pruned outputs which are\
already in the input list
:param required_outputs: the original output listings
:param inputs: the inputs given to the executor
:return: new list of outputs
:rtype: iterable(str)
"""
copy_required_outputs = set(required_outputs)
for input_type in inputs:
if input_type in copy_required_outputs:
copy_required_outputs.remove(input_type)
return copy_required_outputs
def _remove_complete_tokens(self, tokens, output_tokens):
    """ Filter the required token names down to those not yet complete.

    :param tokens: the token state tracker
    :param output_tokens: names of the tokens required as outputs
    :return: the set of token names still to be completed
    """
    still_needed = set()
    for token_name in output_tokens:
        if not tokens.is_token_complete(Token(token_name)):
            still_needed.add(token_name)
    return still_needed
def _deduce_inputs_required_to_run(
        self, algorithm, inputs, tokens, fake_inputs, fake_tokens):
    """ Build one line of human-readable text describing the inputs and\
        tokens the given algorithm is still missing, for error reporting.

    :param algorithm: the algorithm whose requirements are described
    :param inputs: the input types currently available
    :param tokens: the token state tracker
    :param fake_inputs: optional inputs known to be unobtainable
    :param fake_tokens: optional tokens known to be unobtainable
    :return: text of the form "  <id>: ['missing', ...]\\n"
    :rtype: str
    """
    left_over_inputs = " {}: [".format(algorithm.algorithm_id)
    separator = ""

    # required inputs first, then optional ones (marked " (optional)")
    for algorithm_inputs, extra in (
            (algorithm.required_inputs, ""),
            (algorithm.optional_inputs, " (optional)")):
        for an_input in algorithm_inputs:
            # types of this parameter that cannot be satisfied at all
            unfound_types = [
                param_type for param_type in an_input.param_types
                if param_type not in inputs and
                param_type not in fake_inputs]
            # types of this parameter that could be satisfied
            found_types = [
                param_type for param_type in an_input.param_types
                if param_type in inputs or param_type in fake_inputs]
            if unfound_types:
                left_over_inputs += "{}'{}'{}".format(
                    separator, unfound_types, extra)
                # mention any satisfiable alternatives alongside
                if found_types:
                    left_over_inputs += " (but found '{}')".format(
                        found_types)
                # only switch to comma separation once something is listed
                separator = ", "

    # required tokens that are neither complete nor known-unobtainable
    for a_token in algorithm.required_input_tokens:
        if (not tokens.is_token_complete(a_token) and
                not fake_tokens.is_token_complete(a_token)):
            left_over_inputs += "{}'{}'".format(
                separator, a_token)
            separator = ", "

    left_over_inputs += "]\n"
    return left_over_inputs
@staticmethod
def _remove_algorithm_and_update_outputs(
algorithm_list, algorithm, inputs, generated_outputs,
outputs_to_find):
""" Update data structures
:param algorithm_list: the list of algorithms to remove algorithm from
:param algorithm: the algorithm to remove
:param inputs: the inputs list to update output from algorithm
:param generated_outputs: \
the outputs list to update output from algorithm
:rtype: None
"""
algorithm_list.remove(algorithm)
for output in algorithm.outputs:
inputs.add(output.output_type)
generated_outputs.add(output.output_type)
if output.output_type in outputs_to_find:
outputs_to_find.remove(output.output_type)
@staticmethod
def _locate_suitable_algorithm(
        algorithm_list, inputs, generated_outputs, tokens,
        fake_inputs, fake_tokens, check_generated_outputs,
        force_optionals):
    """ Locates a suitable algorithm

    :param algorithm_list: the list of algorithms to choose from
    :param inputs: the inputs available currently
    :param generated_outputs: the current outputs expected to be generated
    :param tokens: the current token tracker
    :param fake_inputs: the optional inputs that will never be available
    :param fake_tokens: the optional tokens that will never be available
    :param check_generated_outputs:\
        True if an algorithm should only be selected if it generates\
        an output not in the list of generated outputs
    :param force_optionals:\
        True if optional inputs/tokens should be considered required
    :return: a suitable algorithm which uses the inputs (or None if none\
        is suitable), and the list it was taken from
    """

    # TODO: This can be made "cleverer" by looking at which algorithms have
    # unsatisfied optional inputs. The next algorithm to run can then
    # be the next that outputs the most unsatisfied optional inputs for
    # other algorithms from those with the least unsatisfied optional
    # inputs

    # Find the next algorithm which can run now
    for algorithm in algorithm_list:
        # check all inputs
        all_inputs_match = all(
            input_parameter.input_matches(inputs)
            for input_parameter in algorithm.required_inputs)

        # check all required tokens
        if all_inputs_match:
            all_inputs_match = all(
                tokens.is_token_complete(token)
                for token in algorithm.required_input_tokens)

        # check all optional inputs; an optional input is acceptable when
        # it is either available or known to be impossible ("fake")
        if all_inputs_match and force_optionals:
            all_inputs_match = all(
                input_parameter.input_matches(inputs)
                or input_parameter.input_matches(fake_inputs)
                for input_parameter in algorithm.optional_inputs)

        # check all optional tokens, with the same "fake" escape hatch
        if all_inputs_match and force_optionals:
            all_inputs_match = all(
                tokens.is_token_complete(token)
                or fake_tokens.is_token_complete(token)
                for token in algorithm.optional_input_tokens)

        if all_inputs_match:
            # If the list of generated outputs is not given, we're done now
            if not check_generated_outputs:
                return algorithm, algorithm_list

            # The list of generated outputs is given, so only use the
            # algorithm if it generates something new, assuming the
            # algorithm generates any outputs at all
            if algorithm.outputs:
                for output in algorithm.outputs:
                    if (output.output_type not in generated_outputs
                            and output.output_type not in inputs):
                        return algorithm, algorithm_list

            # If the algorithm doesn't generate a unique output,
            # check if it generates a unique token
            if algorithm.generated_output_tokens:
                for token in algorithm.generated_output_tokens:
                    if not tokens.is_token_complete(token):
                        return algorithm, algorithm_list

    # If no algorithms are available, return None
    return None, algorithm_list
[docs] def execute_mapping(self):
""" Executes the algorithms
:rtype: None
"""
self._internal_type_mapping.update(self._inputs)
if self._do_direct_injection:
with injection_context(self._internal_type_mapping):
self._execute_mapping()
else:
self._execute_mapping()
def _execute_mapping(self):
if self._inject_inputs and self._do_immediate_injection:
do_injection(self._inputs)
new_outputs = dict()
for algorithm in self._algorithms:
# set up timer
timer = None
if self._do_timing:
timer = Timer()
timer.start_timing()
# Execute the algorithm
results = algorithm.call(self._internal_type_mapping)
if self._provenance_path:
self._report_full_provenance(algorithm, results)
# handle_prov_data
if self._do_timing:
self._update_timings(timer, algorithm)
if results is not None:
self._internal_type_mapping.update(results)
if self._do_immediate_injection and not self._inject_inputs:
new_outputs.update(results)
# Do injection with the outputs produced
if self._do_immediate_injection:
do_injection(results)
# Do injection with all the outputs
if self._do_post_run_injection:
if self._inject_inputs:
do_injection(self._internal_type_mapping)
else:
do_injection(new_outputs)
[docs] def get_item(self, item_type):
""" Get an item from the outputs of the execution
:param item_type: \
the item from the internal type mapping to be returned
:return: the returned item
"""
if item_type not in self._internal_type_mapping:
return None
return self._internal_type_mapping[item_type]
[docs] def get_items(self):
""" Get all the outputs from a execution
:return: dictionary of types as keys and values.
"""
return self._internal_type_mapping
[docs] def get_completed_tokens(self):
""" Get all of the tokens that have completed as part of this execution
:return: A list of tokens
"""
return self._completed_tokens
@property
def algorithm_timings(self):
    """ The times taken by each algorithm run so far, as a list of\
        (algorithm id, time taken, provenance name) tuples.
    """
    return self._algorithm_timings
def _update_timings(self, timer, algorithm):
time_taken = timer.take_sample()
if self._print_timings:
logger.info("Time {} taken by {}",
time_taken, algorithm.algorithm_id)
self._algorithm_timings.append(
(algorithm.algorithm_id, time_taken, self._provenance_name))
def _report_full_provenance(self, algorithm, results):
    """ Append a full provenance record for one algorithm run to the\
        provenance file.  Best-effort: any failure is logged and\
        swallowed so provenance writing can never break the run.

    :param algorithm: the algorithm that was executed
    :param results: dict of output type to value returned by the run
    """
    try:
        with open(self._provenance_path, "a") as provenance_file:
            algorithm.write_provenance_header(provenance_file)
            if algorithm.required_inputs:
                provenance_file.write("\trequired_inputs:\n")
                self._report_inputs(provenance_file,
                                    algorithm.required_inputs)
            if algorithm.optional_inputs:
                provenance_file.write("\toptional_inputs:\n")
                self._report_inputs(provenance_file,
                                    algorithm.optional_inputs)
            if algorithm.required_input_tokens:
                provenance_file.write("\trequired_tokens:\n")
                self._report_tokens(
                    provenance_file, algorithm.required_input_tokens)
            if algorithm.optional_input_tokens:
                provenance_file.write("\toptional_tokens:\n")
                self._report_tokens(
                    provenance_file, algorithm.optional_input_tokens)
            if algorithm.outputs:
                provenance_file.write("\toutputs:\n")
                for output in algorithm.outputs:
                    # describe each produced value by a type summary
                    variable = results[output.output_type]
                    the_type = self._get_type(variable)
                    provenance_file.write(
                        "\t\t{}:{}\n".format(output.output_type, the_type))
            if algorithm.generated_output_tokens:
                provenance_file.write("\tgenerated_tokens:\n")
                self._report_tokens(
                    provenance_file, algorithm.generated_output_tokens)
            provenance_file.write("\n")
    # deliberate broad catch: provenance is best-effort only
    except Exception:  # pylint: disable=broad-except
        logger.exception("Exception when attempting to write provenance")
def _report_inputs(self, provenance_file, inputs):
for input_parameter in inputs:
name = input_parameter.name
for param_type in input_parameter.param_types:
if param_type in self._internal_type_mapping:
variable = self._internal_type_mapping[param_type]
the_type = self._get_type(variable)
provenance_file.write(
"\t\t{} {}:{}\n".format(name, param_type, the_type))
break
else:
if len(input_parameter.param_types) == 1:
provenance_file.write(
"\t\t{} None of {} provided\n"
"".format(name, input_parameter.param_types))
else:
provenance_file.write(
"\t\t{} {} not provided\n"
"".format(name, input_parameter.param_types[0]))
def _report_tokens(self, provenance_file, tokens):
for token in tokens:
part = token.part if token.part is not None else ""
if part == "":
part = " ({})".format(part)
provenance_file.write("\t\t{}{}".format(token.name, part))
def _get_type(self, variable):
if variable is None:
return "None"
the_type = type(variable)
if the_type in [bool, float, int, str]:
return variable
if the_type == set:
if not variable:
return "Empty set"
the_type = "set("
for item in variable:
the_type += "{},".format(self._get_type(item))
the_type += ")"
return the_type
elif the_type == list:
if not variable:
return "Empty list"
first_type = type(variable[0])
if all(isinstance(n, first_type) for n in variable):
return "list({}) :len{}".format(first_type, len(variable))
return the_type